[jira] [Commented] (KAFKA-6020) Broker side filtering
[ https://issues.apache.org/jira/browse/KAFKA-6020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17419571#comment-17419571 ]

King Jin commented on KAFKA-6020:

How about broker-side filtering by Kafka headers? The consumer would send a filtering header when it requests more messages from the broker, and the broker would filter messages by that header before sending the response back to the consumer.

> Broker side filtering
> ---------------------
>
> Key: KAFKA-6020
> URL: https://issues.apache.org/jira/browse/KAFKA-6020
> Project: Kafka
> Issue Type: New Feature
> Components: consumer
> Reporter: Pavel Micka
> Priority: Major
> Labels: needs-kip
>
> Currently, it is not possible to filter messages on the broker side. Filtering
> messages on the broker side is convenient for filters with very low selectivity
> (one message in a few thousand). In my case, it means transferring several GB of
> data to the consumer, throwing it away, taking one message, and doing it again...
> While I understand that filtering by message body is not feasible (for
> performance reasons), I propose to filter just by message key prefix. This
> can be achieved even without any deserialization, as the prefix to be matched
> can be passed as an array (hence the broker would do just an array prefix
> comparison).

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
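For readers following the proposal, the "array prefix compare" mentioned in the issue could look roughly like the sketch below. It is only an illustration: `KeyPrefixFilter` and `hasPrefix` are hypothetical names, not part of Kafka, and the sketch ignores how a prefix would actually travel in a fetch request.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper, not a Kafka class: compares raw key bytes against a prefix. */
final class KeyPrefixFilter {

    /** Returns true if the first prefix.length bytes of key equal prefix; no deserialization needed. */
    static boolean hasPrefix(byte[] key, byte[] prefix) {
        if (key == null || prefix == null || key.length < prefix.length) {
            return false;
        }
        for (int i = 0; i < prefix.length; i++) {
            if (key[i] != prefix[i]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        byte[] prefix = "order-".getBytes(StandardCharsets.UTF_8);
        System.out.println(hasPrefix("order-42".getBytes(StandardCharsets.UTF_8), prefix));
        System.out.println(hasPrefix("user-7".getBytes(StandardCharsets.UTF_8), prefix));
    }
}
```

The comparison touches at most `prefix.length` bytes per record, which is why the issue argues it is cheap compared with filtering on the message body.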
[GitHub] [kafka] ccding commented on pull request #11345: Allow empty last segment to have missing offset index during recovery
ccding commented on pull request #11345:
URL: https://github.com/apache/kafka/pull/11345#issuecomment-926250962

   The failed tests are unrelated to this change and passed in my local run.
   ```
   Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testOneWayReplicationWithAutoOffsetSync()  4.7 sec  1
   Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.integration.InternalTopicsIntegrationTest.testCreateInternalTopicsWithFewerReplicasThanBrokers  56 sec  1
   ```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[GitHub] [kafka] junrao commented on a change in pull request #11327: KAFKA-13305: fix NullPointerException in LogCleanerManager "uncleanable-bytes" gauge
junrao commented on a change in pull request #11327:
URL: https://github.com/apache/kafka/pull/11327#discussion_r715227056

## File path: core/src/main/scala/kafka/log/LogCleanerManager.scala ##
@@ -512,6 +514,27 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
         uncleanablePartitions.get(log.parentDir).exists(partitions => partitions.contains(topicPartition))
       }
   }
+
+  def maintainUncleanablePartitions(): Unit = {
+    // Remove deleted partitions from uncleanablePartitions
+    inLock(lock) {
+      // Note: we don't use retain or filterInPlace method in this function because retain is deprecated in
+      // scala 2.13 while filterInPlace is not available in scala 2.12.
+
+      // Remove deleted partitions
+      uncleanablePartitions.values.foreach {
+        partitions =>
+          val partitionsToRemove = partitions.filterNot(logs.contains(_)).toList
+          partitionsToRemove.foreach { partitions.remove(_) }
+      }
+
+      // Remove entries with empty partition set.
+      val logDirsToRemove = uncleanablePartitions.filter {
+        case (logDir, partitions) => partitions.isEmpty

Review comment:
   logDir is unused and can be replaced with _.
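As a rough illustration of the two-pass cleanup in the hunk above (first drop partitions whose logs no longer exist, then drop log directories left with an empty set), here is a minimal Java sketch. The class name, the use of plain strings for log directories and partitions, and the absence of locking are all simplifying assumptions; the PR itself is Scala and runs under `inLock(lock)`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for the uncleanable-partitions bookkeeping; not Kafka code. */
final class UncleanablePartitionsSketch {

    /** Mutates the map in place; callers are assumed to hold whatever lock guards it. */
    static void maintain(Map<String, Set<String>> uncleanableByLogDir, Set<String> liveLogs) {
        // Pass 1: remove partitions whose log has been deleted.
        for (Set<String> partitions : uncleanableByLogDir.values()) {
            partitions.removeIf(tp -> !liveLogs.contains(tp));
        }
        // Pass 2: remove log directories whose partition set is now empty.
        uncleanableByLogDir.values().removeIf(Set::isEmpty);
    }

    public static void main(String[] args) {
        Map<String, Set<String>> uncleanable = new HashMap<>();
        uncleanable.put("/data/d1", new HashSet<>(List.of("a-0", "b-0")));
        uncleanable.put("/data/d2", new HashSet<>(List.of("c-0")));
        maintain(uncleanable, Set.of("a-0"));
        System.out.println(uncleanable);
    }
}
```

Java's `removeIf` plays the role that `retain`/`filterInPlace` would play in Scala; the PR avoids those two methods only because of the Scala 2.12/2.13 compatibility issue noted in its comment.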
[jira] [Resolved] (KAFKA-13070) LogManager shutdown races with periodic work scheduled by the instance
[ https://issues.apache.org/jira/browse/KAFKA-13070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao resolved KAFKA-13070.
-----------------------------
    Resolution: Duplicate

> LogManager shutdown races with periodic work scheduled by the instance
> ----------------------------------------------------------------------
>
> Key: KAFKA-13070
> URL: https://issues.apache.org/jira/browse/KAFKA-13070
> Project: Kafka
> Issue Type: Bug
> Reporter: Kowshik Prakasam
> Assignee: Cong Ding
> Priority: Major
>
> In the LogManager shutdown sequence (in LogManager.shutdown()), we don't
> cancel the periodic work scheduled by it prior to shutdown. As a result, the
> periodic work could race with the shutdown sequence causing some unwanted
> side effects. This is reproducible by a unit test in LogManagerTest.
[jira] [Resolved] (KAFKA-13315) log layer exception during shutdown that caused an unclean shutdown
[ https://issues.apache.org/jira/browse/KAFKA-13315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao resolved KAFKA-13315.
-----------------------------
    Fix Version/s: 3.0.1, 3.1.0
    Resolution: Fixed

Merged the PR to trunk and 3.0.

> log layer exception during shutdown that caused an unclean shutdown
> -------------------------------------------------------------------
>
> Key: KAFKA-13315
> URL: https://issues.apache.org/jira/browse/KAFKA-13315
> Project: Kafka
> Issue Type: Bug
> Reporter: Cong Ding
> Assignee: Cong Ding
> Priority: Major
> Fix For: 3.1.0, 3.0.1
>
> We have seen an exception caused by shutting down scheduler before shutting
> down LogManager.
> When LogManager was closing partitions one by one, the scheduler called to delete
> old segments due to retention. However, the old segments could have been
> closed by the LogManager, which subsequently marked logdir as offline and
> didn't write the clean shutdown marker. Ultimately the broker would take
> hours to restart.
[GitHub] [kafka] junrao commented on pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown
junrao commented on pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#issuecomment-926239908

   cherry-picked to 3.0 branch too.
[jira] [Commented] (KAFKA-13272) KStream offset stuck after brokers outage
[ https://issues.apache.org/jira/browse/KAFKA-13272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17419498#comment-17419498 ]

A. Sophie Blee-Goldman commented on KAFKA-13272:

[~guozhang] any updates here? I'm seeing almost weekly reports of Streams EOS apps getting stuck in restoration. A few of them I believe are actually https://issues.apache.org/jira/browse/KAFKA-13295, but most of them don't report any ProducerFencedException or TaskMigratedException, which would rule that one out.

> KStream offset stuck after brokers outage
> -----------------------------------------
>
> Key: KAFKA-13272
> URL: https://issues.apache.org/jira/browse/KAFKA-13272
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 2.8.0
> Environment: Kafka running on Kubernetes
> centos
> Reporter: F Méthot
> Priority: Major
>
> Our KStream app offset stays stuck on 1 partition after an outage, possibly when
> exactly_once is enabled.
> Running with KStream 2.8, kafka broker 2.8,
> 3 brokers.
> commands topic is 10 partitions (replication 2, min-insync 2)
> command-expiry-store-changelog topic is 10 partitions (replication 2,
> min-insync 2)
> events topic is 10 partitions (replication 2, min-insync 2)
> with this topology
> Topologies:
>
> {code:java}
> Sub-topology: 0
>  Source: KSTREAM-SOURCE-00 (topics: [commands])
>    --> KSTREAM-TRANSFORM-01
>  Processor: KSTREAM-TRANSFORM-01 (stores: [])
>    --> KSTREAM-TRANSFORM-02
>    <-- KSTREAM-SOURCE-00
>  Processor: KSTREAM-TRANSFORM-02 (stores: [command-expiry-store])
>    --> KSTREAM-SINK-03
>    <-- KSTREAM-TRANSFORM-01
>  Sink: KSTREAM-SINK-03 (topic: events)
>    <-- KSTREAM-TRANSFORM-02
> {code}
> h3. Attempt 1 at reproducing this issue
>
> Our stream app runs with processing.guarantee *exactly_once*
> After a Kafka test outage where all 3 broker pods were deleted at the same
> time,
> the brokers restarted and initialized successfully.
> When restarting the topology above, one of the tasks would never initialize
> fully; the restore phase would keep outputting this message every few
> minutes:
>
> {code:java}
> 2021-08-16 14:20:33,421 INFO stream-thread
> [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1]
> Restoration in progress for 1 partitions.
> {commands-processor-expiry-store-changelog-8: position=11775908,
> end=11775911, totalRestored=2002076}
> [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1]
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader)
> {code}
> The task for partition 8 would never initialize, and no more data would be read from
> the source commands topic for that partition.
>
> In an attempt to recover, we restarted the stream app with
> processing.guarantee set back to at_least_once; it then proceeded with reading the
> changelog and restored partition 8 fully.
> But we noticed afterward, for the next hour until we rebuilt the system, that
> partition 8 from command-expiry-store-changelog would not be
> cleaned/compacted by the log cleaner/compacter, unlike the other partitions.
> (could be unrelated, because we have seen that before)
> So we resorted to deleting/recreating our command-expiry-store-changelog topic
> and events topic and regenerating them from the commands, reading from the beginning.
> Things went back to normal.
> h3. Attempt 2 at reproducing this issue
> kstream runs with *exactly-once*
> We force-deleted all 3 pods running kafka.
> After that, one of the partitions can’t be restored.
> (as reported in the previous attempt)
> For that partition, we noticed these logs on the broker
> {code:java}
> [2021-08-27 17:45:32,799] INFO [Transaction Marker Channel Manager 1002]:
> Couldn’t find leader endpoint for partitions Set(__consumer_offsets-11,
> command-expiry-store-changelog-9) while trying to send transaction markers
> for commands-processor-0_9, these partitions are likely deleted already and
> hence can be skipped
> (kafka.coordinator.transaction.TransactionMarkerChannelManager){code}
> Then
> - we stopped the kstream app,
> - restarted the kafka brokers cleanly,
> - restarted the KStream app.
> These log messages showed up in the kstream app log:
>
> {code:java}
> 2021-08-27 18:34:42,413 INFO [Consumer
> clientId=commands-processor-76602c87-f682-4648-859b-8fa9b6b937f3-StreamThread-1-consumer,
> groupId=commands-processor] The following partitions still have unstable
> offsets which are not cleared on the broker side: [commands-9], this could be
> either transactional offsets waiting for completion, or normal offsets
> waiting for replication after appending to local log
> [commands-processor-76602c87-f682-4648-859b-8fa9b6b937f3-StreamThread-1]
>
[GitHub] [kafka] junrao merged pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown
junrao merged pull request #11351:
URL: https://github.com/apache/kafka/pull/11351
[GitHub] [kafka] vincent81jiang commented on a change in pull request #11327: KAFKA-13305: fix NullPointerException in LogCleanerManager "uncleanable-bytes" gauge
vincent81jiang commented on a change in pull request #11327:
URL: https://github.com/apache/kafka/pull/11327#discussion_r715169127

## File path: core/src/main/scala/kafka/log/LogCleanerManager.scala ##
@@ -512,6 +514,27 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
         uncleanablePartitions.get(log.parentDir).exists(partitions => partitions.contains(topicPartition))
       }
   }
+
+  def maintainUncleanablePartitions(): Unit = {
+    // Remove deleted partitions from uncleanablePartitions
+    inLock(lock) {
+      // Remove non-existing logDir
+      // Note: we don't use retain or filterInPlace method in this function because retain is deprecated in
+      // scala 2.13 while filterInPlace is not available in scala 2.12.
+      val logDirsToRemove = uncleanablePartitions.filterNot {
+        case (logDir, _) => logDirs.map(_.getAbsolutePath).contains(logDir)
+      }.keys.toList
+      logDirsToRemove.foreach { uncleanablePartitions.remove(_) }
+
+      uncleanablePartitions.values.foreach {
+        partitions =>
+          // Remove deleted partitions
+          val partitionsToRemove = partitions.filterNot(logs.contains(_)).toList
+          partitionsToRemove.foreach { partitions.remove(_) }

Review comment:
   Yes, agree.
[GitHub] [kafka] vincent81jiang commented on a change in pull request #11327: KAFKA-13305: fix NullPointerException in LogCleanerManager "uncleanable-bytes" gauge
vincent81jiang commented on a change in pull request #11327:
URL: https://github.com/apache/kafka/pull/11327#discussion_r715168688

## File path: core/src/main/scala/kafka/log/LogCleanerManager.scala ##
@@ -512,6 +514,27 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
         uncleanablePartitions.get(log.parentDir).exists(partitions => partitions.contains(topicPartition))
       }
   }
+
+  def maintainUncleanablePartitions(): Unit = {
+    // Remove deleted partitions from uncleanablePartitions
+    inLock(lock) {
+      // Remove non-existing logDir
+      // Note: we don't use retain or filterInPlace method in this function because retain is deprecated in
+      // scala 2.13 while filterInPlace is not available in scala 2.12.
+      val logDirsToRemove = uncleanablePartitions.filterNot {
+        case (logDir, _) => logDirs.map(_.getAbsolutePath).contains(logDir)
+      }.keys.toList
+      logDirsToRemove.foreach { uncleanablePartitions.remove(_) }

Review comment:
   Makes sense.
[GitHub] [kafka] ccding commented on a change in pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown
ccding commented on a change in pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#discussion_r71507

## File path: core/src/main/scala/kafka/utils/KafkaScheduler.scala ##
@@ -141,9 +145,14 @@ class KafkaScheduler(val threads: Int,
       executor != null
     }
   }
-
-  private def ensureRunning(): Unit = {
-    if (!isStarted)
-      throw new IllegalStateException("Kafka scheduler is not running.")
-  }
+}
+
+private class NoOpScheduledFutureTask() extends ScheduledFuture[Unit] {
+  override def cancel(mayInterruptIfRunning: Boolean): Boolean = true
+  override def isCancelled: Boolean = true
+  override def isDone: Boolean = true
+  override def get(): Unit = {}
+  override def get(timeout: Long, unit: TimeUnit): Unit = {}
+  override def getDelay(unit: TimeUnit): Long = 0
+  override def compareTo(o: Delayed): Int = 0

Review comment:
   NoOpScheduledFutureTask extends ScheduledFuture, ScheduledFuture extends Delayed, and Delayed extends Comparable<Delayed>; therefore we should use Delayed here. Also, it doesn't compile if I change it to ScheduledFuture.

   Fixed the return value.
[GitHub] [kafka] junrao commented on a change in pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown
junrao commented on a change in pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#discussion_r715068733

## File path: core/src/main/scala/kafka/utils/KafkaScheduler.scala ##
@@ -141,9 +145,14 @@ class KafkaScheduler(val threads: Int,
       executor != null
     }
   }
-
-  private def ensureRunning(): Unit = {
-    if (!isStarted)
-      throw new IllegalStateException("Kafka scheduler is not running.")
-  }
+}
+
+private class NoOpScheduledFutureTask() extends ScheduledFuture[Unit] {
+  override def cancel(mayInterruptIfRunning: Boolean): Boolean = true
+  override def isCancelled: Boolean = true
+  override def isDone: Boolean = true
+  override def get(): Unit = {}
+  override def get(timeout: Long, unit: TimeUnit): Unit = {}
+  override def getDelay(unit: TimeUnit): Long = 0
+  override def compareTo(o: Delayed): Int = 0

Review comment:
   Should Delayed be ScheduledFuture? Also, instead of always returning 0, it seems that it's better to return 0 if the other instance is NoOpScheduledFutureTask and return -1 or 1 otherwise?
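To make the `compareTo` discussion concrete, here is a minimal Java sketch of a no-op `ScheduledFuture` whose comparison follows the remaining delay instead of always returning 0. It is one possible reading of the review suggestion, not necessarily what the PR ended up doing; the class name `NoOpScheduledFuture` is an assumption for this example. The parameter type is `Delayed` because `ScheduledFuture` extends `Delayed`, which extends `Comparable<Delayed>`.

```java
import java.util.concurrent.Delayed;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical no-op future returned when the scheduler is already shut down. */
final class NoOpScheduledFuture implements ScheduledFuture<Void> {
    @Override public boolean cancel(boolean mayInterruptIfRunning) { return true; }
    @Override public boolean isCancelled() { return true; }
    @Override public boolean isDone() { return true; }
    @Override public Void get() { return null; }
    @Override public Void get(long timeout, TimeUnit unit) { return null; }
    @Override public long getDelay(TimeUnit unit) { return 0L; }

    /** Orders by remaining delay: 0 against another no-op, negative against a task that is still pending. */
    @Override public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }

    public static void main(String[] args) {
        NoOpScheduledFuture a = new NoOpScheduledFuture();
        System.out.println(a.compareTo(new NoOpScheduledFuture()));
    }
}
```

Comparing by delay keeps the ordering consistent with `getDelay`, as the `Delayed` contract asks, and it still yields 0 for two no-op instances as the reviewer wanted.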
[GitHub] [kafka] jeqo commented on a change in pull request #11356: [KAFKA-10539] Convert KStreamImpl joins to new PAPI
jeqo commented on a change in pull request #11356: URL: https://github.com/apache/kafka/pull/11356#discussion_r715015262 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ## @@ -200,11 +217,11 @@ private void emitNonJoinedOuterRecords(final KeyValueStore
[GitHub] [kafka] jeqo opened a new pull request #11356: [KAFKA-10539] Convert KStreamImpl joins to new PAPI
jeqo opened a new pull request #11356:
URL: https://github.com/apache/kafka/pull/11356

   As part of the migration to the new Processor API, this PR converts KStream-KStream joins.

   Depends on #11315

   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-13321) Notify listener of leader change on registration
Jose Armando Garcia Sancio created KAFKA-13321:
--

Summary: Notify listener of leader change on registration
Key: KAFKA-13321
URL: https://issues.apache.org/jira/browse/KAFKA-13321
Project: Kafka
Issue Type: Sub-task
Components: kraft
Reporter: Jose Armando Garcia Sancio
Assignee: Jose Armando Garcia Sancio

When a Listener is registered with the RaftClient, the RaftClient doesn't notify the listener of the current leader when it is a follower. The current implementation of RaftClient notifies this listener of the leader change only if it is the current leader and it has caught up to the leader epoch.
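The behaviour the ticket asks for can be pictured with a toy registry that replays the known leader to a listener at registration time. This is a simplified sketch under assumptions: `LeaderNotifierSketch`, its `Listener` interface, and the `handleLeaderChange(leaderId, epoch)` signature are invented for illustration and do not reproduce the real RaftClient API or its threading model.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalInt;

/** Hypothetical registry; only illustrates "notify on registration". */
final class LeaderNotifierSketch {

    /** Assumed callback shape, not the real RaftClient.Listener interface. */
    interface Listener {
        void handleLeaderChange(int leaderId, int epoch);
    }

    private final List<Listener> listeners = new ArrayList<>();
    private OptionalInt leaderId = OptionalInt.empty();
    private int epoch = 0;

    /** Registers the listener and immediately replays the current leader, if one is known. */
    void register(Listener listener) {
        listeners.add(listener);
        if (leaderId.isPresent()) {
            listener.handleLeaderChange(leaderId.getAsInt(), epoch);
        }
    }

    /** Records a new leader and notifies every registered listener. */
    void onLeaderChange(int newLeaderId, int newEpoch) {
        leaderId = OptionalInt.of(newLeaderId);
        epoch = newEpoch;
        for (Listener listener : listeners) {
            listener.handleLeaderChange(newLeaderId, newEpoch);
        }
    }
}
```

With this shape, a listener that registers late on a follower still learns who the leader is, instead of waiting for the next election.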
[GitHub] [kafka] hachikuji merged pull request #11354: MINOR: Print lastTimestamp when dumping producer snapshots
hachikuji merged pull request #11354:
URL: https://github.com/apache/kafka/pull/11354
[GitHub] [kafka] wcarlson5 commented on a change in pull request #11347: KAFKA-13296: warn if previous assignment has duplicate partitions
wcarlson5 commented on a change in pull request #11347:
URL: https://github.com/apache/kafka/pull/11347#discussion_r714990115

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ##
@@ -351,6 +351,17 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
             // add the consumer and any info in its subscription to the client
             clientMetadata.addConsumer(consumerId, subscription.ownedPartitions());
+            if (allOwnedPartitions.stream().anyMatch(t -> subscription.ownedPartitions().contains(t))) {
+                log.warn("The previous assignment contains a partition more than once. " +
+                    "This might result in violation of EOS if enabled. \n" +

Review comment:
   That is probably easier to read. I will do that.
[GitHub] [kafka] wcarlson5 commented on a change in pull request #11347: KAFKA-13296: warn if previous assignment has duplicate partitions
wcarlson5 commented on a change in pull request #11347:
URL: https://github.com/apache/kafka/pull/11347#discussion_r714987990

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ##
@@ -351,6 +351,17 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
             // add the consumer and any info in its subscription to the client
             clientMetadata.addConsumer(consumerId, subscription.ownedPartitions());
+            if (allOwnedPartitions.stream().anyMatch(t -> subscription.ownedPartitions().contains(t))) {

Review comment:
   Sure, that works.
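For context, the check in the hunk above asks whether any partition reported as owned by the current consumer was already reported by an earlier one. A standalone way to express the same idea is sketched below; `DuplicateOwnershipCheck` and the use of plain strings for partitions are assumptions made for the example, not the assignor's actual types.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper: finds partitions claimed more than once in the previous assignment. */
final class DuplicateOwnershipCheck {

    /** Returns every partition that appears more than once across (or within) the consumers' owned lists. */
    static Set<String> findDuplicates(Map<String, List<String>> ownedByConsumer) {
        Set<String> seen = new HashSet<>();
        Set<String> duplicates = new TreeSet<>();
        for (List<String> owned : ownedByConsumer.values()) {
            for (String partition : owned) {
                // Set.add returns false when the element was already present.
                if (!seen.add(partition)) {
                    duplicates.add(partition);
                }
            }
        }
        return duplicates;
    }

    public static void main(String[] args) {
        Map<String, List<String>> owned = Map.of(
            "consumer-1", List.of("t-0", "t-1"),
            "consumer-2", List.of("t-1", "t-2"));
        System.out.println(findDuplicates(owned));
    }
}
```

Tracking a `seen` set gives one pass over all owned partitions, whereas `anyMatch` combined with a list `contains` rescans the earlier partitions for each consumer.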
[GitHub] [kafka] satishd commented on pull request #11058: KAFKA-12802 Added a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broker restarts.
satishd commented on pull request #11058:
URL: https://github.com/apache/kafka/pull/11058#issuecomment-925983449

   Thanks @ccding for the review, addressed with inline replies.
[GitHub] [kafka] satishd commented on pull request #11058: KAFKA-12802 Added a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broker restarts.
satishd commented on pull request #11058:
URL: https://github.com/apache/kafka/pull/11058#issuecomment-925982858

   Thanks @junrao for the review. Addressed them with the latest commit and comments.
[GitHub] [kafka] satishd commented on a change in pull request #11058: KAFKA-12802 Added a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broke
satishd commented on a change in pull request #11058:
URL: https://github.com/apache/kafka/pull/11058#discussion_r714976848

## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFile.java ##
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This class represents the remote log data snapshot stored in a file for a specific topic partition. This is used by
+ * {@link TopicBasedRemoteLogMetadataManager} to store the remote log metadata received for a specific partition from
+ * remote log metadata topic. This will avoid reading the remote log metadata messages from the topic again when a
+ * broker restarts.
+ */
+public class RemoteLogMetadataSnapshotFile {
+    private static final Logger log = LoggerFactory.getLogger(RemoteLogMetadataSnapshotFile.class);
+
+    public static final String COMMITTED_LOG_METADATA_SNAPSHOT_FILE_NAME = "remote_log_snapshot";
+
+    // header:
+    // size: 2 + (8+8) + 4 + 8 = 30
+    private static final int HEADER_SIZE = 30;
+
+    private final File metadataStoreFile;
+
+    /**
+     * Creates a CommittedLogMetadataSnapshotFile instance backed by a file with the name `remote_log_snapshot` in
+     * the given {@code metadataStoreDir}. It creates the file if it does not exist.
+     *
+     * @param metadataStoreDir directory in which the snapshot file to be created.
+     */
+    RemoteLogMetadataSnapshotFile(Path metadataStoreDir) {
+        this.metadataStoreFile = new File(metadataStoreDir.toFile(), COMMITTED_LOG_METADATA_SNAPSHOT_FILE_NAME);
+
+        // Create an empty file if it does not exist.
+        try {
+            boolean newFileCreated = metadataStoreFile.createNewFile();
+            log.info("Remote log metadata snapshot file: [{}], newFileCreated: [{}]", metadataStoreFile, newFileCreated);
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    /**
+     * Writes the given snapshot replacing the earlier snapshot data.
+     *
+     * @param snapshot Snapshot to be stored.
+     * @throws IOException if there is any error in writing the given snapshot to the file.
+     */
+    public synchronized void write(Snapshot snapshot) throws IOException {
+        File newMetadataSnapshotFile = new File(metadataStoreFile.getAbsolutePath() + ".new");
+        try (WritableByteChannel fileChannel = Channels.newChannel(new FileOutputStream(newMetadataSnapshotFile))) {
+
+            ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE);
+
+            // Write version
+            headerBuffer.putShort(snapshot.version);
+
+            // Write topic-id
+            headerBuffer.putLong(snapshot.topicId.getMostSignificantBits());
+            headerBuffer.putLong(snapshot.topicId.getLeastSignificantBits());
+
+            // Write metadata partition and metadata partition offset
+            headerBuffer.putInt(snapshot.metadataPartition);
+            headerBuffer.putLong(snapshot.metadataPartitionOffset);
+            headerBuffer.flip();
+
+            // Write header
+            fileChannel.write(headerBuffer);
+
+            // Write each entry
+            ByteBuffer lenBuffer = ByteBuffer.allocate(4);
+            RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+
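The 30-byte header written in the hunk above (2-byte version, 16-byte topic id as two longs, 4-byte metadata partition, 8-byte metadata partition offset) can be exercised in isolation. The sketch below is an illustration under assumptions: `SnapshotHeaderSketch` is a made-up class, and `java.util.UUID` stands in for Kafka's `Uuid` so the example has no Kafka dependency.

```java
import java.nio.ByteBuffer;
import java.util.UUID;

/** Hypothetical stand-alone encoder for the snapshot header layout described in the diff. */
final class SnapshotHeaderSketch {
    // 2 (version) + 8 + 8 (topic id) + 4 (metadata partition) + 8 (offset) = 30 bytes
    static final int HEADER_SIZE = Short.BYTES + 2 * Long.BYTES + Integer.BYTES + Long.BYTES;

    /** Returns a buffer flipped for reading, containing exactly HEADER_SIZE bytes. */
    static ByteBuffer encode(short version, UUID topicId, int metadataPartition, long metadataPartitionOffset) {
        ByteBuffer header = ByteBuffer.allocate(HEADER_SIZE);
        header.putShort(version);
        header.putLong(topicId.getMostSignificantBits());
        header.putLong(topicId.getLeastSignificantBits());
        header.putInt(metadataPartition);
        header.putLong(metadataPartitionOffset);
        header.flip();
        return header;
    }

    public static void main(String[] args) {
        ByteBuffer header = encode((short) 0, new UUID(1L, 2L), 3, 42L);
        System.out.println(header.remaining());
    }
}
```

A reader would consume the fields in the same order with `getShort`, `getLong`, `getLong`, `getInt`, and `getLong`; `ByteBuffer` defaults to big-endian, so both sides agree without extra configuration.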
[GitHub] [kafka] satishd commented on a change in pull request #11058: KAFKA-12802 Added a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broke
satishd commented on a change in pull request #11058: URL: https://github.com/apache/kafka/pull/11058#discussion_r714976377 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java ## @@ -161,53 +161,46 @@ public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate metada throw new IllegalArgumentException("metadataUpdate: " + metadataUpdate + " with state " + RemoteLogSegmentState.COPY_SEGMENT_STARTED + " can not be updated"); case COPY_SEGMENT_FINISHED: -handleSegmentWithCopySegmentFinishedState(metadataUpdate, existingMetadata); + handleSegmentWithCopySegmentFinishedState(existingMetadata.createWithUpdates(metadataUpdate)); break; case DELETE_SEGMENT_STARTED: -handleSegmentWithDeleteSegmentStartedState(metadataUpdate, existingMetadata); + handleSegmentWithDeleteSegmentStartedState(existingMetadata.createWithUpdates(metadataUpdate)); break; case DELETE_SEGMENT_FINISHED: -handleSegmentWithDeleteSegmentFinishedState(metadataUpdate, existingMetadata); + handleSegmentWithDeleteSegmentFinishedState(existingMetadata.createWithUpdates(metadataUpdate)); break; default: throw new IllegalArgumentException("Metadata with the state " + targetState + " is not supported"); } } -private void handleSegmentWithCopySegmentFinishedState(RemoteLogSegmentMetadataUpdate metadataUpdate, - RemoteLogSegmentMetadata existingMetadata) { -log.debug("Adding remote log segment metadata to leader epoch mappings with update: [{}]", metadataUpdate); - -doHandleSegmentStateTransitionForLeaderEpochs(existingMetadata, +protected final void handleSegmentWithCopySegmentFinishedState(RemoteLogSegmentMetadata remoteLogSegmentMetadata) { Review comment: Good catch. We should maintain the ordering while storing the segments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [kafka] satishd commented on a change in pull request #11058: KAFKA-12802 Added a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broke
satishd commented on a change in pull request #11058: URL: https://github.com/apache/kafka/pull/11058#discussion_r714975955 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFile.java ## @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/** + * This class represents the remote log data snapshot stored in a file for a specific topic partition. 
This is used by
+ * {@link TopicBasedRemoteLogMetadataManager} to store the remote log metadata received for a specific partition from
+ * remote log metadata topic. This will avoid reading the remote log metadata messages from the topic again when a
+ * broker restarts.
+ */
+public class RemoteLogMetadataSnapshotFile {
+private static final Logger log = LoggerFactory.getLogger(RemoteLogMetadataSnapshotFile.class);
+
+public static final String COMMITTED_LOG_METADATA_SNAPSHOT_FILE_NAME = "remote_log_snapshot";
+
+// header:

Review comment:
Adding this was suggested during the KIP review because it is useful for debugging: it lets us check that what is stored in the file matches the received topic id.
[GitHub] [kafka] satishd commented on a change in pull request #11058: KAFKA-12802 Added a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broke
satishd commented on a change in pull request #11058:
URL: https://github.com/apache/kafka/pull/11058#discussion_r714975590

## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java ##
@@ -39,6 +41,7 @@
 private static final short REMOTE_LOG_SEGMENT_METADATA_API_KEY = new RemoteLogSegmentMetadataRecord().apiKey();
 private static final short REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY = new RemoteLogSegmentMetadataUpdateRecord().apiKey();
 private static final short REMOTE_PARTITION_DELETE_API_KEY = new RemotePartitionDeleteMetadataRecord().apiKey();
+private static final short REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY = new RemoteLogSegmentMetadataSnapshotRecord().apiKey();

Review comment:
There are two ways to implement it.
- Have a separate BytesApiMessageSerde for this message only and write the supporting classes, which will be similar to RemoteLogMetadataSerde and RemoteLogSegmentMetadataSnapshotTransform.
- Treat this as one more ApiMessage representing remote log metadata and add it to the existing RemoteLogMetadataSerde, which already has the framework for adding one more API message.

I chose the latter for simplicity. We can update the javadoc to say that RemoteLogMetadataSerde includes serdes for all the ApiMessages defined for remote log metadata, including RemoteLogSegmentMetadataSnapshot. It can still be used as the serde for the topic, as it supports all the messages stored in the remote log metadata topic.
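The second option described above can be pictured as one registry keyed by API key, where supporting a new message type is a single extra registration. The sketch below only illustrates that design choice: the class name, the string payloads, and the `register`/`decode` methods are hypothetical and are not the actual RemoteLogMetadataSerde code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical illustration: one serde that dispatches on an API key.
final class ApiKeyRegistrySketch {
    // Maps an API key to the function that decodes payloads of that type.
    private final Map<Short, Function<String, String>> decoders = new HashMap<>();

    // Adding support for one more message type is one more registration.
    void register(short apiKey, Function<String, String> decoder) {
        decoders.put(apiKey, decoder);
    }

    // Looks up the decoder for the key and fails loudly for unknown keys.
    String decode(short apiKey, String payload) {
        Function<String, String> decoder = decoders.get(apiKey);
        if (decoder == null) {
            throw new IllegalArgumentException("Unsupported api key: " + apiKey);
        }
        return decoder.apply(payload);
    }
}
```

The trade-off is the one named in the comment: a single shared serde is simpler to maintain, at the cost of that serde knowing about message types that never appear in the topic itself.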
[GitHub] [kafka] satishd commented on a change in pull request #11058: KAFKA-12802 Added a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broke
satishd commented on a change in pull request #11058: URL: https://github.com/apache/kafka/pull/11058#discussion_r714975169 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java ## @@ -120,6 +174,35 @@ public void run() { } } +private void maybeSyncCommittedDataAndOffsets(boolean forceSync) { +boolean noOffsetUpdates = committedPartitionToConsumedOffsets.equals(partitionToConsumedOffsets); +if (noOffsetUpdates || !forceSync && time.milliseconds() - lastSyncedTimeMs < committedOffsetSyncIntervalMs) { +log.debug("Skip syncing committed offsets, noOffsetUpdates: {}, forceSync: {}", noOffsetUpdates, forceSync); +return; +} + +try { +HashMap syncedPartitionToConsumedOffsets = new HashMap<>(); +for (TopicIdPartition topicIdPartition : assignedTopicPartitions) { +int metadataPartition = topicPartitioner.metadataPartition(topicIdPartition); +Long offset = partitionToConsumedOffsets.get(metadataPartition); +if (offset != null && !offset.equals(committedPartitionToConsumedOffsets.get(metadataPartition))) { + remotePartitionMetadataEventHandler.syncLogMetadataDataFile(topicIdPartition, metadataPartition, offset); +syncedPartitionToConsumedOffsets.put(metadataPartition, offset); +} else { +log.debug("Skipping syncup of the remote-log-metadata-file for partition:{} , with remote log metadata partition{}, and offset:{} ", +topicIdPartition, metadataPartition, offset); +} +} + +committedOffsetsFile.writeEntries(partitionToConsumedOffsets); Review comment: We needed partitionToConsumedOffsets in the earlier check and we do not really need syncedPartitionToConsumedOffsets here. Updated to use partitionToConsumedOffsets for writing and setting committedPartitionToConsumedOffsets as partitionToConsumedOffsets. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
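The skip condition in `maybeSyncCommittedDataAndOffsets` relies on `&&` binding tighter than `||`. A minimal sketch of that predicate with explicit parentheses follows; the class and method names are hypothetical, and the clock values are passed in as plain longs rather than read from Kafka's `Time`.

```java
// Hypothetical helper that isolates the skip condition from the diff.
final class SyncThrottleSketch {
    // Returns true when the sync should be skipped:
    // either nothing changed, or the sync is not forced and the interval has not elapsed.
    static boolean shouldSkipSync(boolean noOffsetUpdates,
                                  boolean forceSync,
                                  long nowMs,
                                  long lastSyncedTimeMs,
                                  long syncIntervalMs) {
        boolean intervalNotElapsed = nowMs - lastSyncedTimeMs < syncIntervalMs;
        return noOffsetUpdates || (!forceSync && intervalNotElapsed);
    }
}
```

Note that under this condition a forced sync is still skipped when there are no offset updates, which matches the original expression.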
[GitHub] [kafka] satishd commented on a change in pull request #11058: KAFKA-12802 Added a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broke
satishd commented on a change in pull request #11058: URL: https://github.com/apache/kafka/pull/11058#discussion_r714974207 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java ## @@ -120,6 +174,35 @@ public void run() { } } +private void maybeSyncCommittedDataAndOffsets(boolean forceSync) { +boolean noOffsetUpdates = committedPartitionToConsumedOffsets.equals(partitionToConsumedOffsets); +if (noOffsetUpdates || !forceSync && time.milliseconds() - lastSyncedTimeMs < committedOffsetSyncIntervalMs) { +log.debug("Skip syncing committed offsets, noOffsetUpdates: {}, forceSync: {}", noOffsetUpdates, forceSync); +return; +} + +try { +HashMap syncedPartitionToConsumedOffsets = new HashMap<>(); +for (TopicIdPartition topicIdPartition : assignedTopicPartitions) { Review comment: Good point. Added the locking and updated it with a comment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a change in pull request #11058: KAFKA-12802 Added a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broke
satishd commented on a change in pull request #11058: URL: https://github.com/apache/kafka/pull/11058#discussion_r714973902 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java ## @@ -85,21 +90,68 @@ // Map of remote log metadata topic partition to consumed offsets. private final Map partitionToConsumedOffsets = new ConcurrentHashMap<>(); +private Map committedPartitionToConsumedOffsets = Collections.emptyMap(); + +private final long committedOffsetSyncIntervalMs; +private CommittedOffsetsFile committedOffsetsFile; +private long lastSyncedTimeMs; + public ConsumerTask(KafkaConsumer consumer, RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler, -RemoteLogMetadataTopicPartitioner topicPartitioner) { -Objects.requireNonNull(consumer); -Objects.requireNonNull(remotePartitionMetadataEventHandler); -Objects.requireNonNull(topicPartitioner); - -this.consumer = consumer; -this.remotePartitionMetadataEventHandler = remotePartitionMetadataEventHandler; -this.topicPartitioner = topicPartitioner; +RemoteLogMetadataTopicPartitioner topicPartitioner, +Path committedOffsetsPath, +Time time, +long committedOffsetSyncIntervalMs) { +this.consumer = Objects.requireNonNull(consumer); +this.remotePartitionMetadataEventHandler = Objects.requireNonNull(remotePartitionMetadataEventHandler); +this.topicPartitioner = Objects.requireNonNull(topicPartitioner); +this.time = Objects.requireNonNull(time); +this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs; + +initializeConsumerAssignment(committedOffsetsPath); +} + +private void initializeConsumerAssignment(Path committedOffsetsPath) { +try { +committedOffsetsFile = new CommittedOffsetsFile(committedOffsetsPath.toFile()); +} catch (IOException e) { +throw new KafkaException(e); +} + +Map committedOffsets = Collections.emptyMap(); +try { +// Load committed offset and assign them in the consumer. 
+committedOffsets = committedOffsetsFile.readEntries();
+} catch (IOException e) {
+// Ignore the error and consumer consumes from the earliest offset.
+log.error("Encountered error while building committed offsets from the file", e);
+}
+
+final Set> entries = committedOffsets.entrySet();
+
+if (!entries.isEmpty()) {
+// Assign topic partitions from the earlier committed offsets file.
+Set earlierAssignedPartitions = committedOffsets.keySet();
+assignedMetaPartitions = Collections.unmodifiableSet(earlierAssignedPartitions);
+Set metadataTopicPartitions = earlierAssignedPartitions.stream()
+ .map(x -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, x))
+ .collect(Collectors.toSet());
+consumer.assign(metadataTopicPartitions);
+
+// Seek to the committed offsets
+for (Map.Entry entry : entries) {
+partitionToConsumedOffsets.put(entry.getKey(), entry.getValue());
+consumer.seek(new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, entry.getKey()), entry.getValue());
+}
+
+committedPartitionToConsumedOffsets = committedOffsets;
+}
 }
 @Override
 public void run() {
 log.info("Started Consumer task thread.");
+lastSyncedTimeMs = time.milliseconds();

Review comment:
I guess the consumer would fetch from the earliest offset, since it holds no state for those partitions and `auto.offset.reset` is set to `earliest`.
[GitHub] [kafka] satishd commented on a change in pull request #11058: KAFKA-12802 Added a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broke
satishd commented on a change in pull request #11058: URL: https://github.com/apache/kafka/pull/11058#discussion_r714972637 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java ## @@ -85,21 +90,68 @@ // Map of remote log metadata topic partition to consumed offsets. private final Map partitionToConsumedOffsets = new ConcurrentHashMap<>(); +private Map committedPartitionToConsumedOffsets = Collections.emptyMap(); + +private final long committedOffsetSyncIntervalMs; +private CommittedOffsetsFile committedOffsetsFile; +private long lastSyncedTimeMs; + public ConsumerTask(KafkaConsumer consumer, RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler, -RemoteLogMetadataTopicPartitioner topicPartitioner) { -Objects.requireNonNull(consumer); -Objects.requireNonNull(remotePartitionMetadataEventHandler); -Objects.requireNonNull(topicPartitioner); - -this.consumer = consumer; -this.remotePartitionMetadataEventHandler = remotePartitionMetadataEventHandler; -this.topicPartitioner = topicPartitioner; +RemoteLogMetadataTopicPartitioner topicPartitioner, +Path committedOffsetsPath, +Time time, +long committedOffsetSyncIntervalMs) { +this.consumer = Objects.requireNonNull(consumer); +this.remotePartitionMetadataEventHandler = Objects.requireNonNull(remotePartitionMetadataEventHandler); +this.topicPartitioner = Objects.requireNonNull(topicPartitioner); +this.time = Objects.requireNonNull(time); +this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs; + +initializeConsumerAssignment(committedOffsetsPath); +} + +private void initializeConsumerAssignment(Path committedOffsetsPath) { +try { +committedOffsetsFile = new CommittedOffsetsFile(committedOffsetsPath.toFile()); +} catch (IOException e) { +throw new KafkaException(e); +} + +Map committedOffsets = Collections.emptyMap(); +try { +// Load committed offset and assign them in the consumer. 
+committedOffsets = committedOffsetsFile.readEntries();
+} catch (IOException e) {
+// Ignore the error and consumer consumes from the earliest offset.
+log.error("Encountered error while building committed offsets from the file", e);
+}
+
+final Set> entries = committedOffsets.entrySet();

Review comment:
We want to iterate over the entries rather than call get(key) for every key, which avoids the extra lookups.
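A small sketch of the point made in this comment: iterating `entrySet()` yields key and value together, whereas iterating `keySet()` and calling `get(key)` costs one additional lookup per key. The `Map<Integer, Long>` type (partition to offset) is an assumption, since the generic parameters were lost in the archived diff, and the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class EntryIterationSketch {
    // Copies committed offsets into a target map using a single pass over the entries.
    static Map<Integer, Long> copyViaEntries(Map<Integer, Long> committedOffsets) {
        Map<Integer, Long> target = new HashMap<>();
        for (Map.Entry<Integer, Long> entry : committedOffsets.entrySet()) {
            // Key and value come from the same entry, so no second lookup is needed.
            target.put(entry.getKey(), entry.getValue());
        }
        return target;
    }

    // Equivalent result, but performs an extra get(key) lookup for every key.
    static Map<Integer, Long> copyViaKeys(Map<Integer, Long> committedOffsets) {
        Map<Integer, Long> target = new HashMap<>();
        for (Integer partition : committedOffsets.keySet()) {
            target.put(partition, committedOffsets.get(partition));
        }
        return target;
    }
}
```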
[GitHub] [kafka] satishd commented on a change in pull request #11058: KAFKA-12802 Added a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broke
satishd commented on a change in pull request #11058: URL: https://github.com/apache/kafka/pull/11058#discussion_r714971600 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java ## @@ -49,21 +52,23 @@ public ConsumerManager(TopicBasedRemoteLogMetadataManagerConfig rlmmConfig, RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler, - RemoteLogMetadataTopicPartitioner rlmmTopicPartitioner, + RemoteLogMetadataTopicPartitioner topicPartitioner, Time time) { this.rlmmConfig = rlmmConfig; this.time = time; //Create a task to consume messages and submit the respective events to RemotePartitionMetadataEventHandler. KafkaConsumer consumer = new KafkaConsumer<>(rlmmConfig.consumerProperties()); -consumerTask = new ConsumerTask(consumer, remotePartitionMetadataEventHandler, rlmmTopicPartitioner); +Path committedOffsetsPath = new File(rlmmConfig.logDir(), COMMITTED_OFFSETS_FILE_NAME).toPath(); +consumerTask = new ConsumerTask(consumer, remotePartitionMetadataEventHandler, topicPartitioner, committedOffsetsPath, time, 60_000L); Review comment: Right, this will be added in a followup PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a change in pull request #11058: KAFKA-12802 Added a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broke
satishd commented on a change in pull request #11058: URL: https://github.com/apache/kafka/pull/11058#discussion_r714971397 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/CommittedOffsetsFile.java ## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.CheckpointFile; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.regex.Pattern; + +/** + * This class represents a file containing the committed offsets of remote log metadata partitions. 
+ */
+public class CommittedOffsetsFile {
+private static final int CURRENT_VERSION = 0;
+private static final String SEPARATOR = " ";
+
+private static final Pattern MINIMUM_ONE_WHITESPACE = Pattern.compile("\\s+");
+private final CheckpointFile> checkpointFile;
+
+CommittedOffsetsFile(File offsetsFile) throws IOException {
+CheckpointFile.EntryFormatter> formatter = new EntryFormatter();
+checkpointFile = new CheckpointFile<>(offsetsFile, CURRENT_VERSION, formatter);
+}
+
+private static class EntryFormatter implements CheckpointFile.EntryFormatter> {
+
+@Override
+public String toString(Map.Entry entry) {
+// Each entry is stored in a new line as
+return entry.getKey() + SEPARATOR + entry.getValue();
+}
+
+@Override
+public Optional> fromString(String line) {
+String[] strings = MINIMUM_ONE_WHITESPACE.split(line);
+if (strings.length != 2) {
+return Optional.empty();
+}
+int partition = Integer.parseInt(strings[0]);
+long offset = Long.parseLong(strings[1]);

Review comment:
As written, a parse error is thrown to the caller. As you suggested, it is better to catch it and return an empty Optional so that the caller throws an error with the details, including the line. Addressed in the latest commit.
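A minimal sketch of the change described in this comment, assuming entries are a partition (int) and an offset (long) separated by whitespace. It is a standalone illustration with a hypothetical class name, not the committed code: the `NumberFormatException` is caught and mapped to an empty Optional, leaving it to the caller to report the malformed line.

```java
import java.util.AbstractMap;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

final class OffsetLineParserSketch {
    private static final Pattern MINIMUM_ONE_WHITESPACE = Pattern.compile("\\s+");

    // Parses "<partition> <offset>"; returns empty for any malformed line.
    static Optional<Map.Entry<Integer, Long>> fromString(String line) {
        String[] strings = MINIMUM_ONE_WHITESPACE.split(line);
        if (strings.length != 2) {
            return Optional.empty();
        }
        try {
            int partition = Integer.parseInt(strings[0]);
            long offset = Long.parseLong(strings[1]);
            Map.Entry<Integer, Long> entry = new AbstractMap.SimpleImmutableEntry<>(partition, offset);
            return Optional.of(entry);
        } catch (NumberFormatException e) {
            // Non-numeric fields are treated like any other malformed line.
            return Optional.empty();
        }
    }
}
```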
[GitHub] [kafka] jolshan commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
jolshan commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r714947277 ## File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala ## @@ -929,6 +929,29 @@ class AbstractFetcherThreadTest { fetcher.verifyLastFetchedEpoch(partition, Some(5)) } + @Test + def testMaybeUpdateTopicIds(): Unit = { +val partition = new TopicPartition("topic1", 0) +val fetcher = new MockFetcherThread + +// Start with no topic IDs +fetcher.setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = 0)) +fetcher.addPartitions(Map(partition -> initialFetchState(None, 0L, leaderEpoch = 0))) + + +def verifyFetchState(fetchState: Option[PartitionFetchState], expectedTopicId: Option[Uuid]): Unit = { + assertTrue(fetchState.isDefined) + assertEquals(expectedTopicId, fetchState.get.topicId) +} + +verifyFetchState(fetcher.fetchState(partition), None) + +// Add topic ID +fetcher.maybeUpdateTopicIds(Set(partition), topicName => topicIds.get(topicName)) Review comment: Ok. Seems like we agree then. I think I have a test in ReplicaManager that does this as well. So we are good. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r714936472

## File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala ##
@@ -929,6 +929,29 @@ class AbstractFetcherThreadTest {
 fetcher.verifyLastFetchedEpoch(partition, Some(5))
 }
+ @Test
+ def testMaybeUpdateTopicIds(): Unit = {
+val partition = new TopicPartition("topic1", 0)
+val fetcher = new MockFetcherThread
+
+// Start with no topic IDs
+fetcher.setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = 0))
+fetcher.addPartitions(Map(partition -> initialFetchState(None, 0L, leaderEpoch = 0)))
+
+
+def verifyFetchState(fetchState: Option[PartitionFetchState], expectedTopicId: Option[Uuid]): Unit = {
+ assertTrue(fetchState.isDefined)
+ assertEquals(expectedTopicId, fetchState.get.topicId)
+}
+
+verifyFetchState(fetcher.fetchState(partition), None)
+
+// Add topic ID
+fetcher.maybeUpdateTopicIds(Set(partition), topicName => topicIds.get(topicName))

Review comment:
We could add tests for the ReplicaManager that verify that the topic id is updated and propagated to the fetcher manager. Is that what you have in mind? Regarding this particular test, what would you add? If the topic id is zero, we would just set it. As you said, the logic that prevents this from happening comes earlier.
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r714923995 ## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ## @@ -3469,4 +3472,88 @@ class ReplicaManagerTest { ClientQuotasImage.EMPTY ) } + + def assertFetcherHasTopicId[T <: AbstractFetcherThread](manager: AbstractFetcherManager[T], + tp: TopicPartition, + expectedTopicId: Option[Uuid]): Unit = { +val fetchState = manager.getFetcher(tp).flatMap(_.fetchState(tp)) +assertTrue(fetchState.isDefined) +assertEquals(expectedTopicId, fetchState.get.topicId) + } + + @Test + def testPartitionFetchStateUpdatesWithTopicIdAdded(): Unit = { +val aliveBrokersIds = Seq(0, 1) +val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), + brokerId = 0, aliveBrokersIds) +try { + val tp = new TopicPartition(topic, 0) + val leaderAndIsr = new LeaderAndIsr(1, 0, aliveBrokersIds.toList, 0) + + val leaderAndIsrRequest1 = leaderAndIsrRequest(Uuid.ZERO_UUID, tp, aliveBrokersIds, leaderAndIsr) + val leaderAndIsrResponse1 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ()) + assertEquals(Errors.NONE, leaderAndIsrResponse1.error) + + assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, None) + + val leaderAndIsrRequest2 = leaderAndIsrRequest(topicId, tp, aliveBrokersIds, leaderAndIsr) + val leaderAndIsrResponse2 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest2, (_, _) => ()) + assertEquals(Errors.NONE, leaderAndIsrResponse2.error) + + assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, Some(topicId)) + +} finally { + replicaManager.shutdown(checkpointHW = false) +} + } + + @ParameterizedTest + @ValueSource(booleans = Array(true, false)) + def testReplicaAlterLogDirsWithAndWithoutIds(usesTopicIds: Boolean): Unit = { +val version = if (usesTopicIds) LeaderAndIsrRequestData.HIGHEST_SUPPORTED_VERSION else 4.toShort +val topicIdRaw = if (usesTopicIds) topicId else 
Uuid.ZERO_UUID Review comment: Ah.. It is because `topicId` is also used in the statement. You could indeed use `this.topicId` there. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
jolshan commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r714910220

## File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala ##
@@ -929,6 +929,29 @@ class AbstractFetcherThreadTest {
 fetcher.verifyLastFetchedEpoch(partition, Some(5))
 }
+ @Test
+ def testMaybeUpdateTopicIds(): Unit = {
+val partition = new TopicPartition("topic1", 0)
+val fetcher = new MockFetcherThread
+
+// Start with no topic IDs
+fetcher.setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = 0))
+fetcher.addPartitions(Map(partition -> initialFetchState(None, 0L, leaderEpoch = 0)))
+
+
+def verifyFetchState(fetchState: Option[PartitionFetchState], expectedTopicId: Option[Uuid]): Unit = {
+ assertTrue(fetchState.isDefined)
+ assertEquals(expectedTopicId, fetchState.get.topicId)
+}
+
+verifyFetchState(fetcher.fetchState(partition), None)
+
+// Add topic ID
+fetcher.maybeUpdateTopicIds(Set(partition), topicName => topicIds.get(topicName))

Review comment:
To clarify, this path is only taken if
```
requestTopicId match {
  case Some(topicId) if logTopicId.isEmpty =>
```
meaning both the request topic ID and log topic ID are non-empty/non-zero UUIDs. Should I still check these edge cases (either the log ID is already defined, or the ID in the map is zero)?
[GitHub] [kafka] jolshan commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
jolshan commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r714901210 ## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ## @@ -3469,4 +3472,88 @@ class ReplicaManagerTest { ClientQuotasImage.EMPTY ) } + + def assertFetcherHasTopicId[T <: AbstractFetcherThread](manager: AbstractFetcherManager[T], + tp: TopicPartition, + expectedTopicId: Option[Uuid]): Unit = { +val fetchState = manager.getFetcher(tp).flatMap(_.fetchState(tp)) +assertTrue(fetchState.isDefined) +assertEquals(expectedTopicId, fetchState.get.topicId) + } + + @Test + def testPartitionFetchStateUpdatesWithTopicIdAdded(): Unit = { +val aliveBrokersIds = Seq(0, 1) +val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), + brokerId = 0, aliveBrokersIds) +try { + val tp = new TopicPartition(topic, 0) + val leaderAndIsr = new LeaderAndIsr(1, 0, aliveBrokersIds.toList, 0) + + val leaderAndIsrRequest1 = leaderAndIsrRequest(Uuid.ZERO_UUID, tp, aliveBrokersIds, leaderAndIsr) + val leaderAndIsrResponse1 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ()) + assertEquals(Errors.NONE, leaderAndIsrResponse1.error) + + assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, None) + + val leaderAndIsrRequest2 = leaderAndIsrRequest(topicId, tp, aliveBrokersIds, leaderAndIsr) + val leaderAndIsrResponse2 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest2, (_, _) => ()) + assertEquals(Errors.NONE, leaderAndIsrResponse2.error) + + assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, Some(topicId)) + +} finally { + replicaManager.shutdown(checkpointHW = false) +} + } + + @ParameterizedTest + @ValueSource(booleans = Array(true, false)) + def testReplicaAlterLogDirsWithAndWithoutIds(usesTopicIds: Boolean): Unit = { +val version = if (usesTopicIds) LeaderAndIsrRequestData.HIGHEST_SUPPORTED_VERSION else 4.toShort +val topicIdRaw = if (usesTopicIds) topicId else 
Uuid.ZERO_UUID

Review comment:
I had this and Scala told me it was a recursive definition. Maybe I can use something like `this.topicId` in the definition to clarify the different topic ID usages.
[GitHub] [kafka] jolshan commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
jolshan commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r714899137

## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ##
@@ -491,6 +491,21 @@ abstract class AbstractFetcherThread(name: String,
 } finally partitionMapLock.unlock()
 }
+ def maybeAddTopicIdsToThread(partitions: Set[TopicPartition], topicIds: String => Option[Uuid]) = {
+partitionMapLock.lockInterruptibly()
+try {
+ partitions.foreach { tp =>
+val currentState = partitionStates.stateValue(tp)
+if (currentState != null) {
+ val updatedState = currentState.updateTopicId(topicIds(tp.topic))
+ partitionStates.updateAndMoveToEnd(tp, updatedState)

Review comment:
I will do this. For some reason I thought `put` behaved like a queue/list operation rather than a map operation. This makes sense to me now.
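The distinction in this comment can be shown with a plain `java.util.LinkedHashMap`, used here only as a stand-in and not as Kafka's own partition-state class: calling `put` for an existing key replaces the value and keeps the key's position, while removing and re-inserting moves the key to the end. The class and method names below are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class InsertionOrderSketch {
    // put on an existing key replaces the value; insertion order is unchanged.
    static List<String> updateInPlace(Map<String, String> states, String key, String newState) {
        states.put(key, newState);
        return new ArrayList<>(states.keySet());
    }

    // remove followed by put re-inserts the key, so it ends up last.
    static List<String> updateAndMoveToEnd(Map<String, String> states, String key, String newState) {
        states.remove(key);
        states.put(key, newState);
        return new ArrayList<>(states.keySet());
    }
}
```

When only a field such as the topic ID changes, the in-place form avoids reordering the partitions that follow.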
[jira] [Updated] (KAFKA-13320) Suggestion: SMT support for null key/value should be documented
[ https://issues.apache.org/jira/browse/KAFKA-13320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Ellis updated KAFKA-13320: -- Description: While working with a JDBC Sink Connector, I noticed that some SMT choke on a tombstone (null value) while others handle tombstones fine. For example: {code:javascript} "transforms": "flattenKey,valueToJSON,wrapValue,addTimestamp", "transforms.flattenKey.type": "org.apache.kafka.connect.transforms.Flatten$Key", "transforms.flattenKey.delimiter": "_", "transforms.valueToJSON.type": "com.github.jcustenborder.kafka.connect.transform.common.ToJSON$Value", "transforms.valueToJSON.schemas.enable": "false", "transforms.valueToJSON.predicate": "tombstone", "transforms.valueToJSON.negate": true, "transforms.wrapValue.type":"org.apache.kafka.connect.transforms.HoistField$Value", "transforms.wrapValue.field":"matrix", "transforms.wrapValue.predicate": "tombstone", "transforms.wrapValue.negate": true, "transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value", "transforms.addTimestamp.timestamp.field": "message_timestamp", "predicates": "tombstone", "predicates.tombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone" {code} To avoid the cryptic error “java.lang.ClassCastException: class java.util.HashMap cannot be cast to class org.apache.kafka.connect.data.Struct” when processing a tombstone record, I had to add a negated predicate of RecordIsTombstone for ToJSON (community SMT) and HoistField, but did not need to add that to InsertField. Digging in the source, I find that InsertField handles the case where key or value is null: [https://github.com/a0x8o/kafka/blob/f8237749f6ad34c09154f807e53273be64e1261e/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java#L130] ^ Thanks to this, there's no need to add a predicate to skip InsertField$Value when value is null. 
It would help if the docs listed how the individual SMTs behave when dealing with a null key/value. Of course we can always find this out by trial and error or by studying the source code. But if we were to make a best practice of describing how an SMT handles null key/value, that would have two benefits: 1) Save developers time when working with the official (shipped with Kafka) SMT 2) Inspire developers who write their own SMT to likewise document how they handle null key/value Perhaps a standard way of dealing with nulls ("no-op if key/value is null") could be promoted, and SMT authors would only need to document their behavior when it differs. was: While working with a JDBC Sink Connector, I noticed that some SMT choke on a tombstone (null value) while others handle tombstones fine. For example: {code:javascript} "transforms": "flattenKey,valueToJSON,wrapValue,addTimestamp", "transforms.flattenKey.type": "org.apache.kafka.connect.transforms.Flatten$Key", "transforms.flattenKey.delimiter": "_", "transforms.valueToJSON.type": "com.github.jcustenborder.kafka.connect.transform.common.ToJSON$Value", "transforms.valueToJSON.schemas.enable": "false", "transforms.valueToJSON.predicate": "tombstone", "transforms.valueToJSON.negate": true, "transforms.wrapValue.type":"org.apache.kafka.connect.transforms.HoistField$Value", "transforms.wrapValue.field":"matrix", "transforms.wrapValue.predicate": "tombstone", "transforms.wrapValue.negate": true, "transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value", "transforms.addTimestamp.timestamp.field": "message_timestamp", "predicates": "tombstone", "predicates.tombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone" {code} To avoid the cryptic error “java.lang.ClassCastException: class java.util.HashMap cannot be cast to class org.apache.kafka.connect.data.Struct” when processing a tombstone record, I had to add a negated predicate of RecordIsTombstone for ToJSON (community 
SMT) and HoistField, but did not need to add that to InsertField. Digging in the source, I find that InsertField handles the case where key or value is null: https://github.com/a0x8o/kafka/blob/f8237749f6ad34c09154f807e53273be64e1261e/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java#L130 ^ Thanks to this, there's no need to add a predicate to skip InsertField$Value when value is null. It would help if the docs listed how the individual SMTs behave when dealing with a null key/value. Of course we can always find this out by trial and error or by studying the source code. But if we were to make a best practice of describing how an SMT handles null key/value, that would have two benefits: 1) Save developers time when working with the official (shipped with Kafka) SMT 2) Inspire developers who write their own SMT to likewise document how they handle null key/value Perhaps a standard way of dealing
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r714815080 ## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -862,6 +877,10 @@ case class PartitionFetchState(fetchOffset: Long, s", delay=${delay.map(_.delayMs).getOrElse(0)}ms" + Review comment: Could we add the topic id to the `toString`? ## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ## @@ -3469,4 +3472,88 @@ class ReplicaManagerTest { ClientQuotasImage.EMPTY ) } + + def assertFetcherHasTopicId[T <: AbstractFetcherThread](manager: AbstractFetcherManager[T], + tp: TopicPartition, + expectedTopicId: Option[Uuid]): Unit = { +val fetchState = manager.getFetcher(tp).flatMap(_.fetchState(tp)) +assertTrue(fetchState.isDefined) +assertEquals(expectedTopicId, fetchState.get.topicId) + } + + @Test + def testPartitionFetchStateUpdatesWithTopicIdAdded(): Unit = { +val aliveBrokersIds = Seq(0, 1) +val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), + brokerId = 0, aliveBrokersIds) +try { + val tp = new TopicPartition(topic, 0) + val leaderAndIsr = new LeaderAndIsr(1, 0, aliveBrokersIds.toList, 0) + + val leaderAndIsrRequest1 = leaderAndIsrRequest(Uuid.ZERO_UUID, tp, aliveBrokersIds, leaderAndIsr) + val leaderAndIsrResponse1 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ()) + assertEquals(Errors.NONE, leaderAndIsrResponse1.error) + + assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, None) + + val leaderAndIsrRequest2 = leaderAndIsrRequest(topicId, tp, aliveBrokersIds, leaderAndIsr) + val leaderAndIsrResponse2 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest2, (_, _) => ()) + assertEquals(Errors.NONE, leaderAndIsrResponse2.error) + + assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, Some(topicId)) + +} finally { + replicaManager.shutdown(checkpointHW = false) +} + } + + @ParameterizedTest + 
@ValueSource(booleans = Array(true, false)) + def testReplicaAlterLogDirsWithAndWithoutIds(usesTopicIds: Boolean): Unit = { +val version = if (usesTopicIds) LeaderAndIsrRequestData.HIGHEST_SUPPORTED_VERSION else 4.toShort +val topicIdRaw = if (usesTopicIds) topicId else Uuid.ZERO_UUID Review comment: nit: `topicId` instead of `topicIdRaw`? ## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -491,6 +491,21 @@ abstract class AbstractFetcherThread(name: String, } finally partitionMapLock.unlock() } + def maybeAddTopicIdsToThread(partitions: Set[TopicPartition], topicIds: String => Option[Uuid]) = { +partitionMapLock.lockInterruptibly() +try { + partitions.foreach { tp => +val currentState = partitionStates.stateValue(tp) +if (currentState != null) { + val updatedState = currentState.updateTopicId(topicIds(tp.topic)) + partitionStates.updateAndMoveToEnd(tp, updatedState) Review comment: We need to address this point. ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1396,6 +1398,8 @@ class ReplicaManager(val config: KafkaConfig, stateChangeLogger.info(s"Updating log for $topicPartition to assign topic ID " + s"$topicId from LeaderAndIsr request from controller $controllerId with correlation " + s"id $correlationId epoch $controllerEpoch") +if (partitionState.leader != localBrokerId && metadataCache.hasAliveBroker(partitionState.leader)) Review comment: We need to address this point. 
## File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala ## @@ -929,6 +929,29 @@ class AbstractFetcherThreadTest { fetcher.verifyLastFetchedEpoch(partition, Some(5)) } + @Test + def testMaybeUpdateTopicIds(): Unit = { +val partition = new TopicPartition("topic1", 0) +val fetcher = new MockFetcherThread + +// Start with no topic IDs +fetcher.setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = 0)) +fetcher.addPartitions(Map(partition -> initialFetchState(None, 0L, leaderEpoch = 0))) + + +def verifyFetchState(fetchState: Option[PartitionFetchState], expectedTopicId: Option[Uuid]): Unit = { + assertTrue(fetchState.isDefined) + assertEquals(expectedTopicId, fetchState.get.topicId) +} + +verifyFetchState(fetcher.fetchState(partition), None) + +// Add topic ID +fetcher.maybeUpdateTopicIds(Set(partition), topicName => topicIds.get(topicName)) Review comment: Right. We should get here if the partition has
[jira] [Updated] (KAFKA-13320) Suggestion: SMT support for null key/value should be documented
[ https://issues.apache.org/jira/browse/KAFKA-13320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben Ellis updated KAFKA-13320: -- Description: While working with a JDBC Sink Connector, I noticed that some SMT choke on a tombstone (null value) while others handle tombstones fine. For example: {code:javascript} "transforms": "flattenKey,valueToJSON,wrapValue,addTimestamp", "transforms.flattenKey.type": "org.apache.kafka.connect.transforms.Flatten$Key", "transforms.flattenKey.delimiter": "_", "transforms.valueToJSON.type": "com.github.jcustenborder.kafka.connect.transform.common.ToJSON$Value", "transforms.valueToJSON.schemas.enable": "false", "transforms.valueToJSON.predicate": "tombstone", "transforms.valueToJSON.negate": true, "transforms.wrapValue.type":"org.apache.kafka.connect.transforms.HoistField$Value", "transforms.wrapValue.field":"matrix", "transforms.wrapValue.predicate": "tombstone", "transforms.wrapValue.negate": true, "transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value", "transforms.addTimestamp.timestamp.field": "message_timestamp", "predicates": "tombstone", "predicates.tombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone" {code} To avoid the cryptic error “java.lang.ClassCastException: class java.util.HashMap cannot be cast to class org.apache.kafka.connect.data.Struct” when processing a tombstone record, I had to add a negated predicate of RecordIsTombstone for ToJSON (community SMT) and HoistField, but did not need to add that to InsertField. Digging in the source, I find that InsertField handles the case where key or value is null: https://github.com/a0x8o/kafka/blob/f8237749f6ad34c09154f807e53273be64e1261e/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java#L130 ^ Thanks to this, there's no need to add a predicate to skip InsertField$Value when value is null. 
It would help if the docs listed how the individual SMTs behave when dealing with a null key/value. Of course we can always find this out by trial and error or by studying the source code. But if we were to make a best practice of describing how an SMT handles null key/value, that would have two benefits: 1) Save developers time when working with the official (shipped with Kafka) SMT 2) Inspire developers who write their own SMT to likewise document how they handle null key/value Perhaps a standard way of dealing with nulls ("no-op if key/value is null") could be promoted, and SMT authors would only need to document their behavior when it differs. was: While working with a JDBC Sink Connector, I noticed that some SMT choke on a tombstone (null value) while others handle tombstones fine. For example: ``` "transforms": "flattenKey,valueToJSON,wrapValue,addTimestamp", "transforms.flattenKey.type": "org.apache.kafka.connect.transforms.Flatten$Key", "transforms.flattenKey.delimiter": "_", "transforms.valueToJSON.type": "com.github.jcustenborder.kafka.connect.transform.common.ToJSON$Value", "transforms.valueToJSON.schemas.enable": "false", "transforms.valueToJSON.predicate": "tombstone", "transforms.valueToJSON.negate": true, "transforms.wrapValue.type":"org.apache.kafka.connect.transforms.HoistField$Value", "transforms.wrapValue.field":"matrix", "transforms.wrapValue.predicate": "tombstone", "transforms.wrapValue.negate": true, "transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value", "transforms.addTimestamp.timestamp.field": "message_timestamp", "predicates": "tombstone", "predicates.tombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone" ``` To avoid the cryptic error “java.lang.ClassCastException: class java.util.HashMap cannot be cast to class org.apache.kafka.connect.data.Struct” when processing a tombstone record, I had to add a negated predicate of RecordIsTombstone for ToJSON (community SMT) and 
HoistField, but did not need to add that to InsertField. Digging in the source, I find that InsertField handles the case where key or value is null: https://github.com/a0x8o/kafka/blob/f8237749f6ad34c09154f807e53273be64e1261e/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java#L130 ^ Thanks to this, there's no need to add a predicate to skip InsertField$Value when value is null. It would help if the docs listed how the individual SMTs behave when dealing with a null key/value. Of course we can always find this out by trial and error or by studying the source code. But if we were to make a best practice of describing how an SMT handles null key/value, that would have two benefits: 1) Save developers time when working with the official (shipped with Kafka) SMT 2) Inspire developers who write their own SMT to likewise document how they handle null key/value Perhaps a standard way of dealing with nulls ("no-op
[jira] [Created] (KAFKA-13320) Suggestion: SMT support for null key/value should be documented
Ben Ellis created KAFKA-13320:
-

Summary: Suggestion: SMT support for null key/value should be documented
Key: KAFKA-13320
URL: https://issues.apache.org/jira/browse/KAFKA-13320
Project: Kafka
Issue Type: Wish
Components: KafkaConnect
Reporter: Ben Ellis

While working with a JDBC Sink Connector, I noticed that some SMTs choke on a tombstone (null value) while others handle tombstones fine. For example:

```
"transforms": "flattenKey,valueToJSON,wrapValue,addTimestamp",
"transforms.flattenKey.type": "org.apache.kafka.connect.transforms.Flatten$Key",
"transforms.flattenKey.delimiter": "_",
"transforms.valueToJSON.type": "com.github.jcustenborder.kafka.connect.transform.common.ToJSON$Value",
"transforms.valueToJSON.schemas.enable": "false",
"transforms.valueToJSON.predicate": "tombstone",
"transforms.valueToJSON.negate": true,
"transforms.wrapValue.type":"org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.wrapValue.field":"matrix",
"transforms.wrapValue.predicate": "tombstone",
"transforms.wrapValue.negate": true,
"transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addTimestamp.timestamp.field": "message_timestamp",
"predicates": "tombstone",
"predicates.tombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"
```

To avoid the cryptic error “java.lang.ClassCastException: class java.util.HashMap cannot be cast to class org.apache.kafka.connect.data.Struct” when processing a tombstone record, I had to add a negated predicate of RecordIsTombstone for ToJSON (community SMT) and HoistField, but did not need to add that to InsertField.

Digging in the source, I find that InsertField handles the case where key or value is null:

https://github.com/a0x8o/kafka/blob/f8237749f6ad34c09154f807e53273be64e1261e/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java#L130

^ Thanks to this, there's no need to add a predicate to skip InsertField$Value when value is null.

It would help if the docs listed how the individual SMTs behave when dealing with a null key/value. Of course we can always find this out by trial and error or by studying the source code. But if we were to make a best practice of describing how an SMT handles null key/value, that would have two benefits:

1) Save developers time when working with the official (shipped with Kafka) SMTs
2) Inspire developers who write their own SMT to likewise document how they handle null key/value

Perhaps a standard way of dealing with nulls ("no-op if key/value is null") could be promoted, and SMT authors would only need to document their behavior when it differs.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
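The "no-op if key/value is null" convention suggested in KAFKA-13320 can be sketched in plain Java. This is only an illustration under stated assumptions: it does not use the Kafka Connect `Transformation` interface, and the names `NullSafeTransform`, `noOpOnNull`, and `hoist` are hypothetical helpers invented here. The idea is that a wrapper passes tombstones through unchanged, so the wrapped transform never sees a null value.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical sketch of a "no-op if value is null" convention.
// It does NOT use the Kafka Connect Transformation interface.
final class NullSafeTransform {

    // Wraps a value transform so that tombstones (null values) pass through untouched.
    static <V> UnaryOperator<V> noOpOnNull(UnaryOperator<V> transform) {
        return value -> value == null ? null : transform.apply(value);
    }

    // Example transform, similar in spirit to HoistField: nest the value under one field name.
    static UnaryOperator<Map<String, Object>> hoist(String field) {
        return value -> {
            Map<String, Object> wrapped = new HashMap<>();
            wrapped.put(field, value);
            return wrapped;
        };
    }

    public static void main(String[] args) {
        UnaryOperator<Map<String, Object>> transform = noOpOnNull(hoist("matrix"));

        Map<String, Object> value = new HashMap<>();
        value.put("a", 1);

        System.out.println(transform.apply(value)); // regular record: value is nested under "matrix"
        System.out.println(transform.apply(null));  // tombstone: passed through as null
    }
}
```

With a wrapper like this, the behavior on null is decided in one place, which is roughly what a documented default would give SMT authors.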
[jira] [Commented] (KAFKA-13298) Improve documentation on EOS KStream requirements
[ https://issues.apache.org/jira/browse/KAFKA-13298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17419166#comment-17419166 ]

Andy Chambers commented on KAFKA-13298:
---

I made a PR for this but initially did not use the correct title format, which I suppose is why it was never automatically linked. Sorry about that. Here is a link to the PR: https://github.com/apache/kafka/pull/11355

> Improve documentation on EOS KStream requirements
> -
>
> Key: KAFKA-13298
> URL: https://issues.apache.org/jira/browse/KAFKA-13298
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 2.8.0
> Reporter: F Méthot
> Priority: Trivial
>
> After posting question on a kafka forum, the following was revealed by kafka
> developer:
> {quote}Is there minimum replication factor required to enable exactly_once
> for topic involved in transactions?
> {quote}
> "Well, technically you can configure the system to use EOS with any
> replication factor. However, using a lower replication factor than 3
> effectively voids EOS. Thus, it’s strongly recommended to use a replication
> factor of 3."
>
> This should be clearly documented in the stream doc:
> [https://kafka.apache.org/28/documentation/streams/developer-guide/config-streams.html]
> Which refers to this broker link
> [https://kafka.apache.org/28/documentation/streams/developer-guide/config-streams.html#streams-developer-guide-processing-guarantedd]
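The recommendation quoted in KAFKA-13298 could look roughly like the following when written as configuration. This is a minimal sketch, assuming the string keys `processing.guarantee` and `replication.factor` as documented for Kafka Streams 2.8; the application id and broker address are hypothetical placeholders, and the class name `EosConfigSketch` is invented for illustration. It builds a plain `java.util.Properties` object and does not start a Streams application.

```java
import java.util.Properties;

// Sketch only: builds the properties, does not create a KafkaStreams instance.
final class EosConfigSketch {

    static Properties eosProperties() {
        Properties props = new Properties();
        props.setProperty("application.id", "eos-example");        // hypothetical application name
        props.setProperty("bootstrap.servers", "localhost:9092");  // hypothetical broker address
        props.setProperty("processing.guarantee", "exactly_once"); // enable EOS (2.8 naming)
        props.setProperty("replication.factor", "3");              // recommended minimum per the quoted advice
        return props;
    }

    public static void main(String[] args) {
        eosProperties().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```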
[GitHub] [kafka] cddr opened a new pull request #11355: Improve documentation on EOS KStream requirements
cddr opened a new pull request #11355:
URL: https://github.com/apache/kafka/pull/11355

Minor documentation fix to address: https://issues.apache.org/jira/browse/KAFKA-13298

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] ijuma merged pull request #11321: MINOR: Replace EasyMock with Mockito in connect:basic-auth-extension
ijuma merged pull request #11321:
URL: https://github.com/apache/kafka/pull/11321
[GitHub] [kafka] lkokhreidze commented on pull request #10851: KAFKA-6718 / Rack aware standby task assignor
lkokhreidze commented on pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#issuecomment-925659295

Hi @cadonna, is it possible to continue pushing this PR forward? I'm back from my holidays.
[jira] [Resolved] (KAFKA-10544) Convert KTable aggregations to new PAPI
[ https://issues.apache.org/jira/browse/KAFKA-10544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jorge Esteban Quilcate Otoya resolved KAFKA-10544.
--
Resolution: Fixed

https://github.com/apache/kafka/pull/11316

> Convert KTable aggregations to new PAPI
> ---
>
> Key: KAFKA-10544
> URL: https://issues.apache.org/jira/browse/KAFKA-10544
> Project: Kafka
> Issue Type: Sub-task
> Components: streams
> Reporter: John Roesler
> Assignee: Jorge Esteban Quilcate Otoya
> Priority: Major
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r714557835

## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ##
@@ -1396,6 +1398,8 @@ class ReplicaManager(val config: KafkaConfig,
 stateChangeLogger.info(s"Updating log for $topicPartition to assign topic ID " +
   s"$topicId from LeaderAndIsr request from controller $controllerId with correlation " +
   s"id $correlationId epoch $controllerEpoch")
+if (partitionState.leader != localBrokerId && metadataCache.hasAliveBroker(partitionState.leader))

Review comment:
Don't forget to remove `&& metadataCache.hasAliveBroker(partitionState.leader)` here.
[jira] [Closed] (KAFKA-13302) [IEP-59] Support not default page size
[ https://issues.apache.org/jira/browse/KAFKA-13302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nikolay Izhikov closed KAFKA-13302.
---

> [IEP-59] Support not default page size
> --
>
> Key: KAFKA-13302
> URL: https://issues.apache.org/jira/browse/KAFKA-13302
> Project: Kafka
> Issue Type: Improvement
> Reporter: Nikolay Izhikov
> Priority: Major
> Labels: IEP-59
>
> Currently, CDC doesn't support not default page size.
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r714549045 ## File path: core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala ## @@ -133,4 +137,75 @@ class AbstractFetcherManagerTest { assertEquals(0, fetcherManager.deadThreadCount) EasyMock.verify(fetcher) } + + @Test + def testMaybeUpdateTopicIds(): Unit = { +val fetcher: AbstractFetcherThread = EasyMock.mock(classOf[AbstractFetcherThread]) +val fetcherManager = new AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", "fetcher-manager", 2) { + override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = { +fetcher + } +} + +val fetchOffset = 10L +val leaderEpoch = 15 +val tp1 = new TopicPartition("topic", 0) +val tp2 = new TopicPartition("topic", 1) +val topicId = Some(Uuid.randomUuid()) + +// Start out with no topic ID. +val initialFetchState1 = InitialFetchState( + topicId = None, + leader = new BrokerEndPoint(0, "localhost", 9092), + currentLeaderEpoch = leaderEpoch, + initOffset = fetchOffset) + +// Include a partition on a different leader +val initialFetchState2 = InitialFetchState( + topicId = None, + leader = new BrokerEndPoint(1, "localhost", 9092), + currentLeaderEpoch = leaderEpoch, + initOffset = fetchOffset) + +val partitionsToUpdate = Map(tp1 -> initialFetchState1.leader.id, tp2 -> initialFetchState2.leader.id) +val topicIds = (_: String) => topicId + +// Simulate calls to different fetchers due to different leaders +EasyMock.expect(fetcher.start()) +EasyMock.expect(fetcher.start()) + +EasyMock.expect(fetcher.addPartitions(Map(tp1 -> initialFetchState1))) + .andReturn(Set(tp1)) +EasyMock.expect(fetcher.addPartitions(Map(tp2 -> initialFetchState2))) + .andReturn(Set(tp2)) + +EasyMock.expect(fetcher.fetchState(tp1)) + .andReturn(Some(PartitionFetchState(None, fetchOffset, None, leaderEpoch, Truncating, lastFetchedEpoch = None))) 
+EasyMock.expect(fetcher.fetchState(tp2)) + .andReturn(Some(PartitionFetchState(None, fetchOffset, None, leaderEpoch, Truncating, lastFetchedEpoch = None))) + +EasyMock.expect(fetcher.maybeUpdateTopicIds(Set(tp1), topicIds)) +EasyMock.expect(fetcher.maybeUpdateTopicIds(Set(tp2), topicIds)) + +EasyMock.expect(fetcher.fetchState(tp1)) + .andReturn(Some(PartitionFetchState(topicId, fetchOffset, None, leaderEpoch, Truncating, lastFetchedEpoch = None))) +EasyMock.expect(fetcher.fetchState(tp2)) + .andReturn(Some(PartitionFetchState(topicId, fetchOffset, None, leaderEpoch, Truncating, lastFetchedEpoch = None))) +EasyMock.replay(fetcher) + +def verifyFetchState(fetchState: Option[PartitionFetchState], expectedTopicId: Option[Uuid]): Unit = { + assertTrue(fetchState.isDefined) + assertEquals(expectedTopicId, fetchState.get.topicId) +} + +fetcherManager.addFetcherForPartitions(Map(tp1 -> initialFetchState1, tp2 -> initialFetchState2)) +verifyFetchState(fetcher.fetchState(tp1), None) +verifyFetchState(fetcher.fetchState(tp2), None) + +fetcherManager.maybeUpdateTopicIds(partitionsToUpdate, topicIds) +verifyFetchState(fetcher.fetchState(tp1), topicId) +verifyFetchState(fetcher.fetchState(tp2), topicId) + +EasyMock.verify(fetcher) Review comment: Right. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r714547943

## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ##
@@ -491,6 +491,21 @@ abstract class AbstractFetcherThread(name: String,
     } finally partitionMapLock.unlock()
   }
 
+  def maybeAddTopicIdsToThread(partitions: Set[TopicPartition], topicIds: String => Option[Uuid]) = {
+    partitionMapLock.lockInterruptibly()
+    try {
+      partitions.foreach { tp =>
+        val currentState = partitionStates.stateValue(tp)
+        if (currentState != null) {
+          val updatedState = currentState.updateTopicId(topicIds(tp.topic))
+          partitionStates.updateAndMoveToEnd(tp, updatedState)

Review comment:
Right. `updateAndMoveToEnd` removes the item first and then adds it back at the end of the `LinkedHashMap`. We do rely on this to rotate the partitions in the `LinkedHashMap` in order to treat all partitions fairly. When we use a session, this is done on the broker as well btw.

The new `update` method will only update the state and will overwrite any existing state in the `LinkedHashMap`. This is what `map.put()` does. However, it does not change the order in the `LinkedHashMap`, which is good. We don't have to change the order when we set the topic id.
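The ordering difference described in this review comment can be reproduced with a plain `java.util.LinkedHashMap`. The sketch below is not Kafka's `PartitionStates` class: the class name `OrderedStates` is hypothetical, and its `update` and `updateAndMoveToEnd` methods only mirror the names used in the review, re-implemented here as assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative stand-in for the ordering behaviour discussed in the review.
// This is NOT Kafka's PartitionStates implementation.
final class OrderedStates {
    // Insertion-ordered map: put() on an existing key keeps the key's position.
    private final Map<String, String> map = new LinkedHashMap<>();

    // Overwrite the state in place; iteration order is unchanged.
    void update(String partition, String state) {
        map.put(partition, state);
    }

    // Remove and re-insert so the partition moves to the end of the iteration order.
    void updateAndMoveToEnd(String partition, String state) {
        map.remove(partition);
        map.put(partition, state);
    }

    // Snapshot of the current iteration order.
    List<String> order() {
        return new ArrayList<>(map.keySet());
    }

    String get(String partition) {
        return map.get(partition);
    }

    public static void main(String[] args) {
        OrderedStates states = new OrderedStates();
        states.update("topic-0", "no-id");
        states.update("topic-1", "no-id");
        states.update("topic-2", "no-id");

        states.update("topic-0", "with-id");             // state changes, position does not
        System.out.println(states.order());

        states.updateAndMoveToEnd("topic-0", "with-id"); // position rotates to the end
        System.out.println(states.order());
    }
}
```

Setting a topic ID only changes the stored state, so the in-place `update` is enough; rotating the partition would alter fetch fairness for no reason.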
[jira] [Assigned] (KAFKA-13319) Do not send AddOffsetsToTxn/TxnOffsetCommit if offsets map is empty
[ https://issues.apache.org/jira/browse/KAFKA-13319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ryan reassigned KAFKA-13319:

Assignee: Ryan

> Do not send AddOffsetsToTxn/TxnOffsetCommit if offsets map is empty
> ---
>
> Key: KAFKA-13319
> URL: https://issues.apache.org/jira/browse/KAFKA-13319
> Project: Kafka
> Issue Type: Improvement
> Reporter: Jason Gustafson
> Assignee: Ryan
> Priority: Major
> Labels: newbie
>
> If a user calls `Producer.sendOffsetsToTransaction` with an empty map of
> offsets, we can shortcut return and skip the logic to add the offsets topic
> to the transaction. The main benefit is avoiding the unnecessary accumulation
> of markers in __consumer_offsets.
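The shortcut proposed in KAFKA-13319 amounts to an early return before any request is enqueued. The following is a minimal sketch under loud assumptions: `OffsetsShortcutSketch` is a hypothetical stand-in, not the Kafka producer, and it simplifies the real method's parameters to a `Map<String, Long>` and a group id string. It records the names of the requests it would send so the effect of the guard is visible.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a transactional producer. It only records which
// requests would be sent, so the short-circuit can be observed.
final class OffsetsShortcutSketch {
    final List<String> sentRequests = new ArrayList<>();

    // Simplified sketch of sendOffsetsToTransaction: with an empty map, return
    // before enqueuing AddOffsetsToTxn or TxnOffsetCommit.
    void sendOffsetsToTransaction(Map<String, Long> offsets, String groupId) {
        if (offsets.isEmpty()) {
            return; // nothing to commit, so the offsets topic is never added to the transaction
        }
        sentRequests.add("AddOffsetsToTxn(" + groupId + ")");
        sentRequests.add("TxnOffsetCommit(" + groupId + ", " + offsets.size() + " partitions)");
    }

    public static void main(String[] args) {
        OffsetsShortcutSketch producer = new OffsetsShortcutSketch();

        producer.sendOffsetsToTransaction(Collections.emptyMap(), "group-a");
        System.out.println(producer.sentRequests); // no requests recorded for the empty map

        producer.sendOffsetsToTransaction(Collections.singletonMap("topic-0", 42L), "group-a");
        System.out.println(producer.sentRequests); // both requests recorded
    }
}
```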