[GitHub] [kafka] chia7712 commented on a change in pull request #10976: KAFKA-13036 Replace EasyMock and PowerMock with Mockito for RocksDBMetricsRecorderTest
chia7712 commented on a change in pull request #10976:
URL: https://github.com/apache/kafka/pull/10976#discussion_r709761794

## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java

```diff
@@ -300,65 +287,47 @@ public void shouldAddItselfToRecordingTriggerWhenFirstValueProvidersAreAddedAfte
     recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
     reset(recordingTrigger);
     recordingTrigger.addMetricsRecorder(recorder);
-    replay(recordingTrigger);

     recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2);
-
-    verify(recordingTrigger);
 }

 @Test
 public void shouldNotAddItselfToRecordingTriggerWhenNotEmpty2() {
     recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
     reset(recordingTrigger);
-    replay(recordingTrigger);

     recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2);
-
-    verify(recordingTrigger);
 }

 @Test
 public void shouldCloseStatisticsWhenValueProvidersAreRemoved() {
     recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
     reset(statisticsToAdd1);
     statisticsToAdd1.close();
-    replay(statisticsToAdd1);

     recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
-
-    verify(statisticsToAdd1);
 }

 @Test
 public void shouldNotCloseStatisticsWhenValueProvidersWithoutStatisticsAreRemoved() {
     recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, null);
     reset(statisticsToAdd1);
-    replay(statisticsToAdd1);

     recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
-
-    verify(statisticsToAdd1);
 }

 @Test
 public void shouldRemoveItselfFromRecordingTriggerWhenLastValueProvidersAreRemoved() {
     recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
     recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2);
     reset(recordingTrigger);
-    replay(recordingTrigger);

     recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
-    verify(recordingTrigger);
-
     reset(recordingTrigger);
     recordingTrigger.removeMetricsRecorder(recorder);
-    replay(recordingTrigger);

     recorder.removeValueProviders(SEGMENT_STORE_NAME_2);
```

Review comment:
for example:
```java
recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2);
recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
recorder.removeValueProviders(SEGMENT_STORE_NAME_2);
Mockito.verify(recordingTrigger, Mockito.times(1)).removeMetricsRecorder(recorder);
```
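For readers following the migration: EasyMock's `reset()`/`replay()`/`verify()` triple with no recorded expectations maps onto explicit Mockito verification calls. A minimal, self-contained sketch of that mapping; the `RecordingTrigger` interface below is a stand-in for the real test fixture, not the actual class:

```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;

// Only the verification pattern is the point here; the interface is illustrative.
public class MockitoVerifySketch {
    interface RecordingTrigger {
        void addMetricsRecorder(Object recorder);
        void removeMetricsRecorder(Object recorder);
    }

    public static void main(String[] args) {
        RecordingTrigger trigger = mock(RecordingTrigger.class);
        Object recorder = new Object();

        trigger.removeMetricsRecorder(recorder);  // stands in for the code under test

        // EasyMock: reset(trigger); <record expectation>; replay(trigger); ...; verify(trigger);
        // Mockito has no record/replay phase; assert after the fact:
        verify(trigger, times(1)).removeMetricsRecorder(recorder);

        // EasyMock's "replay with no expectations, then verify" (expect no calls) becomes:
        RecordingTrigger untouched = mock(RecordingTrigger.class);
        verifyNoInteractions(untouched);
    }
}
```

Because the assertion is stated once after the code under test runs, the intermediate `reset`/`replay`/`verify` scaffolding in the tests above simply disappears, which is what the diff shows.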
[GitHub] [kafka] yangdaixai commented on pull request #11329: KAFKA-13301; Optimized the interpretation of the relationship between 'request.timeout. ms' and 'max.poll.interval.ms' in the document.
yangdaixai commented on pull request #11329:
URL: https://github.com/apache/kafka/pull/11329#issuecomment-920539185

@guozhangwang issues: https://issues.apache.org/jira/browse/KAFKA-13301?focusedCommentId=17415801&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17415801
[jira] [Commented] (KAFKA-13301) The relationship between request.timeout. ms and max.poll.interval.ms in the Consumer Configs is incorrect.
[ https://issues.apache.org/jira/browse/KAFKA-13301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17415840#comment-17415840 ]

yangshengwei commented on KAFKA-13301:
--------------------------------------

[~guozhang] submitted a PR: https://github.com/apache/kafka/pull/11329

> The relationship between request.timeout. ms and max.poll.interval.ms in the
> Consumer Configs is incorrect.
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-13301
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13301
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: yangshengwei
>            Priority: Trivial
>         Attachments: image-2021-09-15-15-37-25-561.png, image-2021-09-15-15-39-00-179.png
>
> In Consumer Configs, the value of the configuration max.poll.interval.ms must
> always be larger than request.timeout.ms. But here's what the official
> document says: The value of the configuration request.timeout.ms must always
> be larger than max.poll.interval.ms.
[GitHub] [kafka] yangdaixai commented on pull request #11329: KAFKA-13301; Optimized the interpretation of the relationship between 'request.timeout. ms' and 'max.poll.interval.ms' in the document.
yangdaixai commented on pull request #11329:
URL: https://github.com/apache/kafka/pull/11329#issuecomment-920537877

@guozhangwang
[GitHub] [kafka] yangdaixai opened a new pull request #11329: Ysw0916
yangdaixai opened a new pull request #11329:
URL: https://github.com/apache/kafka/pull/11329

*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*

*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-13255) Mirrormaker config property config.properties.exclude is not working as expected
[ https://issues.apache.org/jira/browse/KAFKA-13255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17415812#comment-17415812 ]

Anamika Nadkarni commented on KAFKA-13255:
------------------------------------------

[~ryannedolan] Submitted PR. Thank you !!

> Mirrormaker config property config.properties.exclude is not working as
> expected
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-13255
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13255
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 2.8.0
>            Reporter: Anamika Nadkarni
>            Priority: Major
>
> Objective - Use MM2 (kafka connect in distributed cluster) for data migration
> between cluster hosted in private data center and aws msk cluster.
> Steps performed -
> # Started kafka-connect service.
> # Created 3 MM2 connectors (i.e. source connector, checkpoint connector and
> heartbeat connector). Curl commands used to create connectors are in the
> attached file. To exclude certain config properties while topic replication,
> we are using the 'config.properties.exclude' property in the MM2 source
> connector.
> Expected -
> Source topic 'dev.portlandDc.anamika.helloMsk' should be successfully created
> in destination cluster.
> Actual -
> Creation of the source topic 'dev.portlandDc.anamika.helloMsk' in destination
> cluster fails with an error. Error is
> {code:java}
> [2021-08-06 06:13:40,944] WARN [mm2-msc|worker] Could not create topic dev.portlandDc.anamika.helloMsk. (org.apache.kafka.connect.mirror.MirrorSourceConnector:371)
> org.apache.kafka.common.errors.InvalidConfigurationException: Unknown topic config name: confluent.value.schema.validation{code}
[GitHub] [kafka] AnamikaN opened a new pull request #11328: Fixed code so user can use config.properties.exclude to exclude prope…
AnamikaN opened a new pull request #11328:
URL: https://github.com/apache/kafka/pull/11328

Objective - Use MM2 (kafka connect in distributed cluster) for data migration between cluster hosted in private data center and aws msk cluster.

Steps performed -
Started kafka-connect service.
Created 3 MM2 connectors (i.e. source connector, checkpoint connector and heartbeat connector). Curl commands used to create connectors are in the attached file. To exclude certain config properties while topic replication, we are using the 'config.properties.exclude' property in the MM2 source connector.

Expected -
Source topic 'dev.portlandDc.anamika.helloMsk' should be successfully created in destination cluster.

Actual -
Creation of the source topic 'dev.portlandDc.anamika.helloMsk' in destination cluster fails with an error. Error is

[2021-08-06 06:13:40,944] WARN [mm2-msc|worker] Could not create topic dev.portlandDc.anamika.helloMsk. (org.apache.kafka.connect.mirror.MirrorSourceConnector:371)
org.apache.kafka.common.errors.InvalidConfigurationException: Unknown topic config name: confluent.value.schema.validation
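For context, `config.properties.exclude` takes a comma-separated list of regular expressions matched against topic config names. A hedged sketch of an MM2 properties file using it; the cluster aliases and patterns below are illustrative, not taken from the reporter's setup:

```properties
# mm2.properties -- illustrative values, assuming two clusters aliased "source" and "target"
clusters = source, target
source->target.enabled = true
source->target.topics = dev\.portlandDc\..*

# Do not copy vendor-specific topic configs to the target cluster; without an
# entry like the last one, CreateTopics on the target can fail with
# "Unknown topic config name: confluent.value.schema.validation".
source->target.config.properties.exclude = follower\.replication\.throttled\.replicas, \
  leader\.replication\.throttled\.replicas, \
  confluent\..*
```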
[GitHub] [kafka] junrao commented on a change in pull request #11058: KAFKA-12802 Added a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broker
junrao commented on a change in pull request #11058:
URL: https://github.com/apache/kafka/pull/11058#discussion_r707813690

## File path: storage/src/main/resources/message/RemoteLogSegmentMetadataRecordSnapshot.json

```diff
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 0,
+  "type": "data",
+  "name": "RemoteLogSegmentMetadataRecordSnapshot",
+  "validVersions": "0",
+  "flexibleVersions": "none",
```

Review comment:
Should we support flexible version from the beginning so that we could potentially support downgrade during future format changes?

## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFile.java

```diff
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This class represents the remote log data snapshot stored in a file for a specific topic partition. This is used by
+ * {@link TopicBasedRemoteLogMetadataManager} to store the remote log metadata received for a specific partition from
+ * remote log metadata topic. This will avoid reading the remote log metadata messages from the topic again when a
+ * broker restarts.
+ */
+public class RemoteLogMetadataSnapshotFile {
+    private static final Logger log = LoggerFactory.getLogger(RemoteLogMetadataSnapshotFile.class);
+
+    public static final String COMMITTED_LOG_METADATA_SNAPSHOT_FILE_NAME = "remote_log_snapshot";
+
+    // header:
+    // size: 2 + (8+8) + 4 + 8 = 30
+    private static final int HEADER_SIZE = 30;
+
+    private final File metadataStoreFile;
+
+    /**
+     * Creates a CommittedLogMetadataSnapshotFile instance backed by a file with the name `remote_log_snapshot` in
+     * the given {@code metadataStoreDir}. It creates the file if it does not exist.
+     *
+     * @param metadataStoreDir directory in which the snapshot file to be created.
+     */
+    RemoteLogMetadataSnapshotFile(Path metadataStoreDir) {
+        this.metadataStoreFile = new File(metadataStoreDir.toFile(), COMMITTED_LOG_METADATA_SNAPSHOT_FILE_NAME);
+
+        // Create an empty file if it does not exist.
+        try {
+            boolean newFileCreated = metadataStoreFile.createNewFile();
+            log.info("Remote log metadata snapshot file: [{}], newFileCreated: [{}]", metadataStoreFile, newFileCreated);
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    /**
+     * Writes the given snapshot replacing the earlier snapshot data.
+     *
+     * @param snapshot Snapshot to be
```
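The `2 + (8+8) + 4 + 8 = 30` arithmetic above implies a fixed header layout: a 2-byte short, a 16-byte Uuid (two longs), a 4-byte int, and an 8-byte long. A sketch of such a header; the field meanings here are guesses for illustration, only the sizes come from the quoted code:

```java
import java.nio.ByteBuffer;

// Hypothetical field names (version, Uuid halves, metadata partition, offset);
// only the byte widths are taken from the quoted HEADER_SIZE comment.
public class SnapshotHeaderSketch {
    static final int HEADER_SIZE = 2 + (8 + 8) + 4 + 8; // = 30

    static ByteBuffer writeHeader(short version, long uuidMostSig, long uuidLeastSig,
                                  int metadataPartition, long metadataOffset) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE);
        buf.putShort(version);         // 2 bytes
        buf.putLong(uuidMostSig);      // 8 bytes
        buf.putLong(uuidLeastSig);     // 8 bytes
        buf.putInt(metadataPartition); // 4 bytes
        buf.putLong(metadataOffset);   // 8 bytes
        buf.flip();
        return buf;
    }
}
```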
[jira] [Commented] (KAFKA-13301) The relationship between request.timeout. ms and max.poll.interval.ms in the Consumer Configs is incorrect.
[ https://issues.apache.org/jira/browse/KAFKA-13301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17415801#comment-17415801 ]

Guozhang Wang commented on KAFKA-13301:
---------------------------------------

Thanks for filing this [~yangshengwei]. I think it should be "The value of the configuration `request.timeout.ms` must always be smaller than `max.poll.interval.ms`, so we have changed `max.poll.interval.ms`'s default value to just above 5 minutes." Would you like to submit a PR?

> The relationship between request.timeout. ms and max.poll.interval.ms in the
> Consumer Configs is incorrect.
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-13301
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13301
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: yangshengwei
>            Priority: Trivial
>         Attachments: image-2021-09-15-15-37-25-561.png, image-2021-09-15-15-39-00-179.png
>
> In Consumer Configs, the value of the configuration max.poll.interval.ms must
> always be larger than request.timeout.ms. But here's what the official
> document says: The value of the configuration request.timeout.ms must always
> be larger than max.poll.interval.ms.
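Concretely, the corrected sentence already holds for the documented defaults: `request.timeout.ms` (30 s) stays well below `max.poll.interval.ms` (5 min). A minimal sketch:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

// Uses the documented defaults; the point is the direction of the inequality
// (request.timeout.ms < max.poll.interval.ms), not these particular values.
public class ConsumerTimeoutDefaults {
    public static Properties timeouts() {
        Properties props = new Properties();
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30_000);    // 30 seconds (default)
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300_000); // 5 minutes (default)
        // max.poll.interval.ms bounds how long processing may take between poll()
        // calls before the consumer is kicked from the group, so it must dominate
        // the per-request network timeout rather than the other way around.
        return props;
    }
}
```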
[jira] [Updated] (KAFKA-12226) High-throughput source tasks fail to commit offsets
[ https://issues.apache.org/jira/browse/KAFKA-12226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Egerton updated KAFKA-12226:
----------------------------------
    Description:
The current source task thread has the following workflow:
# Poll messages from the source task
# Queue these messages to the producer and send them to Kafka asynchronously.
# Add the message to outstandingMessages, or if a flush is currently active, outstandingMessagesBacklog
# When the producer completes the send of a record, remove it from outstandingMessages

The commit offsets thread has the following workflow:
# Wait a flat timeout for outstandingMessages to flush completely
# If this times out, add all of the outstandingMessagesBacklog to the outstandingMessages and reset
# If it succeeds, commit the source task offsets to the backing store.
# Retry the above on a fixed schedule

If the source task is producing records quickly (faster than the producer can send), then the producer will throttle the task thread by blocking in its {{send}} method, waiting at most {{max.block.ms}} for space in the {{buffer.memory}} to be available. This means that the number of records in {{outstandingMessages}} + {{outstandingMessagesBacklog}} is proportional to the size of the producer memory buffer.

This amount of data might take more than {{offset.flush.timeout.ms}} to flush, and thus the flush will never succeed while the source task is rate-limited by the producer memory. This means that we may write multiple hours of data to Kafka and not ever commit source offsets for the connector. When the task is lost due to a worker failure, hours of data will be re-processed that otherwise were successfully written to Kafka.

  was:
The current source task thread has the following workflow:
# Poll messages from the source task
# Queue these messages to the producer and send them to Kafka asynchronously.
# Add the message to outstandingMessages, or if a flush is currently active, outstandingMessagesBacklog
# When the producer completes the send of a record, remove it from outstandingMessages

The commit offsets thread has the following workflow:
# Wait a flat timeout for outstandingMessages to flush completely
# If this times out, add all of the outstandingMessagesBacklog to the outstandingMessages and reset
# If it succeeds, commit the source task offsets to the backing store.
# Retry the above on a fixed schedule

If the source task is producing records quickly (faster than the producer can send), then the producer will throttle the task thread by blocking in its {{send}} method, waiting at most {{max.block.ms}} for space in the {{buffer.memory}} to be available. This means that the number of records in {{outstandingMessages}} + {{outstandingMessagesBacklog}} is proportional to the size of the producer memory buffer.

This amount of data might take more than {{offset.flush.timeout.ms}} to flush, and thus the flush will never succeed while the source task is rate-limited by the producer memory. This means that we may write multiple hours of data to Kafka and not ever commit source offsets for the connector. When the task is lost due to a worker failure, hours of data will be re-processed that otherwise were successfully written to Kafka.

> High-throughput source tasks fail to commit offsets
> ----------------------------------------------------
>
>                 Key: KAFKA-12226
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12226
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Chris Egerton
>            Assignee: Chris Egerton
>            Priority: Major
>
> The current source task thread has the following workflow:
> # Poll messages from the source task
> # Queue these messages to the producer and send them to Kafka asynchronously.
> # Add the message to outstandingMessages, or if a flush is currently active, outstandingMessagesBacklog
> # When the producer completes the send of a record, remove it from outstandingMessages
> The commit offsets thread has the following workflow:
> # Wait a flat timeout for outstandingMessages to flush completely
> # If this times out, add all of the outstandingMessagesBacklog to the outstandingMessages and reset
> # If it succeeds, commit the source task offsets to the backing store.
> # Retry the above on a fixed schedule
> If the source task is producing records quickly (faster than the producer can
> send), then the producer will throttle the task thread by blocking in its
> {{send}} method, waiting at most {{max.block.ms}} for space in the
> {{buffer.memory}} to be available. This means that the number of records in
> {{outstandingMessages}} + {{outstandingMessagesBacklog}} is proportional to
> the size of the producer memory buffer.
> This amount of data might take more than {{offset.flush.timeout.ms}} to
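A back-of-the-envelope illustration of why the flush can never finish under these conditions: with the default producer `buffer.memory` (32 MiB) and the default Connect `offset.flush.timeout.ms` (5 s), a task whose producer drains at an assumed 1 MiB/s needs roughly 32 s to empty a full buffer, so every flush attempt times out. The drain rate here is made up for illustration:

```java
// Sketch of the arithmetic behind the failure mode described in KAFKA-12226.
public class FlushTimeoutMath {
    public static void main(String[] args) {
        long bufferMemoryBytes = 32L * 1024 * 1024; // producer buffer.memory default (33554432)
        long offsetFlushTimeoutMs = 5_000;          // Connect offset.flush.timeout.ms default
        long drainBytesPerSec = 1L * 1024 * 1024;   // assumed sustained send rate to Kafka

        double worstCaseFlushSec = (double) bufferMemoryBytes / drainBytesPerSec;
        System.out.printf("worst-case flush: %.0f s vs timeout: %d s%n",
                worstCaseFlushSec, offsetFlushTimeoutMs / 1000);
        // 32 s >> 5 s: while the producer buffer stays full, every offset flush times out.
    }
}
```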
[GitHub] [kafka] guozhangwang commented on pull request #11278: KAFKA-12648: Enforce size limits for each task's cache
guozhangwang commented on pull request #11278:
URL: https://github.com/apache/kafka/pull/11278#issuecomment-920445983

> I think that should work. now we have both an upper bound on total memory and a minimum guarantee

Yeah, I feel more comfortable with this proposal :) Basically I think instead of defining the config as an "override" on the per-application config, we should just consider having a separate config on the per-topology level (e.g. your proposed one based on percentage of the total per-application). In that way users can specify clearly what they want, and get exactly what they specified.
[jira] [Commented] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions
[ https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17415792#comment-17415792 ]

Victoria Xia commented on KAFKA-13261:
--------------------------------------

Thanks for the confirmation, Matthias and Guozhang! I've opened a small KIP with the proposed interface changes to allow users to pass in custom partitioners for FK joins: https://cwiki.apache.org/confluence/display/KAFKA/KIP-775%3A+Custom+partitioners+in+foreign+key+joins

I tried sending an email to the [d...@kafka.apache.org|mailto:d...@kafka.apache.org] mailing list to start discussion but either it's taking a while to send or my permissions aren't yet set up correctly. Hopefully it will be ready for discussion soon.

> KTable to KTable foreign key join loose events when using several partitions
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-13261
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13261
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0, 2.7.1
>            Reporter: Tomas Forsman
>            Assignee: Victoria Xia
>            Priority: Major
>         Attachments: KafkaTest.java
>
> Two incoming streams A and B.
> Stream A uses a composite key [a, b]
> Stream B has key [b]
> Stream B has 4 partitions and stream A has 1 partition.
> What we try to do is repartition stream A to have 4 partitions too, then put
> both A and B into KTables and do a foreign key join from A to B.
> When doing this, not all messages end up in the output topic.
> Repartitioning both to only use 1 partition each solves the problem, so it
> seems like it has something to do with the foreign key join in combination
> with several partitions.
> One suspicion would be that it is not possible to define what partitioner to
> use for the join.
> Any insight or help is greatly appreciated.
> *Example code of the problem*
> {code:java}
> static Topology createTopoology(){
>     var builder = new StreamsBuilder();
>     KTable tableB = builder.table("B", stringMaterialized("table.b"));
>     builder
>         .stream("A", Consumed.with(Serde.of(KeyA.class), Serde.of(EventA.class)))
>         .repartition(repartitionTopicA())
>         .toTable(Named.as("table.a"), aMaterialized("table.a"))
>         .join(tableB, EventA::getKeyB, topicAandBeJoiner(), Named.as("join.ab"), joinMaterialized("join.ab"))
>         .toStream()
>         .to("output", with(...));
>     return builder.build();
> }
>
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> aMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
>
> private static Repartitioned<KeyA, EventA> repartitionTopicA() {
>     Repartitioned<KeyA, EventA> repartitioned = Repartitioned.as("driverperiod");
>     return repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class))
>         .withStreamPartitioner(topicAPartitioner())
>         .withNumberOfPartitions(4);
> }
>
> private static StreamPartitioner<KeyA, EventA> topicAPartitioner() {
>     return (topic, key, value, numPartitions) -> Math.abs(key.getKeyB().hashCode()) % numPartitions;
> }
>
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> joinMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> {code}
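To make the proposal concrete, here is a sketch of what an FK-join call site could look like with a caller-supplied partitioner. The `TableJoined.with(...)` parameter object follows the shape proposed in KIP-775; it is the KIP's suggested API, not a released one at the time of this thread, and the stub types stand in for the reporter's classes:

```java
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TableJoined;
import org.apache.kafka.streams.processor.StreamPartitioner;

// KeyA/EventA/EventB are stand-ins for the reporter's classes.
class FkJoinWithCustomPartitioner {
    interface KeyA { String getKeyB(); }
    interface EventA { String getKeyB(); }
    interface EventB { }

    static KTable<KeyA, String> joinAtoB(KTable<KeyA, EventA> tableA, KTable<String, EventB> tableB) {
        // Mirrors the reporter's repartition logic: route A by its foreign key.
        StreamPartitioner<KeyA, Void> aPartitioner =
            (topic, key, value, numPartitions) -> Math.abs(key.getKeyB().hashCode()) % numPartitions;
        return tableA.join(
            tableB,
            EventA::getKeyB,                       // foreign-key extractor into B's key space
            (a, b) -> a + "|" + b,                 // placeholder joiner
            TableJoined.with(aPartitioner, null),  // null: table B keeps default partitioning
            Materialized.as("join.ab"));
    }
}
```

The design point from the thread: Streams cannot infer how a source table was partitioned outside the topology, so the caller who knows the custom partitioning must hand it to the FK join so the subscription and response topics stay co-partitioned with the tables.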
[jira] [Commented] (KAFKA-12994) Migrate all Tests to New API and Remove Suppression for Deprecation Warnings related to KIP-633
[ https://issues.apache.org/jira/browse/KAFKA-12994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17415755#comment-17415755 ]

Andrew patterson commented on KAFKA-12994:
------------------------------------------

Think it's fine so far, will move onto the next part when https://github.com/apache/kafka/pull/11215 is merged.

> Migrate all Tests to New API and Remove Suppression for Deprecation Warnings
> related to KIP-633
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-12994
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12994
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams, unit tests
>    Affects Versions: 3.0.0
>            Reporter: Israel Ekpo
>            Assignee: Andrew patterson
>            Priority: Major
>              Labels: kip-633, newbie, newbie++
>             Fix For: 3.1.0
>
> Due to the API changes for KIP-633 a lot of deprecation warnings have been
> generated in tests that are using the old deprecated APIs. There are a lot of
> tests using the deprecated methods. We should absolutely migrate them all to
> the new APIs and then get rid of all the applicable annotations for
> suppressing the deprecation warnings.
> This applies to all Java and Scala examples and tests using the deprecated
> APIs in the JoinWindows, SessionWindows, TimeWindows and SlidingWindows
> classes.
>
> This is based on the feedback from reviewers in this PR:
> https://github.com/apache/kafka/pull/10926
[jira] [Commented] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions
[ https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17415747#comment-17415747 ]

Guozhang Wang commented on KAFKA-13261:
---------------------------------------

> wouldn't we still have an analogous bug if either of the topics for the
> source tables had custom partitioning logic created from outside Streams
> (i.e., without a repartition() step in the Streams topology)? In this case,
> Streams has no way of determining the partitioning of the source tables,
> which means we need an update to the interface for foreign key joins so that
> users can specify a partitioner to use in order to ensure copartitioning of
> the subscription and response topics with the relevant tables. Is this
> reasoning sound?

Yeah I think that's fair; KS assumes the source topics are partitioned by key, but does not require them to be partitioned with the default mechanism. However when getting back to the source tables from the subscription table it simply assumes default partitioning is used. For that, I agree allowing users to pass in the partitioner in FK would be good, so that if users know the source tables are not partitioned with the default partitioner, they are responsible for passing that custom partitioner in FK.

> KTable to KTable foreign key join loose events when using several partitions
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-13261
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13261
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0, 2.7.1
>            Reporter: Tomas Forsman
>            Assignee: Victoria Xia
>            Priority: Major
>         Attachments: KafkaTest.java
>
> Two incoming streams A and B.
> Stream A uses a composite key [a, b]
> Stream B has key [b]
> Stream B has 4 partitions and stream A has 1 partition.
> What we try to do is repartition stream A to have 4 partitions too, then put
> both A and B into KTables and do a foreign key join from A to B.
> When doing this, not all messages end up in the output topic.
> Repartitioning both to only use 1 partition each solves the problem, so it
> seems like it has something to do with the foreign key join in combination
> with several partitions.
> One suspicion would be that it is not possible to define what partitioner to
> use for the join.
> Any insight or help is greatly appreciated.
> *Example code of the problem*
> {code:java}
> static Topology createTopoology(){
>     var builder = new StreamsBuilder();
>     KTable tableB = builder.table("B", stringMaterialized("table.b"));
>     builder
>         .stream("A", Consumed.with(Serde.of(KeyA.class), Serde.of(EventA.class)))
>         .repartition(repartitionTopicA())
>         .toTable(Named.as("table.a"), aMaterialized("table.a"))
>         .join(tableB, EventA::getKeyB, topicAandBeJoiner(), Named.as("join.ab"), joinMaterialized("join.ab"))
>         .toStream()
>         .to("output", with(...));
>     return builder.build();
> }
>
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> aMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
>
> private static Repartitioned<KeyA, EventA> repartitionTopicA() {
>     Repartitioned<KeyA, EventA> repartitioned = Repartitioned.as("driverperiod");
>     return repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class))
>         .withStreamPartitioner(topicAPartitioner())
>         .withNumberOfPartitions(4);
> }
>
> private static StreamPartitioner<KeyA, EventA> topicAPartitioner() {
>     return (topic, key, value, numPartitions) -> Math.abs(key.getKeyB().hashCode()) % numPartitions;
> }
>
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> joinMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> {code}
[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal
mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r709539669

## File path: core/src/main/scala/kafka/log/LogCleaner.scala

```diff
@@ -493,19 +496,20 @@ private[log] class Cleaner(val id: Int,
    * @return The first offset not cleaned and the statistics for this round of cleaning
    */
   private[log] def clean(cleanable: LogToClean): (Long, CleanerStats) = {
+    doClean(cleanable, time.milliseconds())
+  }
+
+  private[log] def doClean(cleanable: LogToClean, currentTime: Long): (Long, CleanerStats) = {
+    info("Beginning cleaning of log %s".format(cleanable.log.name))
+
     // figure out the timestamp below which it is safe to remove delete tombstones
     // this position is defined to be a configurable time beneath the last modified time of the last clean segment
-    val deleteHorizonMs =
+    // this timestamp is only used on the older message formats newer than MAGIC_VALUE_V2
```

Review comment:
ah thanks for the catch :/ I've been getting mixed up in my head
[GitHub] [kafka] junrao commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal
junrao commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r709536010

## File path: core/src/main/scala/kafka/log/LogCleaner.scala

```diff
@@ -493,19 +496,20 @@ private[log] class Cleaner(val id: Int,
    * @return The first offset not cleaned and the statistics for this round of cleaning
    */
   private[log] def clean(cleanable: LogToClean): (Long, CleanerStats) = {
+    doClean(cleanable, time.milliseconds())
+  }
+
+  private[log] def doClean(cleanable: LogToClean, currentTime: Long): (Long, CleanerStats) = {
+    info("Beginning cleaning of log %s".format(cleanable.log.name))
+
     // figure out the timestamp below which it is safe to remove delete tombstones
     // this position is defined to be a configurable time beneath the last modified time of the last clean segment
-    val deleteHorizonMs =
+    // this timestamp is only used on the older message formats newer than MAGIC_VALUE_V2
```

Review comment:
newer => older ?
[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal
mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r709510271

## File path: core/src/main/scala/kafka/log/LogCleaner.scala

```diff
@@ -658,20 +673,22 @@
       }
     }

-    if (batch.hasProducerId && isBatchLastRecordOfProducer)
-      BatchRetention.RETAIN_EMPTY
-    else if (discardBatchRecords)
-      BatchRetention.DELETE
-    else
-      BatchRetention.DELETE_EMPTY
+    val batchRetention: BatchRetention =
+      if (batch.hasProducerId && isBatchLastRecordOfProducer)
+        BatchRetention.RETAIN_EMPTY
+      else if (discardBatchRecords)
+        BatchRetention.DELETE
+      else
+        BatchRetention.DELETE_EMPTY
+    new RecordFilter.BatchRetentionResult(batchRetention, canDiscardBatch)
```

Review comment:
hmm yeah I think you are right. I'll change to `canDiscardBatch && batch.isControlBatch`
[GitHub] [kafka] gitlw commented on pull request #11285: KAFKA-10548: Implement topic deletion logic with the LeaderAndIsr in KIP-516
gitlw commented on pull request #11285:
URL: https://github.com/apache/kafka/pull/11285#issuecomment-920312333

@lbradstreet @jolshan Can you please take a look? Thank you!
[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal
mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r709503140

## File path: core/src/main/scala/kafka/log/LogCleaner.scala

```diff
@@ -622,26 +628,35 @@
    * @param sourceRecords The dirty log segment
    * @param dest The cleaned log segment
    * @param map The key=>offset mapping
-   * @param retainDeletesAndTxnMarkers Should tombstones and markers be retained while cleaning this segment
+   * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than version 2) and markers be retained while cleaning this segment
+   * @param deleteRetentionMs Defines how long a tombstone should be kept as defined by log configuration
    * @param maxLogMessageSize The maximum message size of the corresponding topic
    * @param stats Collector for cleaning statistics
+   * @param currentTime The time at which the clean was initiated
    */
   private[log] def cleanInto(topicPartition: TopicPartition,
                              sourceRecords: FileRecords,
                              dest: LogSegment,
                              map: OffsetMap,
-                             retainDeletesAndTxnMarkers: Boolean,
+                             retainLegacyDeletesAndTxnMarkers: Boolean,
+                             deleteRetentionMs: Long,
                              maxLogMessageSize: Int,
                              transactionMetadata: CleanedTransactionMetadata,
                              lastRecordsOfActiveProducers: Map[Long, LastRecord],
-                             stats: CleanerStats): Unit = {
-    val logCleanerFilter: RecordFilter = new RecordFilter {
+                             stats: CleanerStats,
+                             currentTime: Long): Unit = {
+    val logCleanerFilter: RecordFilter = new RecordFilter(currentTime, deleteRetentionMs) {
       var discardBatchRecords: Boolean = _

-      override def checkBatchRetention(batch: RecordBatch): BatchRetention = {
+      override def checkBatchRetention(batch: RecordBatch): RecordFilter.BatchRetentionResult = {
         // we piggy-back on the tombstone retention logic to delay deletion of transaction markers.
         // note that we will never delete a marker until all the records from that transaction are removed.
-        discardBatchRecords = shouldDiscardBatch(batch, transactionMetadata, retainTxnMarkers = retainDeletesAndTxnMarkers)
+        val canDiscardBatch = shouldDiscardBatch(batch, transactionMetadata)
+
+        if (batch.isControlBatch)
+          discardBatchRecords = canDiscardBatch && batch.deleteHorizonMs().isPresent && batch.deleteHorizonMs().getAsLong <= currentTime
+        else
+          discardBatchRecords = canDiscardBatch
```

Review comment:
makes sense. I've removed that comment on 1136 since the case is mentioned in `isBatchLastRecordOfProducer`
[jira] [Comment Edited] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions
[ https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17415690#comment-17415690 ]

Matthias J. Sax edited comment on KAFKA-13261 at 9/15/21, 6:56 PM:
-------------------------------------------------------------------

I would prefer to do a KIP and allow users to pass in a custom partitioner. If we really get follow-up requests, we could extend the DSL logic to auto-forward an upstream partitioner later.

Overall, it seems that this is not a bug in the strong sense, but a missing feature. We never designed the FK-join to support custom partitioning. Might be worth updating the docs for 3.0 and earlier to point out this limitation.

was (Author: mjsax):
I would prefer to do a KIP and allow users to pass in a custom partitioner. If we really get follow-up requests, we could extend the DSL logic to auto-forward an upstream partitioner later.

> KTable to KTable foreign key join loose events when using several partitions
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-13261
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13261
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0, 2.7.1
>            Reporter: Tomas Forsman
>            Assignee: Victoria Xia
>            Priority: Major
>         Attachments: KafkaTest.java
>
> Two incoming streams A and B.
> Stream A uses a composite key [a, b]
> Stream B has key [b]
> Stream B has 4 partitions and stream A has 1 partition.
> What we try to do is repartition stream A to have 4 partitions too, then put
> both A and B into KTables and do a foreign key join from A to B.
> When doing this, not all messages end up in the output topic.
> Repartitioning both to only use 1 partition each solves the problem, so it
> seems like it has something to do with the foreign key join in combination
> with several partitions.
> One suspicion would be that it is not possible to define what partitioner to
> use for the join.
> Any insight or help is greatly appreciated.
> *Example code of the problem*
> {code:java}
> static Topology createTopoology(){
>     var builder = new StreamsBuilder();
>     KTable tableB = builder.table("B", stringMaterialized("table.b"));
>     builder
>         .stream("A", Consumed.with(Serde.of(KeyA.class), Serde.of(EventA.class)))
>         .repartition(repartitionTopicA())
>         .toTable(Named.as("table.a"), aMaterialized("table.a"))
>         .join(tableB, EventA::getKeyB, topicAandBeJoiner(), Named.as("join.ab"), joinMaterialized("join.ab"))
>         .toStream()
>         .to("output", with(...));
>     return builder.build();
> }
>
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> aMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
>
> private static Repartitioned<KeyA, EventA> repartitionTopicA() {
>     Repartitioned<KeyA, EventA> repartitioned = Repartitioned.as("driverperiod");
>     return repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class))
>         .withStreamPartitioner(topicAPartitioner())
>         .withNumberOfPartitions(4);
> }
>
> private static StreamPartitioner<KeyA, EventA> topicAPartitioner() {
>     return (topic, key, value, numPartitions) -> Math.abs(key.getKeyB().hashCode()) % numPartitions;
> }
>
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> joinMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> {code}
[GitHub] [kafka] vincent81jiang opened a new pull request #11327: KAFKA-13305: fix NullPointerException in LogCleanerManager "uncleanable-bytes" gauge
vincent81jiang opened a new pull request #11327:
URL: https://github.com/apache/kafka/pull/11327

* Fix KAFKA-13305: NullPointerException in LogCleanerManager "uncleanable-bytes" gauge
  - Add a periodic task to remove deleted partitions from uncleanablePartitions

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions
[ https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17415690#comment-17415690 ]

Matthias J. Sax commented on KAFKA-13261:
-----------------------------------------

I would prefer to do a KIP and allow users to pass in a custom partitioner. If we really get follow-up requests, we could extend the DSL logic to auto-forward an upstream partitioner later.

> KTable to KTable foreign key join loose events when using several partitions
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-13261
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13261
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0, 2.7.1
>            Reporter: Tomas Forsman
>            Assignee: Victoria Xia
>            Priority: Major
>         Attachments: KafkaTest.java
>
> Two incoming streams A and B.
> Stream A uses a composite key [a, b]
> Stream B has key [b]
> Stream B has 4 partitions and stream A has 1 partition.
> What we try to do is repartition stream A to have 4 partitions too, then put
> both A and B into KTables and do a foreign key join from A to B.
> When doing this, not all messages end up in the output topic.
> Repartitioning both to only use 1 partition each solves the problem, so it
> seems like it has something to do with the foreign key join in combination
> with several partitions.
> One suspicion would be that it is not possible to define what partitioner to
> use for the join.
> Any insight or help is greatly appreciated.
> *Example code of the problem*
> {code:java}
> static Topology createTopoology(){
>     var builder = new StreamsBuilder();
>     KTable tableB = builder.table("B", stringMaterialized("table.b"));
>     builder
>         .stream("A", Consumed.with(Serde.of(KeyA.class), Serde.of(EventA.class)))
>         .repartition(repartitionTopicA())
>         .toTable(Named.as("table.a"), aMaterialized("table.a"))
>         .join(tableB, EventA::getKeyB, topicAandBeJoiner(), Named.as("join.ab"), joinMaterialized("join.ab"))
>         .toStream()
>         .to("output", with(...));
>     return builder.build();
> }
>
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> aMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
>
> private static Repartitioned<KeyA, EventA> repartitionTopicA() {
>     Repartitioned<KeyA, EventA> repartitioned = Repartitioned.as("driverperiod");
>     return repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class))
>         .withStreamPartitioner(topicAPartitioner())
>         .withNumberOfPartitions(4);
> }
>
> private static StreamPartitioner<KeyA, EventA> topicAPartitioner() {
>     return (topic, key, value, numPartitions) -> Math.abs(key.getKeyB().hashCode()) % numPartitions;
> }
>
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> joinMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> {code}
[GitHub] [kafka] ccding commented on pull request #11293: MINOR: defineInternal for KIP-405 configs
ccding commented on pull request #11293:
URL: https://github.com/apache/kafka/pull/11293#issuecomment-920248580

Will cherry-pick this PR to 3.0 after it is merged to trunk
[GitHub] [kafka] ccding edited a comment on pull request #11293: MINOR: defineInternal for KIP-405 configs
ccding edited a comment on pull request #11293:
URL: https://github.com/apache/kafka/pull/11293#issuecomment-920247203

The comment of `defineInternal` says
```
 * Define a new internal configuration. Internal configuration won't show up in the docs and aren't
 * intended for general use.
```
I read it as the config should be available and visible to users, but not documented. Therefore, I changed the DescribeTopic call to return internal configs.

PTAL @junrao
[GitHub] [kafka] ccding commented on pull request #11293: MINOR: defineInternal for KIP-405 configs
ccding commented on pull request #11293:
URL: https://github.com/apache/kafka/pull/11293#issuecomment-920247203

The comment of `defineInternal` says
```
 * Define a new internal configuration. Internal configuration won't show up in the docs and aren't
 * intended for general use.
```
I read it as the config should be available and visible to users, but not documented. Therefore, I changed the DescribeTopic call to return internal configs.

PTAL @junrao
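A minimal illustration of `define` vs `defineInternal` on a `ConfigDef`; the config names here are made up, not the actual KIP-405 configs touched by this PR:

```java
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

// Sketch only: per the javadoc quoted above, internal configs stay usable but
// are omitted from the generated documentation.
public class InternalConfigSketch {
    public static ConfigDef exampleDef() {
        return new ConfigDef()
            // Shows up in the generated docs:
            .define("example.public.config", Type.INT, 10, Importance.MEDIUM,
                    "A documented, user-facing config.")
            // Usable (and, per the PR discussion, returned by describe calls),
            // but omitted from the docs:
            .defineInternal("example.internal.config", Type.BOOLEAN, false, Importance.LOW);
    }
}
```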
[jira] [Created] (KAFKA-13305) NullPointerException in LogCleanerManager "uncleanable-bytes" gauge
Vincent Jiang created KAFKA-13305:
-------------------------------------

             Summary: NullPointerException in LogCleanerManager "uncleanable-bytes" gauge
                 Key: KAFKA-13305
                 URL: https://issues.apache.org/jira/browse/KAFKA-13305
             Project: Kafka
          Issue Type: Bug
          Components: log cleaner
            Reporter: Vincent Jiang

We've seen the following exception in a production environment:
{quote}
java.lang.NullPointerException: Cannot invoke "kafka.log.UnifiedLog.logStartOffset()" because "log" is null
at kafka.log.LogCleanerManager$.cleanableOffsets(LogCleanerManager.scala:599)
{quote}
Looks like uncleanablePartitions never has partitions removed from it to reflect partition deletion/reassignment. We should fix the NullPointerException and remove deleted partitions from uncleanablePartitions.
[jira] [Commented] (KAFKA-7408) Truncate to LSO on unclean leader election
[ https://issues.apache.org/jira/browse/KAFKA-7408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17415635#comment-17415635 ]

Jose Armando Garcia Sancio commented on KAFKA-7408:
---------------------------------------------------

Good points [~guozhang].
{quote}later the txn coordinator tries to write a `zC` on behalf of the producer, do we guarantee this `zC` would be rejected?{quote}
I don't know the answer at the moment and need to look at the code in detail to confirm, but you are correct: the transaction coordinator needs to reject a {{zC}} if the transaction was already aborted by the unclean leader.

> Truncate to LSO on unclean leader election
> -------------------------------------------
>
>                 Key: KAFKA-7408
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7408
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Jason Gustafson
>            Assignee: Jose Armando Garcia Sancio
>            Priority: Major
>
> If an unclean leader is elected, we may lose committed transaction data. That
> alone is expected, but what is worse is that a transaction which was
> previously completed (either committed or aborted) may lose its marker and
> become dangling. The transaction coordinator will not know about the unclean
> leader election, so will not know to resend the transaction markers.
> Consumers with read_committed isolation will be stuck because the LSO cannot
> advance.
> To keep this scenario from occurring, it would be better to have the unclean
> leader truncate to the LSO so that there are no dangling transactions.
> Truncating to the LSO is not alone sufficient because the markers which
> allowed the LSO advancement may be at higher offsets. What we can do is let
> the newly elected leader truncate to the LSO and then rewrite all the markers
> that followed it using its own leader epoch (to avoid divergence from
> followers).
> The interesting cases when an unclean leader election occurs are when a
> transaction is ongoing.
> 1. If a producer is in the middle of a transaction commit, then the
> coordinator may still attempt to write transaction markers. This will either
> succeed or fail depending on the producer epoch in the unclean leader. If the
> epoch matches, then the WriteTxnMarker call will succeed, which will simply
> be ignored by the consumer. If the epoch doesn't match, the WriteTxnMarker
> call will fail and the transaction coordinator can potentially remove the
> partition from the transaction.
> 2. If a producer is still writing the transaction, then what happens depends
> on the producer state in the unclean leader. If no producer state has been
> lost, then the transaction can continue without impact. Otherwise, the
> producer will likely fail with an OUT_OF_ORDER_SEQUENCE error, which will
> cause the transaction to be aborted by the coordinator. That takes us back to
> the first case.
> By truncating the LSO, we ensure that transactions are either preserved in
> whole or they are removed from the log in whole. For an unclean leader
> election, that's probably as good as we can do. But we are ensured that
> consumers will not be blocked by dangling transactions. The only remaining
> situation where a dangling transaction might be left is if one of the
> transaction state partitions has an unclean leader election.
[jira] [Assigned] (KAFKA-7408) Truncate to LSO on unclean leader election
[ https://issues.apache.org/jira/browse/KAFKA-7408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jose Armando Garcia Sancio reassigned KAFKA-7408:
-------------------------------------------------

    Assignee: Jose Armando Garcia Sancio

> Truncate to LSO on unclean leader election
> -------------------------------------------
>
>                 Key: KAFKA-7408
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7408
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Jason Gustafson
>            Assignee: Jose Armando Garcia Sancio
>            Priority: Major
>
> If an unclean leader is elected, we may lose committed transaction data. That
> alone is expected, but what is worse is that a transaction which was
> previously completed (either committed or aborted) may lose its marker and
> become dangling. The transaction coordinator will not know about the unclean
> leader election, so will not know to resend the transaction markers.
> Consumers with read_committed isolation will be stuck because the LSO cannot
> advance.
> To keep this scenario from occurring, it would be better to have the unclean
> leader truncate to the LSO so that there are no dangling transactions.
> Truncating to the LSO is not alone sufficient because the markers which
> allowed the LSO advancement may be at higher offsets. What we can do is let
> the newly elected leader truncate to the LSO and then rewrite all the markers
> that followed it using its own leader epoch (to avoid divergence from
> followers).
> The interesting cases when an unclean leader election occurs are when a
> transaction is ongoing.
> 1. If a producer is in the middle of a transaction commit, then the
> coordinator may still attempt to write transaction markers. This will either
> succeed or fail depending on the producer epoch in the unclean leader. If the
> epoch matches, then the WriteTxnMarker call will succeed, which will simply
> be ignored by the consumer. If the epoch doesn't match, the WriteTxnMarker
> call will fail and the transaction coordinator can potentially remove the
> partition from the transaction.
> 2. If a producer is still writing the transaction, then what happens depends
> on the producer state in the unclean leader. If no producer state has been
> lost, then the transaction can continue without impact. Otherwise, the
> producer will likely fail with an OUT_OF_ORDER_SEQUENCE error, which will
> cause the transaction to be aborted by the coordinator. That takes us back to
> the first case.
> By truncating the LSO, we ensure that transactions are either preserved in
> whole or they are removed from the log in whole. For an unclean leader
> election, that's probably as good as we can do. But we are ensured that
> consumers will not be blocked by dangling transactions. The only remaining
> situation where a dangling transaction might be left is if one of the
> transaction state partitions has an unclean leader election.
[GitHub] [kafka] jolshan commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
jolshan commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r709357795

## File path: core/src/main/scala/kafka/server/ReplicaManager.scala

```diff
@@ -1755,6 +1762,66 @@ class ReplicaManager(val config: KafkaConfig,
     partitionsToMakeFollower
   }

+  private def updateTopicIdForFollowers(controllerId: Int,
+                                        controllerEpoch: Int,
+                                        partitionStates: Map[Partition, LeaderAndIsrPartitionState],
+                                        correlationId: Int,
+                                        topicIds: String => Option[Uuid]) : Set[Partition] = {
+    val traceLoggingEnabled = stateChangeLogger.isTraceEnabled
+
+    val partitionsToUpdateFollower = mutable.Set.empty[Partition]
+    try {
+      partitionStates.forKeyValue { (partition, partitionState) =>
+        val newLeaderBrokerId = partitionState.leader
+        if (metadataCache.hasAliveBroker(newLeaderBrokerId)) {
+          // Only change partition state when the leader is available
+          partitionsToUpdateFollower += partition
+        } else {
+          // The leader broker should always be present in the metadata cache.
+          // If not, we should record the error message and abort the transition process for this partition
+          stateChangeLogger.error(s"Received LeaderAndIsrRequest with correlation id $correlationId from " +
+            s"controller $controllerId epoch $controllerEpoch for partition ${partition.topicPartition} " +
+            s"(last update controller epoch ${partitionState.controllerEpoch}) " +
+            s"but cannot become follower since the new leader $newLeaderBrokerId is unavailable.")
+        }
+      }
+
+      if (isShuttingDown.get()) {
+        if (traceLoggingEnabled) {
+          partitionsToUpdateFollower.foreach { partition =>
+            stateChangeLogger.trace(s"Skipped the update topic ID step of the become-follower state " +
+              s"change with correlation id $correlationId from controller $controllerId epoch $controllerEpoch for " +
+              s"partition ${partition.topicPartition} with leader ${partitionStates(partition).leader} " +
+              "since it is shutting down")
+          }
+        }
+      } else {
+        val partitionsToUpdateFollowerWithLeader = partitionsToUpdateFollower.map { partition =>
+          val leaderNode = partition.leaderReplicaIdOpt.flatMap(leaderId => metadataCache.
+            getAliveBrokerNode(leaderId, config.interBrokerListenerName)).getOrElse(Node.noNode())
+          val leader = new BrokerEndPoint(leaderNode.id(), leaderNode.host(), leaderNode.port())
+          (partition.topicPartition, BrokerAndFetcherId(leader, replicaFetcherManager.getFetcherId(partition.topicPartition)))
+        }
+        replicaFetcherManager.addTopicIdsToFetcherThread(partitionsToUpdateFollowerWithLeader, topicIds)
```

Review comment:
Do you think we also don't need the check for the leader in the metadata cache? ie:
```
val newLeaderBrokerId = partitionState.leader
if (metadataCache.hasAliveBroker(newLeaderBrokerId)) {
  // Only change partition state when the leader is available
  partitionsToUpdateFollower += partition
```
If we do keep it, I can change the comment to be less confusing.
[GitHub] [kafka] cmccabe commented on pull request #11320: MINOR: Make ReplicaManager, LogManager, KafkaApis easier to construct
cmccabe commented on pull request #11320: URL: https://github.com/apache/kafka/pull/11320#issuecomment-920176422 > On the builders, should we drop the "set" from the method names? I believe we have established the pattern (in Scala, at least) of not including "get" and "set" in getter and setter names. Should we extend that pattern to Java code as well? Hmm... as far as I know, the pattern in Kafka is to omit the "get" in getters, so we have `x()` rather than `getX()`. I don't think omitting the "set" in `setX()` is a good idea since it creates confusion between what is a setter and what is a getter. I can give a lot of precedents. For example, all the generated code created by MessageDataGenerator has always used "setX" to refer to setters for X. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
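For readers unfamiliar with the convention being discussed, a small illustration (the class and field below are made up for this example, not taken from the PR):

```java
// Kafka-style accessors as described above: getters drop the "get" prefix,
// setters keep "set" so the two remain visually distinct.
public class ExampleConfig {
    private int brokerId;

    public int brokerId() {              // getter: x(), not getX()
        return brokerId;
    }

    public ExampleConfig setBrokerId(int brokerId) {  // setter keeps "set"
        this.brokerId = brokerId;
        return this;                     // chainable, builder-style
    }
}
```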
[GitHub] [kafka] wcarlson5 commented on pull request #11278: KAFKA-12648: Enforce size limits for each task's cache
wcarlson5 commented on pull request #11278: URL: https://github.com/apache/kafka/pull/11278#issuecomment-920174685 @guozhangwang That is a really good point. Maybe we should be more specific. What if each topology could request a percentage of the total cache? If a request made the total exceed 99%, the request would be rejected. Any unclaimed cache would be split among the topologies that did not claim any. A topology could lower its cache size if it wants to make space for a new topology. 1. If A requests 50% it gets it and the rest is unused 2. B joins but does not request so it gets the other 50% 3. C joins but requests 75% so it fails. C then requests 25% so now A has 50%, B 25% and C 25% 4. D joins without a request so now A has 50%, B 12%, C 25% and D 13% 5. A reduces its request to 25% so now all have 25% 6. E joins and requests 0%, not using any cache and all other topologies are unchanged I think that should work. Now we have both an upper bound on total memory and a minimum guarantee -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
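A sketch of the allocation rule proposed in that comment, under stated assumptions: the 99% cap and the even split of unclaimed cache come from the comment itself, while the class and method names are made up, and the integer rounding in step 4 (12% vs 13%) is ignored here.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.OptionalDouble;
import java.util.Set;

class CacheBudgetSketch {
    private final Map<String, Double> requested = new LinkedHashMap<>();
    private final Set<String> unrequested = new LinkedHashSet<>();

    /** Returns false if an explicit request would push the total past 99%. */
    boolean register(String topology, OptionalDouble fraction) {
        if (fraction.isPresent()) {
            if (claimed() + fraction.getAsDouble() > 0.99) return false; // e.g. C asking for 75%
            requested.put(topology, fraction.getAsDouble());
        } else {
            unrequested.add(topology); // e.g. B and D, which claim nothing
        }
        return true;
    }

    /** Explicit requests are honored; the unclaimed remainder is split evenly. */
    double share(String topology) {
        Double r = requested.get(topology);
        if (r != null) return r;
        return unrequested.isEmpty() ? 0.0 : (1.0 - claimed()) / unrequested.size();
    }

    private double claimed() {
        return requested.values().stream().mapToDouble(Double::doubleValue).sum();
    }
}
```

Walking the comment's own example through this sketch: A registers 0.50 and gets 0.50; B registers with no request and gets the remaining 0.50; C's request of 0.75 is rejected (0.50 + 0.75 > 0.99), then 0.25 succeeds, leaving B with 0.25; when D joins unrequested, B and D each get (1.0 - 0.75) / 2 = 0.125.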
[GitHub] [kafka] cmccabe commented on a change in pull request #11320: MINOR: Make ReplicaManager, LogManager, KafkaApis easier to construct
cmccabe commented on a change in pull request #11320: URL: https://github.com/apache/kafka/pull/11320#discussion_r709349588 ## File path: core/src/main/java/kafka/server/builders/KafkaApisBuilder.java ## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server.builders; + +import kafka.coordinator.group.GroupCoordinator; +import kafka.coordinator.transaction.TransactionCoordinator; +import kafka.network.RequestChannel; +import kafka.server.ApiVersionManager; +import kafka.server.AutoTopicCreationManager; +import kafka.server.BrokerTopicStats; +import kafka.server.DelegationTokenManager; +import kafka.server.FetchManager; +import kafka.server.KafkaApis; +import kafka.server.KafkaConfig; +import kafka.server.MetadataCache; +import kafka.server.MetadataSupport; +import kafka.server.QuotaFactory.QuotaManagers; +import kafka.server.ReplicaManager; +import kafka.server.metadata.ConfigRepository; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.authorizer.Authorizer; + +import java.util.Collections; +import java.util.Optional; +import scala.compat.java8.OptionConverters; + + +public class KafkaApisBuilder { +private RequestChannel requestChannel = null; +private MetadataSupport metadataSupport = null; +private ReplicaManager replicaManager = null; +private GroupCoordinator groupCoordinator = null; +private TransactionCoordinator txnCoordinator = null; +private AutoTopicCreationManager autoTopicCreationManager = null; +private int brokerId = 0; +private KafkaConfig config = null; +private ConfigRepository configRepository = null; +private MetadataCache metadataCache = null; +private Metrics metrics = null; +private Optional authorizer = Optional.empty(); +private QuotaManagers quotas = null; +private FetchManager fetchManager = null; +private BrokerTopicStats brokerTopicStats = null; +private String clusterId = "clusterId"; +private Time time = Time.SYSTEM; +private DelegationTokenManager tokenManager = null; +private ApiVersionManager apiVersionManager = null; + +public KafkaApisBuilder setRequestChannel(RequestChannel requestChannel) { +this.requestChannel = requestChannel; +return this; +} + +public KafkaApisBuilder setMetadataSupport(MetadataSupport metadataSupport) { +this.metadataSupport = metadataSupport; +return this; +} + +public KafkaApisBuilder setReplicaManager(ReplicaManager replicaManager) { +this.replicaManager = replicaManager; +return this; +} + +public KafkaApisBuilder setGroupCoordinator(GroupCoordinator groupCoordinator) { +this.groupCoordinator = groupCoordinator; +return this; +} + +public KafkaApisBuilder setTxnCoordinator(TransactionCoordinator txnCoordinator) { +this.txnCoordinator = txnCoordinator; +return this; +} + 
+public KafkaApisBuilder setAutoTopicCreationManager(AutoTopicCreationManager autoTopicCreationManager) { +this.autoTopicCreationManager = autoTopicCreationManager; +return this; +} + +public KafkaApisBuilder setBrokerId(int brokerId) { +this.brokerId = brokerId; +return this; +} + +public KafkaApisBuilder setConfig(KafkaConfig config) { +this.config = config; +return this; +} + +public KafkaApisBuilder setConfigRepository(ConfigRepository configRepository) { +this.configRepository = configRepository; +return this; +} + +public KafkaApisBuilder setMetadataCache(MetadataCache metadataCache) { +this.metadataCache = metadataCache; +return this; +} + +public KafkaApisBuilder setMetrics(Metrics metrics) { +this.metrics = metrics; +return this; +} + +public KafkaApisBuilder setAuthorizer(Optional authorizer) { +this.authorizer = authorizer; +return this; +} + +public KafkaApisBuilder setQuotas(QuotaManagers quotas) { +this.quotas = quotas; +return this; +} + +public KafkaApisBuilder setFetchManager(FetchManager
[GitHub] [kafka] cmccabe commented on a change in pull request #11320: MINOR: Make ReplicaManager, LogManager, KafkaApis easier to construct
cmccabe commented on a change in pull request #11320: URL: https://github.com/apache/kafka/pull/11320#discussion_r709348909 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -186,49 +186,37 @@ object ReplicaManager { class ReplicaManager(val config: KafkaConfig, metrics: Metrics, time: Time, - val zkClient: Option[KafkaZkClient], scheduler: Scheduler, val logManager: LogManager, - val isShuttingDown: AtomicBoolean, quotaManagers: QuotaManagers, - val brokerTopicStats: BrokerTopicStats, val metadataCache: MetadataCache, logDirFailureChannel: LogDirFailureChannel, - val delayedProducePurgatory: DelayedOperationPurgatory[DelayedProduce], - val delayedFetchPurgatory: DelayedOperationPurgatory[DelayedFetch], - val delayedDeleteRecordsPurgatory: DelayedOperationPurgatory[DelayedDeleteRecords], - val delayedElectLeaderPurgatory: DelayedOperationPurgatory[DelayedElectLeader], - threadNamePrefix: Option[String], - val alterIsrManager: AlterIsrManager) extends Logging with KafkaMetricsGroup { - - def this(config: KafkaConfig, - metrics: Metrics, - time: Time, - zkClient: Option[KafkaZkClient], - scheduler: Scheduler, - logManager: LogManager, - isShuttingDown: AtomicBoolean, - quotaManagers: QuotaManagers, - brokerTopicStats: BrokerTopicStats, - metadataCache: MetadataCache, - logDirFailureChannel: LogDirFailureChannel, - alterIsrManager: AlterIsrManager, - threadNamePrefix: Option[String] = None) = { -this(config, metrics, time, zkClient, scheduler, logManager, isShuttingDown, - quotaManagers, brokerTopicStats, metadataCache, logDirFailureChannel, - DelayedOperationPurgatory[DelayedProduce]( -purgatoryName = "Produce", brokerId = config.brokerId, -purgeInterval = config.producerPurgatoryPurgeIntervalRequests), - DelayedOperationPurgatory[DelayedFetch]( -purgatoryName = "Fetch", brokerId = config.brokerId, -purgeInterval = config.fetchPurgatoryPurgeIntervalRequests), - DelayedOperationPurgatory[DelayedDeleteRecords]( -purgatoryName = "DeleteRecords", brokerId = config.brokerId, -purgeInterval = config.deleteRecordsPurgatoryPurgeIntervalRequests), - DelayedOperationPurgatory[DelayedElectLeader]( -purgatoryName = "ElectLeader", brokerId = config.brokerId), - threadNamePrefix, alterIsrManager) - } + val alterIsrManager: AlterIsrManager, + val brokerTopicStats: BrokerTopicStats = new BrokerTopicStats(), + val isShuttingDown: AtomicBoolean = new AtomicBoolean(false), + val zkClient: Option[KafkaZkClient] = None, + delayedProducePurgatoryParam: Option[DelayedOperationPurgatory[DelayedProduce]] = None, + delayedFetchPurgatoryParam: Option[DelayedOperationPurgatory[DelayedFetch]] = None, + delayedDeleteRecordsPurgatoryParam: Option[DelayedOperationPurgatory[DelayedDeleteRecords]] = None, + delayedElectLeaderPurgatoryParam: Option[DelayedOperationPurgatory[DelayedElectLeader]] = None, + threadNamePrefix: Option[String] = None, Review comment: It seems good to use a trailing comma here, since otherwise adding a new parameter changes the previous line (adding unnecessary conflicts) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #11320: MINOR: Make ReplicaManager, LogManager, KafkaApis easier to construct
cmccabe commented on a change in pull request #11320: URL: https://github.com/apache/kafka/pull/11320#discussion_r709348152 ## File path: core/src/test/scala/unit/kafka/log/LogLoaderTest.scala ## @@ -75,13 +75,25 @@ class LogLoaderTest { // Create a LogManager with some overridden methods to facilitate interception of clean shutdown // flag and to inject a runtime error -def interceptedLogManager(logConfig: LogConfig, logDirs: Seq[File], simulateError: SimulateError): LogManager = { - new LogManager(logDirs = logDirs.map(_.getAbsoluteFile), initialOfflineDirs = Array.empty[File], new MockConfigRepository(), -initialDefaultConfig = logConfig, cleanerConfig = CleanerConfig(enableCleaner = false), recoveryThreadsPerDataDir = 4, -flushCheckMs = 1000L, flushRecoveryOffsetCheckpointMs = 1L, flushStartOffsetCheckpointMs = 1L, -retentionCheckMs = 1000L, maxPidExpirationMs = 60 * 60 * 1000, scheduler = time.scheduler, time = time, -brokerTopicStats = new BrokerTopicStats, logDirFailureChannel = new LogDirFailureChannel(logDirs.size), -keepPartitionMetadataFile = config.usesTopicId, interBrokerProtocolVersion = config.interBrokerProtocolVersion) { +def interceptedLogManager(logConfig: LogConfig, logDirs: Seq[File], simulateError: SimulateError): LogManager = + new LogManager( +logDirs = logDirs.map(_.getAbsoluteFile), +initialOfflineDirs = Array.empty[File], +configRepository = new MockConfigRepository(), +initialDefaultConfig = logConfig, +cleanerConfig = CleanerConfig(enableCleaner = false), +recoveryThreadsPerDataDir = 4, +flushCheckMs = 1000L, +flushRecoveryOffsetCheckpointMs = 1L, +flushStartOffsetCheckpointMs = 1L, +retentionCheckMs = 1000L, +maxPidExpirationMs = 60 * 60 * 1000, +interBrokerProtocolVersion = config.interBrokerProtocolVersion, +scheduler = time.scheduler, +brokerTopicStats = new BrokerTopicStats, Review comment: I'll add parens -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
jolshan commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r709342716 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -741,7 +741,8 @@ class ReplicaManager(val config: KafkaConfig, // throw NotLeaderOrFollowerException if replica does not exist for the given partition val partition = getPartitionOrException(topicPartition) - partition.localLogOrException Review comment: The surrounding code is from 14 months to 2 years ago, so I'm not sure. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
jolshan commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r709337669 ## File path: core/src/main/scala/kafka/server/AbstractFetcherManager.scala ## @@ -163,6 +163,17 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri info(s"Added fetcher to broker ${fetcherThread.sourceBroker.id} for partitions $initialOffsetAndEpochs") } + def addTopicIdsToFetcherThread(partitionsToUpdate: Set[(TopicPartition, BrokerAndFetcherId)], topicIds: String => Option[Uuid]): Unit = { +lock synchronized { + val partitionsPerFetcher = partitionsToUpdate.groupMap(_._2)(_._1) + + for ((brokerAndFetcherId, partitions) <- partitionsPerFetcher) { +val brokerIdAndFetcherId = BrokerIdAndFetcherId(brokerAndFetcherId.broker.id, brokerAndFetcherId.fetcherId) Review comment: Ah actually this is really subtle. One is BrokerAndFetcherId and the other is Broker**_Id_**AndFetcherId. I can simplify and just create the second one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
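The name collision jolshan points out is easy to miss. As a rough Java sketch of the two keys (the real classes are Scala case classes in the fetcher manager code; this rendering exists only to highlight the difference):

```java
// Two near-identical keys: one carries the leader's full endpoint, the
// other only the broker id. Sketch for illustration, not the real types.
record BrokerEndPoint(int id, String host, int port) {}
record BrokerAndFetcherId(BrokerEndPoint broker, int fetcherId) {}
record BrokerIdAndFetcherId(int brokerId, int fetcherId) {}
```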
[GitHub] [kafka] jolshan commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
jolshan commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r709334903 ## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -459,18 +459,22 @@ abstract class AbstractFetcherThread(name: String, */ private def partitionFetchState(tp: TopicPartition, initialFetchState: InitialFetchState, currentState: PartitionFetchState): PartitionFetchState = { if (currentState != null && currentState.currentLeaderEpoch == initialFetchState.currentLeaderEpoch) { - currentState + if (currentState.topicId.isEmpty && initialFetchState.topicId.isDefined) { +currentState.updateTopicId(initialFetchState.topicId) + } else { +currentState + } Review comment: Ah good catch. I don't think so. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
jolshan commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r709328285 ## File path: core/src/main/scala/kafka/server/AbstractFetcherManager.scala ## @@ -163,6 +163,17 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri info(s"Added fetcher to broker ${fetcherThread.sourceBroker.id} for partitions $initialOffsetAndEpochs") } + def addTopicIdsToFetcherThread(partitionsToUpdate: Set[(TopicPartition, BrokerAndFetcherId)], topicIds: String => Option[Uuid]): Unit = { +lock synchronized { + val partitionsPerFetcher = partitionsToUpdate.groupMap(_._2)(_._1) + + for ((brokerAndFetcherId, partitions) <- partitionsPerFetcher) { +val brokerIdAndFetcherId = BrokerIdAndFetcherId(brokerAndFetcherId.broker.id, brokerAndFetcherId.fetcherId) Review comment: I was pulling from the add partitions method. But it does something different. 🤦‍♀️ I can just use the one from before. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dengziming commented on pull request #11261: KAFKA-13228: Ensure ApiVersionRequest is properly handled in KRaft
dengziming commented on pull request #11261: URL: https://github.com/apache/kafka/pull/11261#issuecomment-920145832 ping @hachikuji @abbccdda -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-13162) ElectLeader API must be forwarded to Controller
[ https://issues.apache.org/jira/browse/KAFKA-13162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-13162. - Resolution: Fixed > ElectLeader API must be forwarded to Controller > --- > > Key: KAFKA-13162 > URL: https://issues.apache.org/jira/browse/KAFKA-13162 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Blocker > Fix For: 3.1.0 > > > We're missing the logic to forward ElectLeaders requests to the controller. > This means that `kafka-leader-election.sh` does not work correctly. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13162) ElectLeader API must be forwarded to Controller
[ https://issues.apache.org/jira/browse/KAFKA-13162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-13162: Fix Version/s: (was: 3.0.1) 3.1.0 > ElectLeader API must be forwarded to Controller > --- > > Key: KAFKA-13162 > URL: https://issues.apache.org/jira/browse/KAFKA-13162 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Blocker > Fix For: 3.1.0 > > > We're missing the logic to forward ElectLeaders requests to the controller. > This means that `kafka-leader-election.sh` does not work correctly. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hachikuji merged pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft
hachikuji merged pull request #11186: URL: https://github.com/apache/kafka/pull/11186 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #11320: MINOR: Make ReplicaManager, LogManager, KafkaApis easier to construct
mumrah commented on a change in pull request #11320: URL: https://github.com/apache/kafka/pull/11320#discussion_r709241900 ## File path: core/src/test/scala/unit/kafka/log/LogLoaderTest.scala ## @@ -75,13 +75,25 @@ class LogLoaderTest { // Create a LogManager with some overridden methods to facilitate interception of clean shutdown // flag and to inject a runtime error -def interceptedLogManager(logConfig: LogConfig, logDirs: Seq[File], simulateError: SimulateError): LogManager = { - new LogManager(logDirs = logDirs.map(_.getAbsoluteFile), initialOfflineDirs = Array.empty[File], new MockConfigRepository(), -initialDefaultConfig = logConfig, cleanerConfig = CleanerConfig(enableCleaner = false), recoveryThreadsPerDataDir = 4, -flushCheckMs = 1000L, flushRecoveryOffsetCheckpointMs = 1L, flushStartOffsetCheckpointMs = 1L, -retentionCheckMs = 1000L, maxPidExpirationMs = 60 * 60 * 1000, scheduler = time.scheduler, time = time, -brokerTopicStats = new BrokerTopicStats, logDirFailureChannel = new LogDirFailureChannel(logDirs.size), -keepPartitionMetadataFile = config.usesTopicId, interBrokerProtocolVersion = config.interBrokerProtocolVersion) { +def interceptedLogManager(logConfig: LogConfig, logDirs: Seq[File], simulateError: SimulateError): LogManager = + new LogManager( +logDirs = logDirs.map(_.getAbsoluteFile), +initialOfflineDirs = Array.empty[File], +configRepository = new MockConfigRepository(), +initialDefaultConfig = logConfig, +cleanerConfig = CleanerConfig(enableCleaner = false), +recoveryThreadsPerDataDir = 4, +flushCheckMs = 1000L, +flushRecoveryOffsetCheckpointMs = 1L, +flushStartOffsetCheckpointMs = 1L, +retentionCheckMs = 1000L, +maxPidExpirationMs = 60 * 60 * 1000, +interBrokerProtocolVersion = config.interBrokerProtocolVersion, +scheduler = time.scheduler, +brokerTopicStats = new BrokerTopicStats, Review comment: nit: we should make the no-arg constructor calls consistent wrt parens ## File path: core/src/main/java/kafka/server/builders/KafkaApisBuilder.java ## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server.builders; + +import kafka.coordinator.group.GroupCoordinator; +import kafka.coordinator.transaction.TransactionCoordinator; +import kafka.network.RequestChannel; +import kafka.server.ApiVersionManager; +import kafka.server.AutoTopicCreationManager; +import kafka.server.BrokerTopicStats; +import kafka.server.DelegationTokenManager; +import kafka.server.FetchManager; +import kafka.server.KafkaApis; +import kafka.server.KafkaConfig; +import kafka.server.MetadataCache; +import kafka.server.MetadataSupport; +import kafka.server.QuotaFactory.QuotaManagers; +import kafka.server.ReplicaManager; +import kafka.server.metadata.ConfigRepository; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.authorizer.Authorizer; + +import java.util.Collections; +import java.util.Optional; +import scala.compat.java8.OptionConverters; + + +public class KafkaApisBuilder { +private RequestChannel requestChannel = null; +private MetadataSupport metadataSupport = null; +private ReplicaManager replicaManager = null; +private GroupCoordinator groupCoordinator = null; +private TransactionCoordinator txnCoordinator = null; +private AutoTopicCreationManager autoTopicCreationManager = null; +private int brokerId = 0; +private KafkaConfig config = null; +private ConfigRepository configRepository = null; +private MetadataCache metadataCache = null; +private Metrics metrics = null; +private Optional authorizer = Optional.empty(); +private QuotaManagers quotas = null; +private FetchManager fetchManager = null; +private BrokerTopicStats brokerTopicStats = null; +private String clusterId = "clusterId"; +private Time time = Time.SYSTEM; +private DelegationTokenManager tokenManager = null; +private ApiVersionManager apiVersionManager = null; + +
[jira] [Created] (KAFKA-13304) Implicit cast of source type long to narrower destination type int in org.apache.kafka.common.network.MultiSend.java
Thomas Bachmann created KAFKA-13304: --- Summary: Implicit cast of source type long to narrower destination type int in org.apache.kafka.common.network.MultiSend.java Key: KAFKA-13304 URL: https://issues.apache.org/jira/browse/KAFKA-13304 Project: Kafka Issue Type: Bug Components: network Affects Versions: 2.8.0 Reporter: Thomas Bachmann During a security review of Kafka I came across this bug [https://lgtm.com/projects/g/apache/kafka/rev/78ba492e3e70fd9db61bc82469371d04a8d6b762/files/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java?sort=name=ASC=heatmap#x9c1b5901406741c6:1]

{code:java}
@Override
public long writeTo(GatheringByteChannel channel) throws IOException {
    if (completed())
        throw new KafkaException("This operation cannot be completed on a complete request.");

    int totalWrittenPerCall = 0;
    boolean sendComplete = false;
    do {
        long written = current.writeTo(channel);
        totalWritten += written;
        totalWrittenPerCall += written;
{code}

"Implicit cast of source type long to narrower destination type int." -- This message was sent by Atlassian Jira (v8.3.4#803005)
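For context on why the LGTM alert matters, here is a small self-contained demonstration of the narrowing behavior (not the MultiSend code and not a proposed patch): a Java compound assignment like {{intVar += longVar}} compiles because it inserts an implicit cast, and the result can silently overflow.

{code:java}
public class NarrowingDemo {
    public static void main(String[] args) {
        int totalWrittenPerCall = 0;
        long written = 3_000_000_000L;    // larger than Integer.MAX_VALUE
        // Compiles fine: compound assignment implicitly casts the long sum
        // back to int, i.e. totalWrittenPerCall = (int) (totalWrittenPerCall + written);
        totalWrittenPerCall += written;
        System.out.println(totalWrittenPerCall); // prints -1294967296, not 3000000000
    }
}
{code}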
[jira] [Commented] (KAFKA-13303) RoundRobinPartitioner broken by KIP-480
[ https://issues.apache.org/jira/browse/KAFKA-13303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17415497#comment-17415497 ] Luke Chen commented on KAFKA-13303: --- Could you also leave your PR link in a comment on KAFKA-9965, so that they can be closed together? Thanks. > RoundRobinPartitioner broken by KIP-480 > --- > > Key: KAFKA-13303 > URL: https://issues.apache.org/jira/browse/KAFKA-13303 > Project: Kafka > Issue Type: Bug >Reporter: Jon McEwen >Priority: Minor > > Since KIP-480 Sticky Partitioning, the RoundRobinPartitioner doesn't behave > correctly. An additional call to `partition()` on a new batch leads to > partitions being skipped. > > I have a fix that I would like to contribute, but I need help getting started > as a contributor, e.g. for basic things like formatting the code. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13303) RoundRobinPartitioner broken by KIP-480
[ https://issues.apache.org/jira/browse/KAFKA-13303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17415496#comment-17415496 ] Luke Chen commented on KAFKA-13303: --- This is a duplicate of KAFKA-9965. > RoundRobinPartitioner broken by KIP-480 > --- > > Key: KAFKA-13303 > URL: https://issues.apache.org/jira/browse/KAFKA-13303 > Project: Kafka > Issue Type: Bug >Reporter: Jon McEwen >Priority: Minor > > Since KIP-480 Sticky Partitioning, the RoundRobinPartitioner doesn't behave > correctly. An additional call to `partition()` on a new batch leads to > partitions being skipped. > > I have a fix that I would like to contribute, but I need help getting started > as a contributor, e.g. for basic things like formatting the code. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13292) InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
[ https://issues.apache.org/jira/browse/KAFKA-13292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17415491#comment-17415491 ] NEERAJ VAIDYA commented on KAFKA-13292: --- Thanks [~mjsax], I will look at upgrading the client libraries to 2.8.0. > InvalidPidMappingException: The producer attempted to use a producer id which > is not currently assigned to its transactional id > --- > > Key: KAFKA-13292 > URL: https://issues.apache.org/jira/browse/KAFKA-13292 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.7.0 >Reporter: NEERAJ VAIDYA >Priority: Major > > I have a KafkaStreams application which consumes from a topic which has 12 > partitions. The incoming message rate into this topic is very low, perhaps > 3-4 per minute. Also, some partitions will not receive messages for more than > 7 days. > > Exactly after 7 days of starting this application, I seem to be getting the > following exception and the application shuts down, without processing > any more messages : > > {code:java} > 2021-09-10T12:21:59.636 [kafka-producer-network-thread | > mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] > INFO o.a.k.c.p.i.TransactionManager - MSG=[Producer > clientId=mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer, > transactionalId=mtx-caf-0_2] Transiting to abortable error state due to > org.apache.kafka.common.errors.InvalidPidMappingException: The producer > attempted to use a producer id which is not currently assigned to its > transactional id. > 2021-09-10T12:21:59.642 [kafka-producer-network-thread | > mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] > ERROR o.a.k.s.p.i.RecordCollectorImpl - MSG=stream-thread > [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] task [0_2] > Error encountered sending record to topic > mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to: > org.apache.kafka.common.errors.InvalidPidMappingException: The producer > attempted to use a producer id which is not currently assigned to its > transactional id. > Exception handler choose to FAIL the processing, no more records would be > sent. > 2021-09-10T12:21:59.740 > [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] ERROR > o.a.k.s.p.internals.StreamThread - MSG=stream-thread > [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] Encountered the > following exception during processing and the thread is going to shut down: > org.apache.kafka.streams.errors.StreamsException: Error encountered sending > record to topic mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to: > org.apache.kafka.common.errors.InvalidPidMappingException: The producer > attempted to use a producer id which is not currently assigned to its > transactional id. > Exception handler choose to FAIL the processing, no more records would be > sent. 
> at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:214) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:186) > at > org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1363) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159) > at > org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:781) > at > org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425) > at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) > at java.base/java.lang.Thread.run(Thread.java:829) > Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The > producer attempted to use a producer id which is not currently assigned to > its transactional id. > 2021-09-10T12:21:59.740 > [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO > o.a.k.s.p.internals.StreamThread - MSG=stream-thread > [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] State > transition from RUNNING to PENDING_SHUTDOWN > {code} > > After this, I can see that all 12 tasks (because there are 12 partitions for > all topics) get shutdown and this brings down the whole application. > > I understand that the transactional.id.expiration.ms = 7 days (default) will > likely
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r709122840 ## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -491,6 +495,19 @@ abstract class AbstractFetcherThread(name: String, } finally partitionMapLock.unlock() } + def addTopicIdsToThread(partitions: Set[TopicPartition], topicIds: String => Option[Uuid]) = { Review comment: nit: Should we prefix this method with `maybe` to indicate that it would set the topic id only if there is a state for the topic partition? ## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -491,6 +495,19 @@ abstract class AbstractFetcherThread(name: String, } finally partitionMapLock.unlock() } + def addTopicIdsToThread(partitions: Set[TopicPartition], topicIds: String => Option[Uuid]) = { +partitionMapLock.lockInterruptibly() +try { + partitions.foreach { tp => +val currentState = partitionStates.stateValue(tp) Review comment: Should we ensure that there is actually a state? It must be there but it might be better to be safe. ## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -459,18 +459,22 @@ abstract class AbstractFetcherThread(name: String, */ private def partitionFetchState(tp: TopicPartition, initialFetchState: InitialFetchState, currentState: PartitionFetchState): PartitionFetchState = { if (currentState != null && currentState.currentLeaderEpoch == initialFetchState.currentLeaderEpoch) { - currentState + if (currentState.topicId.isEmpty && initialFetchState.topicId.isDefined) { +currentState.updateTopicId(initialFetchState.topicId) + } else { +currentState + } Review comment: Is this change still necessary? ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1755,6 +1762,66 @@ class ReplicaManager(val config: KafkaConfig, partitionsToMakeFollower } + private def updateTopicIdForFollowers(controllerId: Int, +controllerEpoch: Int, +partitionStates: Map[Partition, LeaderAndIsrPartitionState], +correlationId: Int, +topicIds: String => Option[Uuid]) : Set[Partition] = { +val traceLoggingEnabled = stateChangeLogger.isTraceEnabled + +val partitionsToUpdateFollower = mutable.Set.empty[Partition] +try { + partitionStates.forKeyValue { (partition, partitionState) => +val newLeaderBrokerId = partitionState.leader + if (metadataCache.hasAliveBroker(newLeaderBrokerId)) { +// Only change partition state when the leader is available +partitionsToUpdateFollower += partition + } else { +// The leader broker should always be present in the metadata cache. 
+// If not, we should record the error message and abort the transition process for this partition +stateChangeLogger.error(s"Received LeaderAndIsrRequest with correlation id $correlationId from " + + s"controller $controllerId epoch $controllerEpoch for partition ${partition.topicPartition} " + + s"(last update controller epoch ${partitionState.controllerEpoch}) " + + s"but cannot become follower since the new leader $newLeaderBrokerId is unavailable.") + } + } + + if (isShuttingDown.get()) { +if (traceLoggingEnabled) { + partitionsToUpdateFollower.foreach { partition => +stateChangeLogger.trace(s"Skipped the update topic ID step of the become-follower state " + + s"change with correlation id $correlationId from controller $controllerId epoch $controllerEpoch for " + + s"partition ${partition.topicPartition} with leader ${partitionStates(partition).leader} " + + "since it is shutting down") + } +} + } else { +val partitionsToUpdateFollowerWithLeader = partitionsToUpdateFollower.map { partition => + val leaderNode = partition.leaderReplicaIdOpt.flatMap(leaderId => metadataCache. +getAliveBrokerNode(leaderId, config.interBrokerListenerName)).getOrElse(Node.noNode()) + val leader = new BrokerEndPoint(leaderNode.id(), leaderNode.host(), leaderNode.port()) + (partition.topicPartition, BrokerAndFetcherId(leader, replicaFetcherManager.getFetcherId(partition.topicPartition))) Review comment: It looks like `addTopicIdsToFetcherThread` only needs the `leader`, the `topic-partition` (to compute the fetcher id), and the `topic id`. How about passing just those? I would also let the fetcher manager compute the fetcher id. ## File path:
[jira] [Commented] (KAFKA-13295) Long restoration times for new tasks can lead to transaction timeouts
[ https://issues.apache.org/jira/browse/KAFKA-13295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17415486#comment-17415486 ] Sagar Rao commented on KAFKA-13295: --- [~guozhang]/ [~ableegoldman] I assigned this to myself. I'll go through the code and see if I can find something :D > Long restoration times for new tasks can lead to transaction timeouts > - > > Key: KAFKA-13295 > URL: https://issues.apache.org/jira/browse/KAFKA-13295 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Sagar Rao >Priority: Critical > Labels: eos > Fix For: 3.1.0 > > > In some EOS applications with relatively long restoration times we've noticed > a series of ProducerFencedExceptions occurring during/immediately after > restoration. The broker logs were able to confirm these were due to > transactions timing out. > In Streams, it turns out we automatically begin a new txn when calling > {{send}} (if there isn't already one in flight). A {{send}} occurs often > outside a commit during active processing (eg writing to the changelog), > leaving the txn open until the next commit. And if a StreamThread has been > actively processing when a rebalance results in a new stateful task without > revoking any existing tasks, the thread won't actually commit this open txn > before it goes back into the restoration phase while it builds up state for > the new task. So the in-flight transaction is left open during restoration, > during which the StreamThread only consumes from the changelog without > committing, leaving it vulnerable to timing out when restoration times exceed > the configured transaction.timeout.ms for the producer client. -- This message was sent by Atlassian Jira (v8.3.4#803005)
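A compact sketch of the failure mode described above, stripped of Streams entirely (assumptions: a local broker and a pre-created topic; this illustrates the broker-side timeout behavior, not Streams' actual code path): an open transaction that idles past transaction.timeout.ms is aborted server-side, so the eventual commit fails.

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TxnTimeoutSketch {
    public static void main(String[] args) throws Exception {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("transactional.id", "sketch-txn");
        p.put("transaction.timeout.ms", "10000"); // short, for the sketch
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("changelog-topic", "k", "v")); // txn now open
            Thread.sleep(15_000); // stands in for restoration longer than the txn timeout
            producer.commitTransaction(); // fails: the broker already aborted the txn
        }
    }
}
{code}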
[jira] [Assigned] (KAFKA-13295) Long restoration times for new tasks can lead to transaction timeouts
[ https://issues.apache.org/jira/browse/KAFKA-13295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao reassigned KAFKA-13295: - Assignee: Sagar Rao > Long restoration times for new tasks can lead to transaction timeouts > - > > Key: KAFKA-13295 > URL: https://issues.apache.org/jira/browse/KAFKA-13295 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Sagar Rao >Priority: Critical > Labels: eos > Fix For: 3.1.0 > > > In some EOS applications with relatively long restoration times we've noticed > a series of ProducerFencedExceptions occurring during/immediately after > restoration. The broker logs were able to confirm these were due to > transactions timing out. > In Streams, it turns out we automatically begin a new txn when calling > {{send}} (if there isn’t already one in flight). A {{send}} occurs often > outside a commit during active processing (eg writing to the changelog), > leaving the txn open until the next commit. And if a StreamThread has been > actively processing when a rebalance results in a new stateful task without > revoking any existing tasks, the thread won’t actually commit this open txn > before it goes back into the restoration phase while it builds up state for > the new task. So the in-flight transaction is left open during restoration, > during which the StreamThread only consumes from the changelog without > committing, leaving it vulnerable to timing out when restoration times exceed > the configured transaction.timeout.ms for the producer client. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13303) RoundRobinPartitioner broken by KIP-480
[ https://issues.apache.org/jira/browse/KAFKA-13303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17415452#comment-17415452 ] Jon McEwen commented on KAFKA-13303: WIP patch: https://github.com/apache/kafka/pull/11326 > RoundRobinPartitioner broken by KIP-480 > --- > > Key: KAFKA-13303 > URL: https://issues.apache.org/jira/browse/KAFKA-13303 > Project: Kafka > Issue Type: Bug >Reporter: Jon McEwen >Priority: Minor > > Since KIP-480 Sticky Partitioning, the RoundRobinPartitioner doesn't behave > correctly. An additional call to `partition()` on a new batch leads to > partitions being skipped. > > I have a fix that I would like to contribute, but I need help getting started > as a contributor, e.g. for basic things like formatting the code. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] jonmcewen opened a new pull request #11326: WIP: KAFKA-13303: RoundRobinPartitioner broken by KIP-480
jonmcewen opened a new pull request #11326: URL: https://github.com/apache/kafka/pull/11326 NEEDS FORMATTING AND TESTS. Fixes for sticky partitioning and thread safety. *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-13246) StoreQueryIntegrationTest#shouldQueryStoresAfterAddingAndRemovingStreamThread does not gate on stream state well
[ https://issues.apache.org/jira/browse/KAFKA-13246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andre Dymel reassigned KAFKA-13246: --- Assignee: Andre Dymel > StoreQueryIntegrationTest#shouldQueryStoresAfterAddingAndRemovingStreamThread > does not gate on stream state well > > > Key: KAFKA-13246 > URL: https://issues.apache.org/jira/browse/KAFKA-13246 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Walker Carlson >Assignee: Andre Dymel >Priority: Major > Labels: newbie > > StoreQueryIntegrationTest#shouldQueryStoresAfterAddingAndRemovingStreamThread > should be improved by waiting for the client to go to rebalancing or running > after adding and removing a thread. It should also wait until running before > querying the state store -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13303) RoundRobinPartitioner broken by KIP-480
Jon McEwen created KAFKA-13303: -- Summary: RoundRobinPartitioner broken by KIP-480 Key: KAFKA-13303 URL: https://issues.apache.org/jira/browse/KAFKA-13303 Project: Kafka Issue Type: Bug Reporter: Jon McEwen Since KIP-480 Sticky Partitioning, the RoundRobinPartitioner doesn't behave correctly. An additional call to `partition()` on a new batch leads to partitions being skipped. I have a fix that I would like to contribute, but I need help getting started as a contributor, e.g. for basic things like formatting the code. -- This message was sent by Atlassian Jira (v8.3.4#803005)
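As a rough illustration of the failure mode (a simplified sketch, not the actual producer or partitioner code): since KIP-480 the producer may invoke the partitioner a second time for the same record when a new batch is created, which double-advances a counter-based round-robin partitioner and leaves a partition with no record.

{code:java}
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinSkipSketch {
    private final AtomicInteger counter = new AtomicInteger();

    int partition(int numPartitions) {
        return counter.getAndIncrement() % numPartitions;
    }

    public static void main(String[] args) {
        RoundRobinSkipSketch p = new RoundRobinSkipSketch();
        int numPartitions = 3;
        // First call picks partition 0, but the batch is closed, so the
        // producer calls onNewBatch() and then partition() again:
        int discarded = p.partition(numPartitions); // 0 -- result thrown away
        int used = p.partition(numPartitions);      // 1 -- record actually goes here
        System.out.println("skipped " + discarded + ", sent to " + used);
    }
}
{code}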
[jira] [Comment Edited] (KAFKA-13301) The relationship between request.timeout.ms and max.poll.interval.ms in the Consumer Configs is incorrect.
[ https://issues.apache.org/jira/browse/KAFKA-13301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17415358#comment-17415358 ] yangshengwei edited comment on KAFKA-13301 at 9/15/21, 9:59 AM: [https://kafka.apache.org/documentation/#upgrade_200_notable] !image-2021-09-15-15-37-25-561.png|width=521,height=180! !image-2021-09-15-15-39-00-179.png|width=518,height=564! was (Author: yangshengwei): https://kafka.apache.org/documentation/#upgrade_200_notable !image-2021-09-15-15-37-25-561.png|width=521,height=180! !image-2021-09-15-15-39-00-179.png|width=518,height=564! > The relationship between request.timeout.ms and max.poll.interval.ms in the > Consumer Configs is incorrect. > --- > > Key: KAFKA-13301 > URL: https://issues.apache.org/jira/browse/KAFKA-13301 > Project: Kafka > Issue Type: Improvement >Reporter: yangshengwei >Priority: Trivial > Attachments: image-2021-09-15-15-37-25-561.png, > image-2021-09-15-15-39-00-179.png > > > In Consumer Configs, the value of the configuration max.poll.interval.ms > must always be larger than request.timeout.ms. But here's what the official > document says: The value of the configuration request.timeout.ms must always > be larger than max.poll.interval.ms. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-13301) The relationship between request.timeout.ms and max.poll.interval.ms in the Consumer Configs is incorrect.
[ https://issues.apache.org/jira/browse/KAFKA-13301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17415358#comment-17415358 ] yangshengwei edited comment on KAFKA-13301 at 9/15/21, 9:58 AM: https://kafka.apache.org/documentation/#upgrade_200_notable !image-2021-09-15-15-37-25-561.png|width=521,height=180! !image-2021-09-15-15-39-00-179.png|width=518,height=564! was (Author: yangshengwei): !image-2021-09-15-15-37-25-561.png|width=521,height=180! !image-2021-09-15-15-39-00-179.png|width=518,height=564! > The relationship between request.timeout.ms and max.poll.interval.ms in the > Consumer Configs is incorrect. > --- > > Key: KAFKA-13301 > URL: https://issues.apache.org/jira/browse/KAFKA-13301 > Project: Kafka > Issue Type: Improvement >Reporter: yangshengwei >Priority: Trivial > Attachments: image-2021-09-15-15-37-25-561.png, > image-2021-09-15-15-39-00-179.png > > > In Consumer Configs, the value of the configuration max.poll.interval.ms > must always be larger than request.timeout.ms. But here's what the official > document says: The value of the configuration request.timeout.ms must always > be larger than max.poll.interval.ms. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13302) [IEP-59] Support non-default page size
Nikolay Izhikov created KAFKA-13302: --- Summary: [IEP-59] Support non-default page size Key: KAFKA-13302 URL: https://issues.apache.org/jira/browse/KAFKA-13302 Project: Kafka Issue Type: Improvement Reporter: Nikolay Izhikov Currently, CDC doesn't support a non-default page size. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-13300) Kafka ACL Restriction Group Is not being applied
[ https://issues.apache.org/jira/browse/KAFKA-13300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17415352#comment-17415352 ] Manikumar edited comment on KAFKA-13300 at 9/15/21, 7:43 AM: - kafka-acls.sh command {{"--add"}} option is for adding an acl and {{"--remove"}} is to remove an existing acl. Consuming from a group without read permission should fail unless we configure {{"allow.everyone.if.no.acl.found=true"}} [https://kafka.apache.org/documentation/#security_authz] I am not able to reproduce the issue. Can you attach the {{server.properties}} file, authorizer debug logs, and steps to reproduce the issue. was (Author: omkreddy): kafka-acls.sh command {{"--add"}} option is for adding an acl and {{"--remove"}} is to remove an existing acl. Consuming from a group without read permission should fail unless we configure {{"allow.everyone.if.no.acl.found=true"}} [https://kafka.apache.org/documentation/#security_authz] I am not able to reproduce the issue. Can you attach the {{server.properties}} file and steps to reproduce the issue. > Kafka ACL Restriction Group Is not being applied > > > Key: KAFKA-13300 > URL: https://issues.apache.org/jira/browse/KAFKA-13300 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.6.2 >Reporter: Adriano Jesus >Priority: Minor > > Hi, > I am creating a KAFKA ACL with a fake group restriction as below: > > {code:java} > ./kafka-acls.sh \ > > --authorizer-properties zookeeper.connect=$ZOOKEEPER \ > --remove --allow-principal User:'Kafka-tools' \ > --consumer --group fake-group \ > --topic delete-me-2 > {code} > > When I try to consume a message with the same user, 'Kafka-tools', and with > another group I am still able to consume the messages: > {code:java} > // ./kafka-console-consumer.sh --bootstrap-server=$KAFKA --topic delete-me-2 > --consumer.config user-auth.properties --from-beginning --group teste > {code} > According to the documentation this property can be used as a consumer group > ([https://docs.confluent.io/platform/current/kafka/authorization.html):] > "*Group* > Groups in the brokers. All protocol calls that work with groups, such as > joining a group, must have corresponding privileges with the group in the > subject. Group ({{group.id}}) can mean Consumer Group, Stream Group > ({{application.id}}), Connect Worker Group, or any other group that uses the > Consumer Group protocol, like Schema Registry cluster." > I did another test adding a consumer ACL permission with this command: > {code:java} > ./kafka-acls.sh \ > > --authorizer-properties zookeeper.connect=$ZOOKEEPER \ > --add --allow-principal User:'Kafka-tools' \ > --consumer --group fake-group \ > --topic delete-me-2 > {code} > After that I removed the ACL authorization for the READ operation on the Group > resource. I tried again to consume from this topic, and I was still able to > consume messages from this topic even without READ group permission. > Maybe my interpretation is wrong. But it seems that Kafka ACL is not validating > the group permissions. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13301) The relationship between request.timeout.ms and max.poll.interval.ms in the Consumer Configs is incorrect.
[ https://issues.apache.org/jira/browse/KAFKA-13301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17415361#comment-17415361 ] yangshengwei commented on KAFKA-13301: -- [~guozhang] [guozhangwang|https://github.com/guozhangwang] > The relationship between request.timeout.ms and max.poll.interval.ms in the > Consumer Configs is incorrect. > --- > > Key: KAFKA-13301 > URL: https://issues.apache.org/jira/browse/KAFKA-13301 > Project: Kafka > Issue Type: Improvement >Reporter: yangshengwei >Priority: Trivial > Attachments: image-2021-09-15-15-37-25-561.png, > image-2021-09-15-15-39-00-179.png > > > In Consumer Configs, the value of the configuration max.poll.interval.ms > must always be larger than request.timeout.ms. But here's what the official > document says: The value of the configuration request.timeout.ms must always > be larger than max.poll.interval.ms. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-13301) The relationship between request.timeout.ms and max.poll.interval.ms in the Consumer Configs is incorrect.
[ https://issues.apache.org/jira/browse/KAFKA-13301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17415358#comment-17415358 ] yangshengwei edited comment on KAFKA-13301 at 9/15/21, 7:39 AM: !image-2021-09-15-15-37-25-561.png|width=521,height=180! !image-2021-09-15-15-39-00-179.png|width=518,height=564! was (Author: yangshengwei): !image-2021-09-15-15-37-25-561.png|width=521,height=180! > The relationship between request.timeout.ms and max.poll.interval.ms in the > Consumer Configs is incorrect. > --- > > Key: KAFKA-13301 > URL: https://issues.apache.org/jira/browse/KAFKA-13301 > Project: Kafka > Issue Type: Improvement >Reporter: yangshengwei >Priority: Trivial > Attachments: image-2021-09-15-15-37-25-561.png, > image-2021-09-15-15-39-00-179.png > > > In Consumer Configs, the value of the configuration max.poll.interval.ms > must always be larger than request.timeout.ms. But here's what the official > document says: The value of the configuration request.timeout.ms must always > be larger than max.poll.interval.ms. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13301) The relationship between request.timeout.ms and max.poll.interval.ms in the Consumer Configs is incorrect.
[ https://issues.apache.org/jira/browse/KAFKA-13301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17415358#comment-17415358 ] yangshengwei commented on KAFKA-13301: -- !image-2021-09-15-15-37-25-561.png|width=521,height=180! > The relationship between request.timeout.ms and max.poll.interval.ms in the > Consumer Configs is incorrect. > --- > > Key: KAFKA-13301 > URL: https://issues.apache.org/jira/browse/KAFKA-13301 > Project: Kafka > Issue Type: Improvement >Reporter: yangshengwei >Priority: Trivial > Attachments: image-2021-09-15-15-37-25-561.png > > > In Consumer Configs, the value of the configuration max.poll.interval.ms > must always be larger than request.timeout.ms. But here's what the official > document says: The value of the configuration request.timeout.ms must always > be larger than max.poll.interval.ms. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13301) The relationship between request.timeout.ms and max.poll.interval.ms in the Consumer Configs is incorrect.
[ https://issues.apache.org/jira/browse/KAFKA-13301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yangshengwei updated KAFKA-13301: - Attachment: image-2021-09-15-15-37-25-561.png > The relationship between request.timeout.ms and max.poll.interval.ms in the > Consumer Configs is incorrect. > --- > > Key: KAFKA-13301 > URL: https://issues.apache.org/jira/browse/KAFKA-13301 > Project: Kafka > Issue Type: Improvement >Reporter: yangshengwei >Priority: Trivial > Attachments: image-2021-09-15-15-37-25-561.png > > > In Consumer Configs, the value of the configuration max.poll.interval.ms > must always be larger than request.timeout.ms. But here's what the official > document says: The value of the configuration request.timeout.ms must always > be larger than max.poll.interval.ms. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13301) The relationship between request.timeout.ms and max.poll.interval.ms in the Consumer Configs is incorrect.
yangshengwei created KAFKA-13301: Summary: The relationship between request.timeout.ms and max.poll.interval.ms in the Consumer Configs is incorrect. Key: KAFKA-13301 URL: https://issues.apache.org/jira/browse/KAFKA-13301 Project: Kafka Issue Type: Improvement Reporter: yangshengwei In Consumer Configs, the value of the configuration max.poll.interval.ms must always be larger than request.timeout.ms. But here's what the official document says: The value of the configuration request.timeout.ms must always be larger than max.poll.interval.ms. -- This message was sent by Atlassian Jira (v8.3.4#803005)
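For reference, the two settings in question with their stock consumer defaults (the values below are the shipped defaults, shown only to illustrate the relationship the reporter describes, not as a tuning recommendation):

{code:java}
import java.util.Properties;

public class ConsumerTimeouts {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Consumer defaults: max.poll.interval.ms (5 min) is larger than
        // request.timeout.ms (30 s), matching the relationship the reporter
        // says the documentation sentence should express.
        props.put("max.poll.interval.ms", "300000");
        props.put("request.timeout.ms", "30000");
        System.out.println(props);
    }
}
{code}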
[jira] [Comment Edited] (KAFKA-13300) Kafka ACL Restriction Group Is not being applied
[ https://issues.apache.org/jira/browse/KAFKA-13300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17415352#comment-17415352 ] Manikumar edited comment on KAFKA-13300 at 9/15/21, 7:28 AM: - kafka-acls.sh command {{"--add"}} option is for adding an acl and {{"--remove"}} is to remove an existing acl. Consuming from a group without read permission should fail unless we configure {{"allow.everyone.if.no.acl.found=true"}} [https://kafka.apache.org/documentation/#security_authz] I am not able to reproduce the issue. Can you attach the {{server.properties}} file and steps to reproduce the issue. was (Author: omkreddy): kafka-acls.sh command {{"--add"}} option is for adding an acl and {{"--remove"}} is to remove an existing acl. Consuming from a group without read permission should fail unless we configure {{"allow.everyone.if.no.acl.found=true"}} https://kafka.apache.org/documentation/#security_authz I am not able to reproduce the issue. Can you attach the {{server.properties}} file and steps to reproduce the issue. > Kafka ACL Restriction Group Is not being applied > > > Key: KAFKA-13300 > URL: https://issues.apache.org/jira/browse/KAFKA-13300 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.6.2 >Reporter: Adriano Jesus >Priority: Minor > > Hi, > I am creating a KAFKA ACL with a fake group restriction as below: > > {code:java} > ./kafka-acls.sh \ > > --authorizer-properties zookeeper.connect=$ZOOKEEPER \ > --remove --allow-principal User:'Kafka-tools' \ > --consumer --group fake-group \ > --topic delete-me-2 > {code} > > When I try to consume a message with the same user, 'Kafka-tools', and with > another group I am still able to consume the messages: > {code:java} > // ./kafka-console-consumer.sh --bootstrap-server=$KAFKA --topic delete-me-2 > --consumer.config user-auth.properties --from-beginning --group teste > {code} > According to the documentation this property can be used as a consumer group > ([https://docs.confluent.io/platform/current/kafka/authorization.html):] > "*Group* > Groups in the brokers. All protocol calls that work with groups, such as > joining a group, must have corresponding privileges with the group in the > subject. Group ({{group.id}}) can mean Consumer Group, Stream Group > ({{application.id}}), Connect Worker Group, or any other group that uses the > Consumer Group protocol, like Schema Registry cluster." > I did another test adding a consumer ACL permission with this command: > {code:java} > ./kafka-acls.sh \ > > --authorizer-properties zookeeper.connect=$ZOOKEEPER \ > --add --allow-principal User:'Kafka-tools' \ > --consumer --group fake-group \ > --topic delete-me-2 > {code} > After that I removed the ACL authorization for the READ operation on the Group > resource. I tried again to consume from this topic, and I was still able to > consume messages from this topic even without READ group permission. > Maybe my interpretation is wrong. But it seems that Kafka ACL is not validating > the group permissions. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions
[ https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17415337#comment-17415337 ]

Victoria Xia commented on KAFKA-13261:
--------------------------------------

Hey [~vvcephei] [~abellemare] [~guozhang], I had a look at the code and it seems to support Adam's theory that the custom partitioners from the repartition() step aren't taken into account by the foreign key join. In particular, both the subscription sink topic and the response sink topic are created without partitioners specified in the StreamSinkNode:

[https://github.com/apache/kafka/blob/75795d1ed8402f185e00b5f3aedcc2bcbb914ca9/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L1051]
[https://github.com/apache/kafka/blob/75795d1ed8402f185e00b5f3aedcc2bcbb914ca9/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L1122]

IIUC, this means the default partitioner is used for both topics despite the custom partitioners on the source tables, which explains the missing join results.

One thing I don't understand: even if we fix this bug by propagating the partitioner information from the repartition() step to the foreign key join, wouldn't we still have an analogous bug if either of the topics for the source tables had custom partitioning logic created from outside Streams (i.e., without a repartition() step in the Streams topology)? In this case, Streams has no way of determining the partitioning of the source tables, which means we need an update to the interface for foreign key joins so that users can specify a partitioner to use in order to ensure copartitioning of the subscription and response topics with the relevant tables.

Is this reasoning sound? If so, does it make sense to add logic into Streams to propagate information about the partitioner from the repartition() step to the foreign key join, or would it be better to require users to use the new interface to pass the same partitioner from the repartition() step(s) to the foreign key join as well? The latter seems more consistent with how copartitioning for joins is typically the user's responsibility, and also avoids the need to update Streams with logic for tracking partitioners throughout the topology.

> KTable to KTable foreign key join loose events when using several partitions
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-13261
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13261
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0, 2.7.1
>            Reporter: Tomas Forsman
>            Assignee: Victoria Xia
>            Priority: Major
>         Attachments: KafkaTest.java
>
> Two incoming streams A and B.
> Stream A uses a composite key [a, b].
> Stream B has key [b].
> Stream B has 4 partitions and stream A has 1 partition.
> What we try to do is repartition stream A to have 4 partitions too, then put both A and B into KTables and do a foreign key join from A to B.
> When doing this, not all messages end up in the output topic.
> Repartitioning both to use only 1 partition each solves the problem, so it seems to have something to do with the foreign key join in combination with several partitions.
> One suspicion would be that it is not possible to define which partitioner to use for the join.
> Any insight or help is greatly appreciated.
> *Example code of the problem*
> {code:java}
> static Topology createTopology() {
>     var builder = new StreamsBuilder();
>     KTable<KeyB, EventB> tableB = builder.table("B", stringMaterialized("table.b"));
>     builder
>         .stream("A", Consumed.with(Serde.of(KeyA.class), Serde.of(EventA.class)))
>         .repartition(repartitionTopicA())
>         .toTable(Named.as("table.a"), aMaterialized("table.a"))
>         .join(tableB, EventA::getKeyB, topicAandBeJoiner(), Named.as("join.ab"), joinMaterialized("join.ab"))
>         .toStream()
>         .to("output", with(...));
>     return builder.build();
> }
>
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> aMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
>
> private static Repartitioned<KeyA, EventA> repartitionTopicA() {
>     Repartitioned<KeyA, EventA> repartitioned = Repartitioned.as("driverperiod");
>     return repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class))
>         .withStreamPartitioner(topicAPartitioner())
>         .withNumberOfPartitions(4);
> }
>
> private static StreamPartitioner<KeyA, EventA> topicAPartitioner() {
>     return (topic, key, value, numPartitions) -> Math.abs(key.getKeyB().hashCode()) % numPartitions;
> }
>
> private static Materialized<KeyA, ..., KeyValueStore<Bytes, byte[]>> joinMaterialized(String name) {
>     Materialized<KeyA, ..., KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
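To make the suspected mechanism concrete, the snippet below is an editorial sketch (not part of the report) contrasting the hashCode-based custom partitioner above with murmur2-based default partitioning, which, if I read the code right, is what the foreign key join's subscription and response sink topics fall back to when no partitioner is propagated. It assumes String keys serialized as UTF-8 as a stand-in for the real serdes:

{code:java}
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

int numPartitions = 4;
String keyB = "some-foreign-key"; // hypothetical key value

// Partition chosen by the custom partitioner from repartitionTopicA() above.
int customPartition = Math.abs(keyB.hashCode()) % numPartitions;

// Partition chosen by default partitioning: murmur2 over the serialized key.
byte[] keyBytes = keyB.getBytes(StandardCharsets.UTF_8); // stand-in for the real serde output
int defaultPartition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

// Whenever these two disagree for a key, the join's internal records and the
// table records land on different partitions, and join results silently go missing.
System.out.printf("custom=%d default=%d%n", customPartition, defaultPartition);
{code}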