[jira] [Commented] (KAFKA-6020) Broker side filtering

2021-09-23 Thread King Jin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17419571#comment-17419571
 ] 

King Jin commented on KAFKA-6020:
-

How about broker-side filtering by Kafka headers? The consumer would send a 
filtering header when requesting more messages from the broker, and the broker 
would filter messages by header before sending the response back to the consumer.

> Broker side filtering
> -
>
> Key: KAFKA-6020
> URL: https://issues.apache.org/jira/browse/KAFKA-6020
> Project: Kafka
>  Issue Type: New Feature
>  Components: consumer
>Reporter: Pavel Micka
>Priority: Major
>  Labels: needs-kip
>
> Currently, it is not possible to filter messages on the broker side. Filtering 
> messages on the broker side is convenient for filters with very low selectivity 
> (one message in a few thousand). In my case it means transferring several GB of 
> data to the consumer, throwing it away, taking one message, and doing it again...
> While I understand that filtering by message body is not feasible (for 
> performance reasons), I propose to filter just by message key prefix. This 
> can be achieved even without any deserialization, as the prefix to be matched 
> can be passed as an array (hence the broker would do just an array prefix 
> compare).
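
For illustration, the prefix check proposed above amounts to a raw byte-for-byte comparison with no deserialization. A minimal Scala sketch (the `keyPrefixMatches` helper and the sample keys are made up here, not an existing broker API):

```scala
object KeyPrefixFilterSketch {
  // True when `key` starts with `prefix`, compared byte-for-byte.
  def keyPrefixMatches(key: Array[Byte], prefix: Array[Byte]): Boolean =
    key != null && key.startsWith(prefix)

  def main(args: Array[String]): Unit = {
    val prefix = "orders-eu".getBytes("UTF-8")
    println(keyPrefixMatches("orders-eu-42".getBytes("UTF-8"), prefix)) // true
    println(keyPrefixMatches("orders-us-42".getBytes("UTF-8"), prefix)) // false
  }
}
```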



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ccding commented on pull request #11345: Allow empty last segment to have missing offset index during recovery

2021-09-23 Thread GitBox


ccding commented on pull request #11345:
URL: https://github.com/apache/kafka/pull/11345#issuecomment-926250962


   The failed tests are unrelated and passed in my local run.
   ```
Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testOneWayReplicationWithAutoOffsetSync()  4.7 sec  1
Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.integration.InternalTopicsIntegrationTest.testCreateInternalTopicsWithFewerReplicasThanBrokers  56 sec  1
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #11327: KAFKA-13305: fix NullPointerException in LogCleanerManager "uncleanable-bytes" gauge

2021-09-23 Thread GitBox


junrao commented on a change in pull request #11327:
URL: https://github.com/apache/kafka/pull/11327#discussion_r715227056



##
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##
@@ -512,6 +514,27 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
       uncleanablePartitions.get(log.parentDir).exists(partitions => partitions.contains(topicPartition))
     }
   }
+
+  def maintainUncleanablePartitions(): Unit = {
+    // Remove deleted partitions from uncleanablePartitions
+    inLock(lock) {
+      // Note: we don't use retain or filterInPlace method in this function because retain is deprecated in
+      // scala 2.13 while filterInPlace is not available in scala 2.12.
+
+      // Remove deleted partitions
+      uncleanablePartitions.values.foreach {
+        partitions =>
+          val partitionsToRemove = partitions.filterNot(logs.contains(_)).toList
+          partitionsToRemove.foreach { partitions.remove(_) }
+      }
+
+      // Remove entries with empty partition set.
+      val logDirsToRemove = uncleanablePartitions.filter {
+        case (logDir, partitions) => partitions.isEmpty
Review comment:
   logDir is unused and can be replaced with _.
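
A minimal, self-contained Scala sketch of the change this review asks for, with stand-in types (the real map is keyed by log directory and holds TopicPartitions): the unused `logDir` binding becomes `_`.

```scala
import scala.collection.mutable

object UncleanablePartitionsSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for uncleanablePartitions: log dir -> uncleanable partition names.
    val uncleanablePartitions = mutable.Map(
      "/data/kafka-logs-0" -> mutable.Set("topic-0"),
      "/data/kafka-logs-1" -> mutable.Set.empty[String]
    )
    // The unused binding is replaced by `_`, as suggested above.
    val logDirsToRemove = uncleanablePartitions.filter {
      case (_, partitions) => partitions.isEmpty
    }.keys.toList
    println(logDirsToRemove) // List(/data/kafka-logs-1)
  }
}
```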




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-13070) LogManager shutdown races with periodic work scheduled by the instance

2021-09-23 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-13070.
-
Resolution: Duplicate

> LogManager shutdown races with periodic work scheduled by the instance
> --
>
> Key: KAFKA-13070
> URL: https://issues.apache.org/jira/browse/KAFKA-13070
> Project: Kafka
>  Issue Type: Bug
>Reporter: Kowshik Prakasam
>Assignee: Cong Ding
>Priority: Major
>
> In the LogManager shutdown sequence (in LogManager.shutdown()), we don't 
> cancel the periodic work scheduled by it prior to shutdown. As a result, the 
> periodic work could race with the shutdown sequence causing some unwanted 
> side effects. This is reproducible by a unit test in LogManagerTest.
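
Not the actual LogManager fix, just a rough, self-contained Scala sketch of the ordering the report implies: any retained periodic-task handle is cancelled before the rest of the teardown runs, so the periodic work cannot race with shutdown. The executor and task here are stand-ins, not Kafka's scheduler.

```scala
import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}

object ShutdownOrderingSketch {
  def main(args: Array[String]): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    // Stand-in for the periodic work LogManager schedules (retention, flush, ...).
    val periodic: ScheduledFuture[_] =
      scheduler.scheduleAtFixedRate(() => println("periodic cleanup"), 0L, 100L, TimeUnit.MILLISECONDS)

    Thread.sleep(250)

    // Shutdown sequence: cancel the periodic work first so it cannot race
    // with the teardown that follows.
    periodic.cancel(false)
    scheduler.shutdown()
    scheduler.awaitTermination(1L, TimeUnit.SECONDS)
    println("shutdown complete")
  }
}
```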



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13315) log layer exception during shutdown that caused an unclean shutdown

2021-09-23 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-13315.
-
Fix Version/s: 3.0.1
   3.1.0
   Resolution: Fixed

merged the PR to trunk and 3.0.

> log layer exception during shutdown that caused an unclean shutdown
> ---
>
> Key: KAFKA-13315
> URL: https://issues.apache.org/jira/browse/KAFKA-13315
> Project: Kafka
>  Issue Type: Bug
>Reporter: Cong Ding
>Assignee: Cong Ding
>Priority: Major
> Fix For: 3.1.0, 3.0.1
>
>
> We have seen an exception caused by shutting down the scheduler before shutting 
> down LogManager.
> While LogManager was closing partitions one by one, the scheduler was invoked to 
> delete old segments due to retention. However, the old segments could already have 
> been closed by LogManager, which subsequently marked the log dir as offline and 
> didn't write the clean shutdown marker. Ultimately the broker would take 
> hours to restart.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] junrao commented on pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown

2021-09-23 Thread GitBox


junrao commented on pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#issuecomment-926239908


   cherry-picked to 3.0 branch too.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13272) KStream offset stuck after brokers outage

2021-09-23 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17419498#comment-17419498
 ] 

A. Sophie Blee-Goldman commented on KAFKA-13272:


[~guozhang] any updates here? I'm seeing almost weekly reports of Streams EOS 
apps getting stuck in restoration – a few of them I believe are actually 
https://issues.apache.org/jira/browse/KAFKA-13295, but most of them don't 
report any ProducerFencedException or TaskMigratedException which would rule 
that one out.

> KStream offset stuck after brokers outage
> -
>
> Key: KAFKA-13272
> URL: https://issues.apache.org/jira/browse/KAFKA-13272
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.8.0
> Environment: Kafka running on Kubernetes
> centos
>Reporter: F Méthot
>Priority: Major
>
> Our KStream app's offset stays stuck on one partition after an outage, possibly 
> when exactly_once is enabled.
> Running with KStream 2.8, kafka broker 2.8,
>  3 brokers.
> commands topic is 10 partitions (replication 2, min-insync 2)
>  command-expiry-store-changelog topic is 10 partitions (replication 2, 
> min-insync 2)
>  events topic is 10 partitions (replication 2, min-insync 2)
> with this topology
> Topologies:
>  
> {code:java}
> Sub-topology: 0
>  Source: KSTREAM-SOURCE-00 (topics: [commands])
>  --> KSTREAM-TRANSFORM-01
>  Processor: KSTREAM-TRANSFORM-01 (stores: [])
>  --> KSTREAM-TRANSFORM-02
>  <-- KSTREAM-SOURCE-00
>  Processor: KSTREAM-TRANSFORM-02 (stores: [command-expiry-store])
>  --> KSTREAM-SINK-03
>  <-- KSTREAM-TRANSFORM-01
>  Sink: KSTREAM-SINK-03 (topic: events)
>  <-- KSTREAM-TRANSFORM-02
> {code}
> h3. Attempt 1 at reproducing this issue
>  
> Our stream app runs with processing.guarantee *exactly_once*.
> After a Kafka test outage where all 3 broker pods were deleted at the same 
> time, the brokers restarted and initialized successfully.
> When restarting the topology above, one of the tasks would never initialize 
> fully; the restore phase would keep outputting these messages every few 
> minutes:
>  
> {code:java}
> 2021-08-16 14:20:33,421 INFO stream-thread 
> [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] 
> Restoration in progress for 1 partitions. 
> {commands-processor-expiry-store-changelog-8: position=11775908, 
> end=11775911, totalRestored=2002076} 
> [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader)
> {code}
> The task for partition 8 would never initialize, and no more data would be read 
> from the source commands topic for that partition.
>  
> In an attempt to recover, we restarted the stream app with 
> processing.guarantee set back to at_least_once; it then proceeded with reading the 
> changelog and restoring partition 8 fully.
> But we noticed afterward, for the next hour until we rebuilt the system, that 
> partition 8 of command-expiry-store-changelog would not be 
> cleaned/compacted by the log cleaner/compactor, unlike the other partitions 
> (this could be unrelated, because we have seen it before).
> So we resorted to deleting/recreating our command-expiry-store-changelog 
> and events topics and regenerating them from the commands topic, reading from the beginning.
> Things went back to normal.
> h3. Attempt 2 at reproducing this issue
> The kstream app runs with *exactly-once*.
> We force-deleted all 3 pods running Kafka.
>  After that, one of the partitions couldn't be restored (as reported in the 
> previous attempt).
>  For that partition, we noticed these logs on the broker:
> {code:java}
> [2021-08-27 17:45:32,799] INFO [Transaction Marker Channel Manager 1002]: 
> Couldn’t find leader endpoint for partitions Set(__consumer_offsets-11, 
> command-expiry-store-changelog-9) while trying to send transaction markers 
> for commands-processor-0_9, these partitions are likely deleted already and 
> hence can be skipped 
> (kafka.coordinator.transaction.TransactionMarkerChannelManager){code}
> Then
>  - we stopped the kstream app,
>  - restarted the kafka brokers cleanly,
>  - restarted the Kstream app.
> These log messages showed up in the kstream app log:
>  
> {code:java}
> 2021-08-27 18:34:42,413 INFO [Consumer 
> clientId=commands-processor-76602c87-f682-4648-859b-8fa9b6b937f3-StreamThread-1-consumer,
>  groupId=commands-processor] The following partitions still have unstable 
> offsets which are not cleared on the broker side: [commands-9], this could be 
> either transactional offsets waiting for completion, or normal offsets 
> waiting for replication after appending to local log 
> [commands-processor-76602c87-f682-4648-859b-8fa9b6b937f3-StreamThread-1] 
> 

[GitHub] [kafka] junrao merged pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown

2021-09-23 Thread GitBox


junrao merged pull request #11351:
URL: https://github.com/apache/kafka/pull/11351


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vincent81jiang commented on a change in pull request #11327: KAFKA-13305: fix NullPointerException in LogCleanerManager "uncleanable-bytes" gauge

2021-09-23 Thread GitBox


vincent81jiang commented on a change in pull request #11327:
URL: https://github.com/apache/kafka/pull/11327#discussion_r715169127



##
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##
@@ -512,6 +514,27 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
       uncleanablePartitions.get(log.parentDir).exists(partitions => partitions.contains(topicPartition))
     }
   }
+
+  def maintainUncleanablePartitions(): Unit = {
+    // Remove deleted partitions from uncleanablePartitions
+    inLock(lock) {
+      // Remove non-existing logDir
+      // Note: we don't use retain or filterInPlace method in this function because retain is deprecated in
+      // scala 2.13 while filterInPlace is not available in scala 2.12.
+      val logDirsToRemove = uncleanablePartitions.filterNot {
+        case (logDir, _) => logDirs.map(_.getAbsolutePath).contains(logDir)
+      }.keys.toList
+      logDirsToRemove.foreach { uncleanablePartitions.remove(_) }
+
+      uncleanablePartitions.values.foreach {
+        partitions =>
+          // Remove deleted partitions
+          val partitionsToRemove = partitions.filterNot(logs.contains(_)).toList
+          partitionsToRemove.foreach { partitions.remove(_) }

Review comment:
   Yes, agree.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vincent81jiang commented on a change in pull request #11327: KAFKA-13305: fix NullPointerException in LogCleanerManager "uncleanable-bytes" gauge

2021-09-23 Thread GitBox


vincent81jiang commented on a change in pull request #11327:
URL: https://github.com/apache/kafka/pull/11327#discussion_r715168688



##
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##
@@ -512,6 +514,27 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
       uncleanablePartitions.get(log.parentDir).exists(partitions => partitions.contains(topicPartition))
     }
   }
+
+  def maintainUncleanablePartitions(): Unit = {
+    // Remove deleted partitions from uncleanablePartitions
+    inLock(lock) {
+      // Remove non-existing logDir
+      // Note: we don't use retain or filterInPlace method in this function because retain is deprecated in
+      // scala 2.13 while filterInPlace is not available in scala 2.12.
+      val logDirsToRemove = uncleanablePartitions.filterNot {
+        case (logDir, _) => logDirs.map(_.getAbsolutePath).contains(logDir)
+      }.keys.toList
+      logDirsToRemove.foreach { uncleanablePartitions.remove(_) }

Review comment:
   make sense.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ccding commented on a change in pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown

2021-09-23 Thread GitBox


ccding commented on a change in pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#discussion_r71507



##
File path: core/src/main/scala/kafka/utils/KafkaScheduler.scala
##
@@ -141,9 +145,14 @@ class KafkaScheduler(val threads: Int,
       executor != null
     }
   }
-  
-  private def ensureRunning(): Unit = {
-    if (!isStarted)
-      throw new IllegalStateException("Kafka scheduler is not running.")
-  }
+}
+
+private class NoOpScheduledFutureTask() extends ScheduledFuture[Unit] {
+  override def cancel(mayInterruptIfRunning: Boolean): Boolean = true
+  override def isCancelled: Boolean = true
+  override def isDone: Boolean = true
+  override def get(): Unit = {}
+  override def get(timeout: Long, unit: TimeUnit): Unit = {}
+  override def getDelay(unit: TimeUnit): Long = 0
+  override def compareTo(o: Delayed): Int = 0

Review comment:
   NoOpScheduledFutureTask extends ScheduledFuture, ScheduledFuture extends 
Delayed, and Delayed extends Comparable<Delayed>: therefore we should use Delayed 
here. Also, it doesn't compile if I change it to ScheduledFuture.
   
   Fixed the return value.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #11351: KAFKA-13315: log layer exception during shutdown that caused an unclean shutdown

2021-09-23 Thread GitBox


junrao commented on a change in pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#discussion_r715068733



##
File path: core/src/main/scala/kafka/utils/KafkaScheduler.scala
##
@@ -141,9 +145,14 @@ class KafkaScheduler(val threads: Int,
       executor != null
     }
   }
-  
-  private def ensureRunning(): Unit = {
-    if (!isStarted)
-      throw new IllegalStateException("Kafka scheduler is not running.")
-  }
+}
+
+private class NoOpScheduledFutureTask() extends ScheduledFuture[Unit] {
+  override def cancel(mayInterruptIfRunning: Boolean): Boolean = true
+  override def isCancelled: Boolean = true
+  override def isDone: Boolean = true
+  override def get(): Unit = {}
+  override def get(timeout: Long, unit: TimeUnit): Unit = {}
+  override def getDelay(unit: TimeUnit): Long = 0
+  override def compareTo(o: Delayed): Int = 0

Review comment:
   Should Delayed be ScheduledFuture? Also, instead of always returning 0, 
it seems that it's better to return 0 if the other instance is 
NoOpScheduledFutureTask and return -1 or 1 otherwise?
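
A hedged Scala sketch of what the suggested comparison could look like, mirroring the diff above (this is not the code that was merged): equal only to other no-op tasks, and a fixed nonzero value for anything else.

```scala
import java.util.concurrent.{Delayed, ScheduledFuture, TimeUnit}

class NoOpScheduledFutureTask extends ScheduledFuture[Unit] {
  override def cancel(mayInterruptIfRunning: Boolean): Boolean = true
  override def isCancelled: Boolean = true
  override def isDone: Boolean = true
  override def get(): Unit = {}
  override def get(timeout: Long, unit: TimeUnit): Unit = {}
  override def getDelay(unit: TimeUnit): Long = 0

  // 0 only against another no-op task; any real scheduled task sorts after
  // this placeholder, per the suggestion above.
  override def compareTo(o: Delayed): Int = o match {
    case _: NoOpScheduledFutureTask => 0
    case _ => -1
  }
}
```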




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jeqo commented on a change in pull request #11356: [KAFKA-10539] Convert KStreamImpl joins to new PAPI

2021-09-23 Thread GitBox


jeqo commented on a change in pull request #11356:
URL: https://github.com/apache/kafka/pull/11356#discussion_r715015262



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -200,11 +217,11 @@ private void emitNonJoinedOuterRecords(final 
KeyValueStore

[GitHub] [kafka] jeqo opened a new pull request #11356: [KAFKA-10539] Convert KStreamImpl joins to new PAPI

2021-09-23 Thread GitBox


jeqo opened a new pull request #11356:
URL: https://github.com/apache/kafka/pull/11356


   As part of the migration to the new Processor API, this PR converts 
KStream-to-KStream joins.
   
   Depends on #11315 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13321) Notify listener of leader change on registration

2021-09-23 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-13321:
--

 Summary: Notify listener of leader change on registration
 Key: KAFKA-13321
 URL: https://issues.apache.org/jira/browse/KAFKA-13321
 Project: Kafka
  Issue Type: Sub-task
  Components: kraft
Reporter: Jose Armando Garcia Sancio
Assignee: Jose Armando Garcia Sancio


When a Listener is registered with the RaftClient, the RaftClient doesn't 
notify the listener of the current leader when it is a follower. The current 
implementation of RaftClient notifies this listener of the leader change only if it 
is the current leader and it has caught up to the leader epoch.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji merged pull request #11354: MINOR: Print lastTimestamp when dumping producer snapshots

2021-09-23 Thread GitBox


hachikuji merged pull request #11354:
URL: https://github.com/apache/kafka/pull/11354


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #11347: KAFKA-13296: warn if previous assignment has duplicate partitions

2021-09-23 Thread GitBox


wcarlson5 commented on a change in pull request #11347:
URL: https://github.com/apache/kafka/pull/11347#discussion_r714990115



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -351,6 +351,17 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
 
                 // add the consumer and any info in its subscription to the client
                 clientMetadata.addConsumer(consumerId, subscription.ownedPartitions());
+                if (allOwnedPartitions.stream().anyMatch(t -> subscription.ownedPartitions().contains(t))) {
+                    log.warn("The previous assignment contains a partition more than once. " +
+                        "This might result in violation of EOS if enabled. \n" +

Review comment:
   that is probably easier to read. I will do that




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #11347: KAFKA-13296: warn if previous assignment has duplicate partitions

2021-09-23 Thread GitBox


wcarlson5 commented on a change in pull request #11347:
URL: https://github.com/apache/kafka/pull/11347#discussion_r714987990



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -351,6 +351,17 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
 
                 // add the consumer and any info in its subscription to the client
                 clientMetadata.addConsumer(consumerId, subscription.ownedPartitions());
+                if (allOwnedPartitions.stream().anyMatch(t -> subscription.ownedPartitions().contains(t))) {

Review comment:
   Sure that works




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on pull request #11058: KAFKA-12802 Added a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broker restarts.

2021-09-23 Thread GitBox


satishd commented on pull request #11058:
URL: https://github.com/apache/kafka/pull/11058#issuecomment-925983449


   Thanks @ccding for the review, addressed with inline replies.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on pull request #11058: KAFKA-12802 Added a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broker restarts.

2021-09-23 Thread GitBox


satishd commented on pull request #11058:
URL: https://github.com/apache/kafka/pull/11058#issuecomment-925982858


   Thanks @junrao for the review. Addressed them with the latest commit and 
comments. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on a change in pull request #11058: KAFKA-12802 Added a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broke

2021-09-23 Thread GitBox


satishd commented on a change in pull request #11058:
URL: https://github.com/apache/kafka/pull/11058#discussion_r714976848



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFile.java
##
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.Utils;
+import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This class represents the remote log data snapshot stored in a file for a 
specific topic partition. This is used by
+ * {@link TopicBasedRemoteLogMetadataManager} to store the remote log metadata 
received for a specific partition from
+ * remote log metadata topic. This will avoid reading the remote log metadata 
messages from the topic again when a
+ * broker restarts.
+ */
+public class RemoteLogMetadataSnapshotFile {
+private static final Logger log = 
LoggerFactory.getLogger(RemoteLogMetadataSnapshotFile.class);
+
+public static final String COMMITTED_LOG_METADATA_SNAPSHOT_FILE_NAME = 
"remote_log_snapshot";
+
+    // header: <version:short><topic-id:2 longs><metadata-partition:int><metadata-partition-offset:long>
+    // size: 2 + (8+8) + 4 + 8 = 30
+    private static final int HEADER_SIZE = 30;
+
+private final File metadataStoreFile;
+
+/**
+ * Creates a CommittedLogMetadataSnapshotFile instance backed by a file 
with the name `remote_log_snapshot` in
+ * the given {@code metadataStoreDir}. It creates the file if it does not 
exist.
+ *
+ * @param metadataStoreDir directory in which the snapshot file to be 
created.
+ */
+RemoteLogMetadataSnapshotFile(Path metadataStoreDir) {
+this.metadataStoreFile = new File(metadataStoreDir.toFile(), 
COMMITTED_LOG_METADATA_SNAPSHOT_FILE_NAME);
+
+// Create an empty file if it does not exist.
+try {
+boolean newFileCreated = metadataStoreFile.createNewFile();
+log.info("Remote log metadata snapshot file: [{}], newFileCreated: 
[{}]", metadataStoreFile, newFileCreated);
+} catch (IOException e) {
+throw new KafkaException(e);
+}
+}
+
+/**
+ * Writes the given snapshot replacing the earlier snapshot data.
+ *
+ * @param snapshot Snapshot to be stored.
+ * @throws IOException if there4 is any error in writing the given 
snapshot to the file.
+ */
+public synchronized void write(Snapshot snapshot) throws IOException {
+File newMetadataSnapshotFile = new 
File(metadataStoreFile.getAbsolutePath() + ".new");
+try (WritableByteChannel fileChannel = Channels.newChannel(new 
FileOutputStream(newMetadataSnapshotFile))) {
+
+ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE);
+
+// Write version
+headerBuffer.putShort(snapshot.version);
+
+// Write topic-id
+headerBuffer.putLong(snapshot.topicId.getMostSignificantBits());
+headerBuffer.putLong(snapshot.topicId.getLeastSignificantBits());
+
+// Write metadata partition and metadata partition offset
+headerBuffer.putInt(snapshot.metadataPartition);
+headerBuffer.putLong(snapshot.metadataPartitionOffset);
+headerBuffer.flip();
+
+// Write header
+fileChannel.write(headerBuffer);
+
+// Write each entry
+ByteBuffer lenBuffer = ByteBuffer.allocate(4);
+RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+

[GitHub] [kafka] satishd commented on a change in pull request #11058: KAFKA-12802 Added a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broke

2021-09-23 Thread GitBox


satishd commented on a change in pull request #11058:
URL: https://github.com/apache/kafka/pull/11058#discussion_r714976377



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
##
@@ -161,53 +161,46 @@ public void 
updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate metada
 throw new IllegalArgumentException("metadataUpdate: " + 
metadataUpdate + " with state " + RemoteLogSegmentState.COPY_SEGMENT_STARTED +
" can not be updated");
 case COPY_SEGMENT_FINISHED:
-handleSegmentWithCopySegmentFinishedState(metadataUpdate, 
existingMetadata);
+
handleSegmentWithCopySegmentFinishedState(existingMetadata.createWithUpdates(metadataUpdate));
 break;
 case DELETE_SEGMENT_STARTED:
-handleSegmentWithDeleteSegmentStartedState(metadataUpdate, 
existingMetadata);
+
handleSegmentWithDeleteSegmentStartedState(existingMetadata.createWithUpdates(metadataUpdate));
 break;
 case DELETE_SEGMENT_FINISHED:
-handleSegmentWithDeleteSegmentFinishedState(metadataUpdate, 
existingMetadata);
+
handleSegmentWithDeleteSegmentFinishedState(existingMetadata.createWithUpdates(metadataUpdate));
 break;
 default:
 throw new IllegalArgumentException("Metadata with the state " 
+ targetState + " is not supported");
 }
 }
 
-private void 
handleSegmentWithCopySegmentFinishedState(RemoteLogSegmentMetadataUpdate 
metadataUpdate,
-   
RemoteLogSegmentMetadata existingMetadata) {
-log.debug("Adding remote log segment metadata to leader epoch mappings 
with update: [{}]", metadataUpdate);
-
-doHandleSegmentStateTransitionForLeaderEpochs(existingMetadata,
+protected final void 
handleSegmentWithCopySegmentFinishedState(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata) {

Review comment:
   Good catch. We should maintain the ordering while storing the segments. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on a change in pull request #11058: KAFKA-12802 Added a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broke

2021-09-23 Thread GitBox


satishd commented on a change in pull request #11058:
URL: https://github.com/apache/kafka/pull/11058#discussion_r714975955



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFile.java
##
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.Utils;
+import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This class represents the remote log data snapshot stored in a file for a 
specific topic partition. This is used by
+ * {@link TopicBasedRemoteLogMetadataManager} to store the remote log metadata 
received for a specific partition from
+ * remote log metadata topic. This will avoid reading the remote log metadata 
messages from the topic again when a
+ * broker restarts.
+ */
+public class RemoteLogMetadataSnapshotFile {
+private static final Logger log = 
LoggerFactory.getLogger(RemoteLogMetadataSnapshotFile.class);
+
+public static final String COMMITTED_LOG_METADATA_SNAPSHOT_FILE_NAME = 
"remote_log_snapshot";
+
+// header: 

Review comment:
   Adding this was suggested during the KIP review, as it will be useful for 
debugging: it lets us make sure that what is stored in the file and the received topic 
id are the same.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on a change in pull request #11058: KAFKA-12802 Added a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broke

2021-09-23 Thread GitBox


satishd commented on a change in pull request #11058:
URL: https://github.com/apache/kafka/pull/11058#discussion_r714975590



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java
##
@@ -39,6 +41,7 @@
 private static final short REMOTE_LOG_SEGMENT_METADATA_API_KEY = new 
RemoteLogSegmentMetadataRecord().apiKey();
 private static final short REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY = 
new RemoteLogSegmentMetadataUpdateRecord().apiKey();
 private static final short REMOTE_PARTITION_DELETE_API_KEY = new 
RemotePartitionDeleteMetadataRecord().apiKey();
+private static final short REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY = 
new RemoteLogSegmentMetadataSnapshotRecord().apiKey();

Review comment:
   There are two ways to implement it:
   - Have a separate BytesApiMessageSerde for this message only and write the 
supporting classes, which would be similar to RemoteLogMetadataSerde and 
RemoteLogSegmentMetadataSnapshotTransform.
   - Treat this as one more ApiMessage representing remote log metadata, and add it to 
the existing RemoteLogMetadataSerde, which already has the framework for adding another API 
message. 
   
   I chose the latter for simplicity. We can update the javadoc to describe 
that RemoteLogMetadataSerde includes serde support for all the ApiMessages defined for 
remote log metadata, including RemoteLogSegmentMetadataSnapshot. It can be used 
as the serde for the topic, as it supports all the messages stored in the remote log 
metadata topic. 
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on a change in pull request #11058: KAFKA-12802 Added a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broke

2021-09-23 Thread GitBox


satishd commented on a change in pull request #11058:
URL: https://github.com/apache/kafka/pull/11058#discussion_r714975169



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##
@@ -120,6 +174,35 @@ public void run() {
 }
 }
 
+private void maybeSyncCommittedDataAndOffsets(boolean forceSync) {
+boolean noOffsetUpdates = 
committedPartitionToConsumedOffsets.equals(partitionToConsumedOffsets);
+if (noOffsetUpdates || !forceSync && time.milliseconds() - 
lastSyncedTimeMs < committedOffsetSyncIntervalMs) {
+log.debug("Skip syncing committed offsets, noOffsetUpdates: {}, 
forceSync: {}", noOffsetUpdates, forceSync);
+return;
+}
+
+try {
+HashMap syncedPartitionToConsumedOffsets = new 
HashMap<>();
+for (TopicIdPartition topicIdPartition : assignedTopicPartitions) {
+int metadataPartition = 
topicPartitioner.metadataPartition(topicIdPartition);
+Long offset = 
partitionToConsumedOffsets.get(metadataPartition);
+if (offset != null && 
!offset.equals(committedPartitionToConsumedOffsets.get(metadataPartition))) {
+
remotePartitionMetadataEventHandler.syncLogMetadataDataFile(topicIdPartition, 
metadataPartition, offset);
+syncedPartitionToConsumedOffsets.put(metadataPartition, 
offset);
+} else {
+log.debug("Skipping syncup of the remote-log-metadata-file 
for partition:{} , with remote log metadata partition{},  and offset:{} ",
+topicIdPartition, metadataPartition, offset);
+}
+}
+
+committedOffsetsFile.writeEntries(partitionToConsumedOffsets);

Review comment:
   We needed partitionToConsumedOffsets in the earlier check, and we do not 
really need syncedPartitionToConsumedOffsets here. Updated the code to write 
partitionToConsumedOffsets and then set 
committedPartitionToConsumedOffsets to partitionToConsumedOffsets.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on a change in pull request #11058: KAFKA-12802 Added a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broke

2021-09-23 Thread GitBox


satishd commented on a change in pull request #11058:
URL: https://github.com/apache/kafka/pull/11058#discussion_r714974207



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##
@@ -120,6 +174,35 @@ public void run() {
 }
 }
 
+private void maybeSyncCommittedDataAndOffsets(boolean forceSync) {
+boolean noOffsetUpdates = 
committedPartitionToConsumedOffsets.equals(partitionToConsumedOffsets);
+if (noOffsetUpdates || !forceSync && time.milliseconds() - 
lastSyncedTimeMs < committedOffsetSyncIntervalMs) {
+log.debug("Skip syncing committed offsets, noOffsetUpdates: {}, 
forceSync: {}", noOffsetUpdates, forceSync);
+return;
+}
+
+try {
+HashMap syncedPartitionToConsumedOffsets = new 
HashMap<>();
+for (TopicIdPartition topicIdPartition : assignedTopicPartitions) {

Review comment:
   Good point. Added the locking and updated it with a comment.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on a change in pull request #11058: KAFKA-12802 Added a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broke

2021-09-23 Thread GitBox


satishd commented on a change in pull request #11058:
URL: https://github.com/apache/kafka/pull/11058#discussion_r714973902



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##
@@ -85,21 +90,68 @@
 // Map of remote log metadata topic partition to consumed offsets.
 private final Map partitionToConsumedOffsets = new 
ConcurrentHashMap<>();
 
+private Map committedPartitionToConsumedOffsets = 
Collections.emptyMap();
+
+private final long committedOffsetSyncIntervalMs;
+private CommittedOffsetsFile committedOffsetsFile;
+private long lastSyncedTimeMs;
+
 public ConsumerTask(KafkaConsumer consumer,
 RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
-RemoteLogMetadataTopicPartitioner topicPartitioner) {
-Objects.requireNonNull(consumer);
-Objects.requireNonNull(remotePartitionMetadataEventHandler);
-Objects.requireNonNull(topicPartitioner);
-
-this.consumer = consumer;
-this.remotePartitionMetadataEventHandler = 
remotePartitionMetadataEventHandler;
-this.topicPartitioner = topicPartitioner;
+RemoteLogMetadataTopicPartitioner topicPartitioner,
+Path committedOffsetsPath,
+Time time,
+long committedOffsetSyncIntervalMs) {
+this.consumer = Objects.requireNonNull(consumer);
+this.remotePartitionMetadataEventHandler = 
Objects.requireNonNull(remotePartitionMetadataEventHandler);
+this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
+this.time = Objects.requireNonNull(time);
+this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs;
+
+initializeConsumerAssignment(committedOffsetsPath);
+}
+
+private void initializeConsumerAssignment(Path committedOffsetsPath) {
+try {
+committedOffsetsFile = new 
CommittedOffsetsFile(committedOffsetsPath.toFile());
+} catch (IOException e) {
+throw new KafkaException(e);
+}
+
+Map committedOffsets = Collections.emptyMap();
+try {
+// Load committed offset and assign them in the consumer.
+committedOffsets = committedOffsetsFile.readEntries();
+} catch (IOException e) {
+// Ignore the error and consumer consumes from the earliest offset.
+log.error("Encountered error while building committed offsets from 
the file", e);
+}
+
+final Set> entries = 
committedOffsets.entrySet();
+
+if (!entries.isEmpty()) {
+// Assign topic partitions from the earlier committed offsets file.
+Set earlierAssignedPartitions = committedOffsets.keySet();
+assignedMetaPartitions = 
Collections.unmodifiableSet(earlierAssignedPartitions);
+Set metadataTopicPartitions = 
earlierAssignedPartitions.stream()
+   
.map(x -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, x))
+   
.collect(Collectors.toSet());
+consumer.assign(metadataTopicPartitions);
+
+// Seek to the committed offsets
+for (Map.Entry entry : entries) {
+partitionToConsumedOffsets.put(entry.getKey(), 
entry.getValue());
+consumer.seek(new 
TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, entry.getKey()), 
entry.getValue());
+}
+
+committedPartitionToConsumedOffsets = committedOffsets;
+}
 }
 
 @Override
 public void run() {
 log.info("Started Consumer task thread.");
+lastSyncedTimeMs = time.milliseconds();

Review comment:
   I guess the consumer would fetch from the earliest offset, as there is no 
state associated with those partitions in the consumer and 
`auto.offset.reset` is set to `earliest`. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on a change in pull request #11058: KAFKA-12802 Added a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broke

2021-09-23 Thread GitBox


satishd commented on a change in pull request #11058:
URL: https://github.com/apache/kafka/pull/11058#discussion_r714972637



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##
@@ -85,21 +90,68 @@
 // Map of remote log metadata topic partition to consumed offsets.
 private final Map partitionToConsumedOffsets = new 
ConcurrentHashMap<>();
 
+private Map committedPartitionToConsumedOffsets = 
Collections.emptyMap();
+
+private final long committedOffsetSyncIntervalMs;
+private CommittedOffsetsFile committedOffsetsFile;
+private long lastSyncedTimeMs;
+
 public ConsumerTask(KafkaConsumer consumer,
 RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
-RemoteLogMetadataTopicPartitioner topicPartitioner) {
-Objects.requireNonNull(consumer);
-Objects.requireNonNull(remotePartitionMetadataEventHandler);
-Objects.requireNonNull(topicPartitioner);
-
-this.consumer = consumer;
-this.remotePartitionMetadataEventHandler = 
remotePartitionMetadataEventHandler;
-this.topicPartitioner = topicPartitioner;
+RemoteLogMetadataTopicPartitioner topicPartitioner,
+Path committedOffsetsPath,
+Time time,
+long committedOffsetSyncIntervalMs) {
+this.consumer = Objects.requireNonNull(consumer);
+this.remotePartitionMetadataEventHandler = 
Objects.requireNonNull(remotePartitionMetadataEventHandler);
+this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
+this.time = Objects.requireNonNull(time);
+this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs;
+
+initializeConsumerAssignment(committedOffsetsPath);
+}
+
+private void initializeConsumerAssignment(Path committedOffsetsPath) {
+try {
+committedOffsetsFile = new 
CommittedOffsetsFile(committedOffsetsPath.toFile());
+} catch (IOException e) {
+throw new KafkaException(e);
+}
+
+Map committedOffsets = Collections.emptyMap();
+try {
+// Load committed offset and assign them in the consumer.
+committedOffsets = committedOffsetsFile.readEntries();
+} catch (IOException e) {
+// Ignore the error and consumer consumes from the earliest offset.
+log.error("Encountered error while building committed offsets from 
the file", e);
+}
+
+final Set> entries = 
committedOffsets.entrySet();

Review comment:
   We want to go through the entries instead of calling get(key) for all 
the keys, which avoids the extra lookups. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on a change in pull request #11058: KAFKA-12802 Added a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broke

2021-09-23 Thread GitBox


satishd commented on a change in pull request #11058:
URL: https://github.com/apache/kafka/pull/11058#discussion_r714971600



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
##
@@ -49,21 +52,23 @@
 
 public ConsumerManager(TopicBasedRemoteLogMetadataManagerConfig rlmmConfig,
RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
-   RemoteLogMetadataTopicPartitioner 
rlmmTopicPartitioner,
+   RemoteLogMetadataTopicPartitioner topicPartitioner,
Time time) {
 this.rlmmConfig = rlmmConfig;
 this.time = time;
 
 //Create a task to consume messages and submit the respective events 
to RemotePartitionMetadataEventHandler.
 KafkaConsumer consumer = new 
KafkaConsumer<>(rlmmConfig.consumerProperties());
-consumerTask = new ConsumerTask(consumer, 
remotePartitionMetadataEventHandler, rlmmTopicPartitioner);
+Path committedOffsetsPath = new File(rlmmConfig.logDir(), 
COMMITTED_OFFSETS_FILE_NAME).toPath();
+consumerTask = new ConsumerTask(consumer, 
remotePartitionMetadataEventHandler, topicPartitioner, committedOffsetsPath, 
time, 60_000L);

Review comment:
   Right, this will be added in a followup PR. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on a change in pull request #11058: KAFKA-12802 Added a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broke

2021-09-23 Thread GitBox


satishd commented on a change in pull request #11058:
URL: https://github.com/apache/kafka/pull/11058#discussion_r714971397



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/CommittedOffsetsFile.java
##
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Pattern;
+
+/**
+ * This class represents a file containing the committed offsets of remote log 
metadata partitions.
+ */
+public class CommittedOffsetsFile {
+private static final int CURRENT_VERSION = 0;
+private static final String SEPARATOR = " ";
+
+private static final Pattern MINIMUM_ONE_WHITESPACE = 
Pattern.compile("\\s+");
+private final CheckpointFile> checkpointFile;
+
+CommittedOffsetsFile(File offsetsFile) throws IOException {
+CheckpointFile.EntryFormatter> formatter = 
new EntryFormatter();
+checkpointFile = new CheckpointFile<>(offsetsFile, CURRENT_VERSION, 
formatter);
+}
+
+private static class EntryFormatter implements 
CheckpointFile.EntryFormatter> {
+
+@Override
+public String toString(Map.Entry entry) {
+// Each entry is stored in a new line as 
+return entry.getKey() + SEPARATOR + entry.getValue();
+}
+
+@Override
+public Optional> fromString(String line) {
+String[] strings = MINIMUM_ONE_WHITESPACE.split(line);
+if (strings.length != 2) {
+return Optional.empty();
+}
+int partition = Integer.parseInt(strings[0]);
+long offset = Long.parseLong(strings[1]);

Review comment:
   An error will be thrown to the caller. As you suggested, it is good to 
catch it and return an empty Optional so that the caller can throw an error with 
details including the offending line. Addressed it in the latest commit. 
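
An illustrative Scala sketch of the parse-and-validate idea described here (the actual CommittedOffsetsFile/EntryFormatter is Java, and the object and method names below are made up): a malformed number yields an empty result instead of an uncaught NumberFormatException, so the caller can report the offending line.

```scala
object CommittedOffsetLineParserSketch {
  private val Whitespace = "\\s+".r

  // Parses "<partition> <offset>"; returns None for anything malformed.
  def fromString(line: String): Option[(Int, Long)] =
    Whitespace.split(line.trim) match {
      case Array(partition, offset) =>
        try Some((partition.toInt, offset.toLong))
        catch { case _: NumberFormatException => None }
      case _ => None
    }

  def main(args: Array[String]): Unit = {
    println(fromString("3 12345")) // Some((3,12345))
    println(fromString("3 oops"))  // None
  }
}
```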




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-09-23 Thread GitBox


jolshan commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r714947277



##
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##
@@ -929,6 +929,29 @@ class AbstractFetcherThreadTest {
 fetcher.verifyLastFetchedEpoch(partition, Some(5))
   }
 
+  @Test
+  def testMaybeUpdateTopicIds(): Unit = {
+val partition = new TopicPartition("topic1", 0)
+val fetcher = new MockFetcherThread
+
+// Start with no topic IDs
+fetcher.setReplicaState(partition, 
MockFetcherThread.PartitionState(leaderEpoch = 0))
+fetcher.addPartitions(Map(partition -> initialFetchState(None, 0L, 
leaderEpoch = 0)))
+
+
+def verifyFetchState(fetchState: Option[PartitionFetchState], 
expectedTopicId: Option[Uuid]): Unit = {
+  assertTrue(fetchState.isDefined)
+  assertEquals(expectedTopicId, fetchState.get.topicId)
+}
+
+verifyFetchState(fetcher.fetchState(partition), None)
+
+// Add topic ID
+fetcher.maybeUpdateTopicIds(Set(partition), topicName => 
topicIds.get(topicName))

Review comment:
   Ok. Seems like we agree then.  I think I have a test in ReplicaManager 
that does this as well. So we are good.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-09-23 Thread GitBox


dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r714936472



##
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##
@@ -929,6 +929,29 @@ class AbstractFetcherThreadTest {
 fetcher.verifyLastFetchedEpoch(partition, Some(5))
   }
 
+  @Test
+  def testMaybeUpdateTopicIds(): Unit = {
+val partition = new TopicPartition("topic1", 0)
+val fetcher = new MockFetcherThread
+
+// Start with no topic IDs
+fetcher.setReplicaState(partition, 
MockFetcherThread.PartitionState(leaderEpoch = 0))
+fetcher.addPartitions(Map(partition -> initialFetchState(None, 0L, 
leaderEpoch = 0)))
+
+
+def verifyFetchState(fetchState: Option[PartitionFetchState], 
expectedTopicId: Option[Uuid]): Unit = {
+  assertTrue(fetchState.isDefined)
+  assertEquals(expectedTopicId, fetchState.get.topicId)
+}
+
+verifyFetchState(fetcher.fetchState(partition), None)
+
+// Add topic ID
+fetcher.maybeUpdateTopicIds(Set(partition), topicName => 
topicIds.get(topicName))

Review comment:
   We could add tests for the ReplicaManager which verifies that the topic 
id is updated and propagated to the fetcher manager. Is it what you are 
thinking about?
   
   Regarding this particular test, what would you add? If the topic id is zero, 
we would just set it. As you said, the logic which prevents this from happening 
is before.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-09-23 Thread GitBox


dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r714923995



##
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##
@@ -3469,4 +3472,88 @@ class ReplicaManagerTest {
   ClientQuotasImage.EMPTY
 )
   }
+
+  def assertFetcherHasTopicId[T <: AbstractFetcherThread](manager: 
AbstractFetcherManager[T],
+  tp: TopicPartition,
+  expectedTopicId: 
Option[Uuid]): Unit = {
+val fetchState = manager.getFetcher(tp).flatMap(_.fetchState(tp))
+assertTrue(fetchState.isDefined)
+assertEquals(expectedTopicId, fetchState.get.topicId)
+  }
+
+  @Test
+  def testPartitionFetchStateUpdatesWithTopicIdAdded(): Unit = {
+val aliveBrokersIds = Seq(0, 1)
+val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time),
+  brokerId = 0, aliveBrokersIds)
+try {
+  val tp = new TopicPartition(topic, 0)
+  val leaderAndIsr = new LeaderAndIsr(1, 0, aliveBrokersIds.toList, 0)
+
+  val leaderAndIsrRequest1 = leaderAndIsrRequest(Uuid.ZERO_UUID, tp, 
aliveBrokersIds, leaderAndIsr)
+  val leaderAndIsrResponse1 = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest1, (_, _) => ())
+  assertEquals(Errors.NONE, leaderAndIsrResponse1.error)
+
+  assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, None)
+
+  val leaderAndIsrRequest2 = leaderAndIsrRequest(topicId, tp, 
aliveBrokersIds, leaderAndIsr)
+  val leaderAndIsrResponse2 = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest2, (_, _) => ())
+  assertEquals(Errors.NONE, leaderAndIsrResponse2.error)
+
+  assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, 
Some(topicId))
+
+} finally {
+  replicaManager.shutdown(checkpointHW = false)
+}
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testReplicaAlterLogDirsWithAndWithoutIds(usesTopicIds: Boolean): Unit = {
+val version = if (usesTopicIds) 
LeaderAndIsrRequestData.HIGHEST_SUPPORTED_VERSION else 4.toShort
+val topicIdRaw = if (usesTopicIds) topicId else Uuid.ZERO_UUID

Review comment:
   Ah.. It is because `topicId` is also used in the statement. You could 
indeed use `this.topicId` there.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-09-23 Thread GitBox


jolshan commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r714910220



##
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##
@@ -929,6 +929,29 @@ class AbstractFetcherThreadTest {
 fetcher.verifyLastFetchedEpoch(partition, Some(5))
   }
 
+  @Test
+  def testMaybeUpdateTopicIds(): Unit = {
+val partition = new TopicPartition("topic1", 0)
+val fetcher = new MockFetcherThread
+
+// Start with no topic IDs
+fetcher.setReplicaState(partition, 
MockFetcherThread.PartitionState(leaderEpoch = 0))
+fetcher.addPartitions(Map(partition -> initialFetchState(None, 0L, 
leaderEpoch = 0)))
+
+
+def verifyFetchState(fetchState: Option[PartitionFetchState], 
expectedTopicId: Option[Uuid]): Unit = {
+  assertTrue(fetchState.isDefined)
+  assertEquals(expectedTopicId, fetchState.get.topicId)
+}
+
+verifyFetchState(fetcher.fetchState(partition), None)
+
+// Add topic ID
+fetcher.maybeUpdateTopicIds(Set(partition), topicName => 
topicIds.get(topicName))

Review comment:
   To clarify, this path is only taken if 
   ```
   requestTopicId match {
 case Some(topicId) if logTopicId.isEmpty =>
   ```
   meaning both the request topic ID and log topic ID are non-empty/non-zero 
UUID.
   
   So just to clarify, should I still check these edge cases (either the log ID is already defined, or the ID in the map is zero)?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-09-23 Thread GitBox


jolshan commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r714901210



##
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##
@@ -3469,4 +3472,88 @@ class ReplicaManagerTest {
   ClientQuotasImage.EMPTY
 )
   }
+
+  def assertFetcherHasTopicId[T <: AbstractFetcherThread](manager: 
AbstractFetcherManager[T],
+  tp: TopicPartition,
+  expectedTopicId: 
Option[Uuid]): Unit = {
+val fetchState = manager.getFetcher(tp).flatMap(_.fetchState(tp))
+assertTrue(fetchState.isDefined)
+assertEquals(expectedTopicId, fetchState.get.topicId)
+  }
+
+  @Test
+  def testPartitionFetchStateUpdatesWithTopicIdAdded(): Unit = {
+val aliveBrokersIds = Seq(0, 1)
+val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time),
+  brokerId = 0, aliveBrokersIds)
+try {
+  val tp = new TopicPartition(topic, 0)
+  val leaderAndIsr = new LeaderAndIsr(1, 0, aliveBrokersIds.toList, 0)
+
+  val leaderAndIsrRequest1 = leaderAndIsrRequest(Uuid.ZERO_UUID, tp, 
aliveBrokersIds, leaderAndIsr)
+  val leaderAndIsrResponse1 = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest1, (_, _) => ())
+  assertEquals(Errors.NONE, leaderAndIsrResponse1.error)
+
+  assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, None)
+
+  val leaderAndIsrRequest2 = leaderAndIsrRequest(topicId, tp, 
aliveBrokersIds, leaderAndIsr)
+  val leaderAndIsrResponse2 = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest2, (_, _) => ())
+  assertEquals(Errors.NONE, leaderAndIsrResponse2.error)
+
+  assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, 
Some(topicId))
+
+} finally {
+  replicaManager.shutdown(checkpointHW = false)
+}
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testReplicaAlterLogDirsWithAndWithoutIds(usesTopicIds: Boolean): Unit = {
+val version = if (usesTopicIds) 
LeaderAndIsrRequestData.HIGHEST_SUPPORTED_VERSION else 4.toShort
+val topicIdRaw = if (usesTopicIds) topicId else Uuid.ZERO_UUID

Review comment:
   I had this and Scala told me it was a recursive definition. Maybe I can use something like `this.topicId` in the definition to clarify the different topic ID usages.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-09-23 Thread GitBox


jolshan commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r714899137



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -491,6 +491,21 @@ abstract class AbstractFetcherThread(name: String,
 } finally partitionMapLock.unlock()
   }
 
+  def maybeAddTopicIdsToThread(partitions: Set[TopicPartition], topicIds: 
String => Option[Uuid]) = {
+partitionMapLock.lockInterruptibly()
+try {
+  partitions.foreach { tp =>
+val currentState = partitionStates.stateValue(tp)
+if (currentState != null) {
+  val updatedState = currentState.updateTopicId(topicIds(tp.topic))
+  partitionStates.updateAndMoveToEnd(tp, updatedState)

Review comment:
   I will do this. For some reason I thought put was like the queue/list 
data structure and not map. This makes sense to me now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13320) Suggestion: SMT support for null key/value should be documented

2021-09-23 Thread Ben Ellis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Ellis updated KAFKA-13320:
--
Description: 
While working with a JDBC Sink Connector, I noticed that some SMTs choke on a 
tombstone (null value) while others handle tombstones fine.

For example:
{code:javascript}
"transforms": "flattenKey,valueToJSON,wrapValue,addTimestamp", 

"transforms.flattenKey.type": 
"org.apache.kafka.connect.transforms.Flatten$Key", 
"transforms.flattenKey.delimiter": "_", 

"transforms.valueToJSON.type": 
"com.github.jcustenborder.kafka.connect.transform.common.ToJSON$Value", 
"transforms.valueToJSON.schemas.enable": "false",
"transforms.valueToJSON.predicate": "tombstone",
"transforms.valueToJSON.negate": true, 

"transforms.wrapValue.type":"org.apache.kafka.connect.transforms.HoistField$Value",
 "transforms.wrapValue.field":"matrix",
"transforms.wrapValue.predicate": "tombstone",
"transforms.wrapValue.negate": true,

"transforms.addTimestamp.type": 
"org.apache.kafka.connect.transforms.InsertField$Value", 
"transforms.addTimestamp.timestamp.field": "message_timestamp",

"predicates": "tombstone",
"predicates.tombstone.type": 
"org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"
{code}
To avoid the cryptic error “java.lang.ClassCastException: class 
java.util.HashMap cannot be cast to class org.apache.kafka.connect.data.Struct” 
when processing a tombstone record, I had to add a negated predicate of 
RecordIsTombstone for ToJSON (community SMT) and HoistField, but did not need 
to add that to InsertField.

Digging in the source, I find that InsertField handles the case where key or 
value is null:
 
[https://github.com/a0x8o/kafka/blob/f8237749f6ad34c09154f807e53273be64e1261e/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java#L130]

^ Thanks to this, there's no need to add a predicate to skip InsertField$Value 
when value is null.

It would help if the docs listed how the individual SMTs behave when dealing 
with a null key/value.

Of course we can always find this out by trial and error or by studying the 
source code.
 But if we were to make a best practice of describing how an SMT handles null 
key/value, that would have two benefits:
 1) Save developers time when working with the official (shipped with Kafka) SMT
 2) Inspire developers who write their own SMT to likewise document how they 
handle null key/value

Perhaps a standard way of dealing with nulls ("no-op if key/value is null") 
could be promoted, and SMT authors would only need to document their behavior 
when it differs.
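
For illustration, a minimal sketch of that "no-op if value is null" convention (the class below is hypothetical, not an existing Kafka transform):

{code:java}
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

// Hypothetical base class showing the "no-op if value is null" convention.
public abstract class TombstoneSafeTransform<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(R record) {
        if (record.value() == null) {
            // Tombstone: pass the record through unchanged instead of throwing.
            return record;
        }
        return applyToValue(record);
    }

    // Subclasses implement the actual transformation for non-null values.
    protected abstract R applyToValue(R record);
}
{code}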

  was:
While working with a JDBC Sink Connector, I noticed that some SMT choke on a 
tombstone (null value) while others handle tombstones fine.

For example:

{code:javascript}
"transforms": "flattenKey,valueToJSON,wrapValue,addTimestamp", 
"transforms.flattenKey.type": 
"org.apache.kafka.connect.transforms.Flatten$Key", 
"transforms.flattenKey.delimiter": "_", "transforms.valueToJSON.type": 
"com.github.jcustenborder.kafka.connect.transform.common.ToJSON$Value", 
"transforms.valueToJSON.schemas.enable": "false", 
"transforms.valueToJSON.predicate": "tombstone", 
"transforms.valueToJSON.negate": true, 
"transforms.wrapValue.type":"org.apache.kafka.connect.transforms.HoistField$Value",
 "transforms.wrapValue.field":"matrix", "transforms.wrapValue.predicate": 
"tombstone", "transforms.wrapValue.negate": true, 
"transforms.addTimestamp.type": 
"org.apache.kafka.connect.transforms.InsertField$Value", 
"transforms.addTimestamp.timestamp.field": "message_timestamp", "predicates": 
"tombstone", "predicates.tombstone.type": 
"org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"
{code}


To avoid the cryptic error “java.lang.ClassCastException: class 
java.util.HashMap cannot be cast to class org.apache.kafka.connect.data.Struct” 
when processing a tombstone record, I had to add a negated predicate of 
RecordIsTombstone for ToJSON (community SMT) and HoistField, but did not need 
to add that to InsertField.

Digging in the source, I find that InsertField handles the case where key or 
value is null:
https://github.com/a0x8o/kafka/blob/f8237749f6ad34c09154f807e53273be64e1261e/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java#L130

^ Thanks to this, there's no need to add a predicate to skip InsertField$Value 
when value is null.

It would help if the docs listed how the individual SMTs behave when dealing 
with a null key/value.

Of course we can always find this out by trial and error or by studying the 
source code.
But if we were to make a best practice of describing how an SMT handles null 
key/value, that would have two benefits:
1) Save developers time when working with the official (shipped with Kafka) SMT
2) Inspire developers who write their own SMT to likewise document how they 
handle null key/value

Perhaps a standard way of dealing 

[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-09-23 Thread GitBox


dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r714815080



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -862,6 +877,10 @@ case class PartitionFetchState(fetchOffset: Long,
   s", delay=${delay.map(_.delayMs).getOrElse(0)}ms" +

Review comment:
   Could we add the topic id to the `toString`?

##
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##
@@ -3469,4 +3472,88 @@ class ReplicaManagerTest {
   ClientQuotasImage.EMPTY
 )
   }
+
+  def assertFetcherHasTopicId[T <: AbstractFetcherThread](manager: 
AbstractFetcherManager[T],
+  tp: TopicPartition,
+  expectedTopicId: 
Option[Uuid]): Unit = {
+val fetchState = manager.getFetcher(tp).flatMap(_.fetchState(tp))
+assertTrue(fetchState.isDefined)
+assertEquals(expectedTopicId, fetchState.get.topicId)
+  }
+
+  @Test
+  def testPartitionFetchStateUpdatesWithTopicIdAdded(): Unit = {
+val aliveBrokersIds = Seq(0, 1)
+val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time),
+  brokerId = 0, aliveBrokersIds)
+try {
+  val tp = new TopicPartition(topic, 0)
+  val leaderAndIsr = new LeaderAndIsr(1, 0, aliveBrokersIds.toList, 0)
+
+  val leaderAndIsrRequest1 = leaderAndIsrRequest(Uuid.ZERO_UUID, tp, 
aliveBrokersIds, leaderAndIsr)
+  val leaderAndIsrResponse1 = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest1, (_, _) => ())
+  assertEquals(Errors.NONE, leaderAndIsrResponse1.error)
+
+  assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, None)
+
+  val leaderAndIsrRequest2 = leaderAndIsrRequest(topicId, tp, 
aliveBrokersIds, leaderAndIsr)
+  val leaderAndIsrResponse2 = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest2, (_, _) => ())
+  assertEquals(Errors.NONE, leaderAndIsrResponse2.error)
+
+  assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, 
Some(topicId))
+
+} finally {
+  replicaManager.shutdown(checkpointHW = false)
+}
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testReplicaAlterLogDirsWithAndWithoutIds(usesTopicIds: Boolean): Unit = {
+val version = if (usesTopicIds) 
LeaderAndIsrRequestData.HIGHEST_SUPPORTED_VERSION else 4.toShort
+val topicIdRaw = if (usesTopicIds) topicId else Uuid.ZERO_UUID

Review comment:
   nit: `topicId` instead of `topicIdRaw`?

##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -491,6 +491,21 @@ abstract class AbstractFetcherThread(name: String,
 } finally partitionMapLock.unlock()
   }
 
+  def maybeAddTopicIdsToThread(partitions: Set[TopicPartition], topicIds: 
String => Option[Uuid]) = {
+partitionMapLock.lockInterruptibly()
+try {
+  partitions.foreach { tp =>
+val currentState = partitionStates.stateValue(tp)
+if (currentState != null) {
+  val updatedState = currentState.updateTopicId(topicIds(tp.topic))
+  partitionStates.updateAndMoveToEnd(tp, updatedState)

Review comment:
   We need to address this point.

##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1396,6 +1398,8 @@ class ReplicaManager(val config: KafkaConfig,
 stateChangeLogger.info(s"Updating log for $topicPartition 
to assign topic ID " +
   s"$topicId from LeaderAndIsr request from controller 
$controllerId with correlation " +
   s"id $correlationId epoch $controllerEpoch")
+if (partitionState.leader != localBrokerId && 
metadataCache.hasAliveBroker(partitionState.leader))

Review comment:
   We need to address this point.

##
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##
@@ -929,6 +929,29 @@ class AbstractFetcherThreadTest {
 fetcher.verifyLastFetchedEpoch(partition, Some(5))
   }
 
+  @Test
+  def testMaybeUpdateTopicIds(): Unit = {
+val partition = new TopicPartition("topic1", 0)
+val fetcher = new MockFetcherThread
+
+// Start with no topic IDs
+fetcher.setReplicaState(partition, 
MockFetcherThread.PartitionState(leaderEpoch = 0))
+fetcher.addPartitions(Map(partition -> initialFetchState(None, 0L, 
leaderEpoch = 0)))
+
+
+def verifyFetchState(fetchState: Option[PartitionFetchState], 
expectedTopicId: Option[Uuid]): Unit = {
+  assertTrue(fetchState.isDefined)
+  assertEquals(expectedTopicId, fetchState.get.topicId)
+}
+
+verifyFetchState(fetcher.fetchState(partition), None)
+
+// Add topic ID
+fetcher.maybeUpdateTopicIds(Set(partition), topicName => 
topicIds.get(topicName))

Review comment:
   Right. We should get here if the partition has 

[jira] [Updated] (KAFKA-13320) Suggestion: SMT support for null key/value should be documented

2021-09-23 Thread Ben Ellis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Ellis updated KAFKA-13320:
--
Description: 
While working with a JDBC Sink Connector, I noticed that some SMT choke on a 
tombstone (null value) while others handle tombstones fine.

For example:

{code:javascript}
"transforms": "flattenKey,valueToJSON,wrapValue,addTimestamp", 
"transforms.flattenKey.type": 
"org.apache.kafka.connect.transforms.Flatten$Key", 
"transforms.flattenKey.delimiter": "_", "transforms.valueToJSON.type": 
"com.github.jcustenborder.kafka.connect.transform.common.ToJSON$Value", 
"transforms.valueToJSON.schemas.enable": "false", 
"transforms.valueToJSON.predicate": "tombstone", 
"transforms.valueToJSON.negate": true, 
"transforms.wrapValue.type":"org.apache.kafka.connect.transforms.HoistField$Value",
 "transforms.wrapValue.field":"matrix", "transforms.wrapValue.predicate": 
"tombstone", "transforms.wrapValue.negate": true, 
"transforms.addTimestamp.type": 
"org.apache.kafka.connect.transforms.InsertField$Value", 
"transforms.addTimestamp.timestamp.field": "message_timestamp", "predicates": 
"tombstone", "predicates.tombstone.type": 
"org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"
{code}


To avoid the cryptic error “java.lang.ClassCastException: class 
java.util.HashMap cannot be cast to class org.apache.kafka.connect.data.Struct” 
when processing a tombstone record, I had to add a negated predicate of 
RecordIsTombstone for ToJSON (community SMT) and HoistField, but did not need 
to add that to InsertField.

Digging in the source, I find that InsertField handles the case where key or 
value is null:
https://github.com/a0x8o/kafka/blob/f8237749f6ad34c09154f807e53273be64e1261e/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java#L130

^ Thanks to this, there's no need to add a predicate to skip InsertField$Value 
when value is null.

It would help if the docs listed how the individual SMTs behave when dealing 
with a null key/value.

Of course we can always find this out by trial and error or by studying the 
source code.
But if we were to make a best practice of describing how an SMT handles null 
key/value, that would have two benefits:
1) Save developers time when working with the official (shipped with Kafka) SMT
2) Inspire developers who write their own SMT to likewise document how they 
handle null key/value

Perhaps a standard way of dealing with nulls ("no-op if key/value is null") 
could be promoted, and SMT authors would only need to document their behavior 
when it differs.
​

  was:
While working with a JDBC Sink Connector, I noticed that some SMT choke on a 
tombstone (null value) while others handle tombstones fine.

For example:

```
"transforms": "flattenKey,valueToJSON,wrapValue,addTimestamp", 
"transforms.flattenKey.type": 
"org.apache.kafka.connect.transforms.Flatten$Key", 
"transforms.flattenKey.delimiter": "_", "transforms.valueToJSON.type": 
"com.github.jcustenborder.kafka.connect.transform.common.ToJSON$Value", 
"transforms.valueToJSON.schemas.enable": "false", 
"transforms.valueToJSON.predicate": "tombstone", 
"transforms.valueToJSON.negate": true, 
"transforms.wrapValue.type":"org.apache.kafka.connect.transforms.HoistField$Value",
 "transforms.wrapValue.field":"matrix", "transforms.wrapValue.predicate": 
"tombstone", "transforms.wrapValue.negate": true, 
"transforms.addTimestamp.type": 
"org.apache.kafka.connect.transforms.InsertField$Value", 
"transforms.addTimestamp.timestamp.field": "message_timestamp", "predicates": 
"tombstone", "predicates.tombstone.type": 
"org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"

```

To avoid the cryptic error “java.lang.ClassCastException: class 
java.util.HashMap cannot be cast to class org.apache.kafka.connect.data.Struct” 
when processing a tombstone record, I had to add a negated predicate of 
RecordIsTombstone for ToJSON (community SMT) and HoistField, but did not need 
to add that to InsertField.

Digging in the source, I find that InsertField handles the case where key or 
value is null:
https://github.com/a0x8o/kafka/blob/f8237749f6ad34c09154f807e53273be64e1261e/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java#L130

^ Thanks to this, there's no need to add a predicate to skip InsertField$Value 
when value is null.

It would help if the docs listed how the individual SMTs behave when dealing 
with a null key/value.

Of course we can always find this out by trial and error or by studying the 
source code.
But if we were to make a best practice of describing how an SMT handles null 
key/value, that would have two benefits:
1) Save developers time when working with the official (shipped with Kafka) SMT
2) Inspire developers who write their own SMT to likewise document how they 
handle null key/value

Perhaps a standard way of dealing with nulls ("no-op 

[jira] [Created] (KAFKA-13320) Suggestion: SMT support for null key/value should be documented

2021-09-23 Thread Ben Ellis (Jira)
Ben Ellis created KAFKA-13320:
-

 Summary: Suggestion: SMT support for null key/value should be 
documented
 Key: KAFKA-13320
 URL: https://issues.apache.org/jira/browse/KAFKA-13320
 Project: Kafka
  Issue Type: Wish
  Components: KafkaConnect
Reporter: Ben Ellis


While working with a JDBC Sink Connector, I noticed that some SMT choke on a 
tombstone (null value) while others handle tombstones fine.

For example:

```
"transforms": "flattenKey,valueToJSON,wrapValue,addTimestamp", 
"transforms.flattenKey.type": 
"org.apache.kafka.connect.transforms.Flatten$Key", 
"transforms.flattenKey.delimiter": "_", "transforms.valueToJSON.type": 
"com.github.jcustenborder.kafka.connect.transform.common.ToJSON$Value", 
"transforms.valueToJSON.schemas.enable": "false", 
"transforms.valueToJSON.predicate": "tombstone", 
"transforms.valueToJSON.negate": true, 
"transforms.wrapValue.type":"org.apache.kafka.connect.transforms.HoistField$Value",
 "transforms.wrapValue.field":"matrix", "transforms.wrapValue.predicate": 
"tombstone", "transforms.wrapValue.negate": true, 
"transforms.addTimestamp.type": 
"org.apache.kafka.connect.transforms.InsertField$Value", 
"transforms.addTimestamp.timestamp.field": "message_timestamp", "predicates": 
"tombstone", "predicates.tombstone.type": 
"org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"

```

To avoid the cryptic error “java.lang.ClassCastException: class 
java.util.HashMap cannot be cast to class org.apache.kafka.connect.data.Struct” 
when processing a tombstone record, I had to add a negated predicate of 
RecordIsTombstone for ToJSON (community SMT) and HoistField, but did not need 
to add that to InsertField.

Digging in the source, I find that InsertField handles the case where key or 
value is null:
https://github.com/a0x8o/kafka/blob/f8237749f6ad34c09154f807e53273be64e1261e/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java#L130

^ Thanks to this, there's no need to add a predicate to skip InsertField$Value 
when value is null.

It would help if the docs listed how the individual SMTs behave when dealing 
with a null key/value.

Of course we can always find this out by trial and error or by studying the 
source code.
But if we were to make a best practice of describing how an SMT handles null 
key/value, that would have two benefits:
1) Save developers time when working with the official (shipped with Kafka) SMT
2) Inspire developers who write their own SMT to likewise document how they 
handle null key/value

Perhaps a standard way of dealing with nulls ("no-op if key/value is null") 
could be promoted, and SMT authors would only need to document their behavior 
when it differs.
​



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13298) Improve documentation on EOS KStream requirements

2021-09-23 Thread Andy Chambers (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17419166#comment-17419166
 ] 

Andy Chambers commented on KAFKA-13298:
---

I made a PR for this but initially did not use the correct title format, which I 
suppose is why it was never automatically linked. Sorry about that. Here is a 
link to the PR:

 

https://github.com/apache/kafka/pull/11355

> Improve documentation on EOS KStream requirements
> -
>
> Key: KAFKA-13298
> URL: https://issues.apache.org/jira/browse/KAFKA-13298
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: F Méthot
>Priority: Trivial
>
> After posting question on a kafka forum, the following was revealed by kafka 
> developer:
> {quote}Is there minimum replication factor required to enable exactly_once 
> for topic involved in transactions?
> {quote}
> "Well, technically you can configure the system to use EOS with any 
> replication factor. However, using a lower replication factor than 3 
> effectively voids EOS. Thus, it’s strongly recommended to use a replication 
> factor of 3."
>  
> This should be clearly documented in the stream doc:
> [https://kafka.apache.org/28/documentation/streams/developer-guide/config-streams.html]
> Which refers to this broker link
> [https://kafka.apache.org/28/documentation/streams/developer-guide/config-streams.html#streams-developer-guide-processing-guarantedd]
>  
>  
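
For reference, the recommendation boils down to a Streams configuration along these lines (a sketch only; the application id and bootstrap servers are placeholders):

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "eos-example-app");   // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
// Replication factor for the internal (repartition/changelog) topics; the input and
// output topics must also be created with replication factor 3 on the broker side.
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
{code}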



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cddr opened a new pull request #11355: Improve documentation on EOS KStream requirements

2021-09-23 Thread GitBox


cddr opened a new pull request #11355:
URL: https://github.com/apache/kafka/pull/11355


   Minor documentation fix to address: 
https://issues.apache.org/jira/browse/KAFKA-13298
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #11321: MINOR: Replace EasyMock with Mockito in connect:basic-auth-extension

2021-09-23 Thread GitBox


ijuma merged pull request #11321:
URL: https://github.com/apache/kafka/pull/11321


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lkokhreidze commented on pull request #10851: KAFKA-6718 / Rack aware standby task assignor

2021-09-23 Thread GitBox


lkokhreidze commented on pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#issuecomment-925659295


   Hi @cadonna, is it possible to continue pushing this PR forward? I'm back from my holidays.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-10544) Convert KTable aggregations to new PAPI

2021-09-23 Thread Jorge Esteban Quilcate Otoya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jorge Esteban Quilcate Otoya resolved KAFKA-10544.
--
Resolution: Fixed

https://github.com/apache/kafka/pull/11316

> Convert KTable aggregations to new PAPI
> ---
>
> Key: KAFKA-10544
> URL: https://issues.apache.org/jira/browse/KAFKA-10544
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-09-23 Thread GitBox


dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r714557835



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1396,6 +1398,8 @@ class ReplicaManager(val config: KafkaConfig,
 stateChangeLogger.info(s"Updating log for $topicPartition 
to assign topic ID " +
   s"$topicId from LeaderAndIsr request from controller 
$controllerId with correlation " +
   s"id $correlationId epoch $controllerEpoch")
+if (partitionState.leader != localBrokerId && 
metadataCache.hasAliveBroker(partitionState.leader))

Review comment:
   Don't forget to remove `&& 
metadataCache.hasAliveBroker(partitionState.leader)` here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (KAFKA-13302) [IEP-59] Support not default page size

2021-09-23 Thread Nikolay Izhikov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nikolay Izhikov closed KAFKA-13302.
---

> [IEP-59] Support not default page size
> --
>
> Key: KAFKA-13302
> URL: https://issues.apache.org/jira/browse/KAFKA-13302
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Nikolay Izhikov
>Priority: Major
>  Labels: IEP-59
>
> Currently, CDC doesn't support a non-default page size.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-09-23 Thread GitBox


dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r714549045



##
File path: 
core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
##
@@ -133,4 +137,75 @@ class AbstractFetcherManagerTest {
 assertEquals(0, fetcherManager.deadThreadCount)
 EasyMock.verify(fetcher)
   }
+
+  @Test
+  def testMaybeUpdateTopicIds(): Unit = {
+val fetcher: AbstractFetcherThread = 
EasyMock.mock(classOf[AbstractFetcherThread])
+val fetcherManager = new 
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", 
"fetcher-manager", 2) {
+  override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): AbstractFetcherThread = {
+fetcher
+  }
+}
+
+val fetchOffset = 10L
+val leaderEpoch = 15
+val tp1 = new TopicPartition("topic", 0)
+val tp2 = new TopicPartition("topic", 1)
+val topicId = Some(Uuid.randomUuid())
+
+// Start out with no topic ID.
+val initialFetchState1 = InitialFetchState(
+  topicId = None,
+  leader = new BrokerEndPoint(0, "localhost", 9092),
+  currentLeaderEpoch = leaderEpoch,
+  initOffset = fetchOffset)
+
+// Include a partition on a different leader
+val initialFetchState2 = InitialFetchState(
+  topicId = None,
+  leader = new BrokerEndPoint(1, "localhost", 9092),
+  currentLeaderEpoch = leaderEpoch,
+  initOffset = fetchOffset)
+
+val partitionsToUpdate = Map(tp1 -> initialFetchState1.leader.id, tp2 -> 
initialFetchState2.leader.id)
+val topicIds = (_: String) => topicId
+
+// Simulate calls to different fetchers due to different leaders
+EasyMock.expect(fetcher.start())
+EasyMock.expect(fetcher.start())
+
+EasyMock.expect(fetcher.addPartitions(Map(tp1 -> initialFetchState1)))
+  .andReturn(Set(tp1))
+EasyMock.expect(fetcher.addPartitions(Map(tp2 -> initialFetchState2)))
+  .andReturn(Set(tp2))
+
+EasyMock.expect(fetcher.fetchState(tp1))
+  .andReturn(Some(PartitionFetchState(None, fetchOffset, None, 
leaderEpoch, Truncating, lastFetchedEpoch = None)))
+EasyMock.expect(fetcher.fetchState(tp2))
+  .andReturn(Some(PartitionFetchState(None, fetchOffset, None, 
leaderEpoch, Truncating, lastFetchedEpoch = None)))
+
+EasyMock.expect(fetcher.maybeUpdateTopicIds(Set(tp1), topicIds))
+EasyMock.expect(fetcher.maybeUpdateTopicIds(Set(tp2), topicIds))
+
+EasyMock.expect(fetcher.fetchState(tp1))
+  .andReturn(Some(PartitionFetchState(topicId, fetchOffset, None, 
leaderEpoch, Truncating, lastFetchedEpoch = None)))
+EasyMock.expect(fetcher.fetchState(tp2))
+  .andReturn(Some(PartitionFetchState(topicId, fetchOffset, None, 
leaderEpoch, Truncating, lastFetchedEpoch = None)))
+EasyMock.replay(fetcher)
+
+def verifyFetchState(fetchState: Option[PartitionFetchState], 
expectedTopicId: Option[Uuid]): Unit = {
+  assertTrue(fetchState.isDefined)
+  assertEquals(expectedTopicId, fetchState.get.topicId)
+}
+
+fetcherManager.addFetcherForPartitions(Map(tp1 -> initialFetchState1, tp2 
-> initialFetchState2))
+verifyFetchState(fetcher.fetchState(tp1), None)
+verifyFetchState(fetcher.fetchState(tp2), None)
+
+fetcherManager.maybeUpdateTopicIds(partitionsToUpdate, topicIds)
+verifyFetchState(fetcher.fetchState(tp1), topicId)
+verifyFetchState(fetcher.fetchState(tp2), topicId)
+
+EasyMock.verify(fetcher)

Review comment:
   Right.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-09-23 Thread GitBox


dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r714547943



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -491,6 +491,21 @@ abstract class AbstractFetcherThread(name: String,
 } finally partitionMapLock.unlock()
   }
 
+  def maybeAddTopicIdsToThread(partitions: Set[TopicPartition], topicIds: 
String => Option[Uuid]) = {
+partitionMapLock.lockInterruptibly()
+try {
+  partitions.foreach { tp =>
+val currentState = partitionStates.stateValue(tp)
+if (currentState != null) {
+  val updatedState = currentState.updateTopicId(topicIds(tp.topic))
+  partitionStates.updateAndMoveToEnd(tp, updatedState)

Review comment:
   Right. `updateAndMoveToEnd` removes the item first and then adds it back 
at the end of the `LinkedHashMap`. We do rely on this to rotate the partitions 
in the `LinkedHashMap` in order to treat all partitions fairly. When we use a 
session, this is done on the broker as well btw.
   
   The new `update` method will only update the state and will overwrite any 
existing state in the `LinkedHashMap`. This is what `map.put()` does. However, 
it does not change the order in the `LinkedHashMap`, which is good. We don't 
have to change the order when we set the topic id.
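   
   A small JDK-only illustration of that LinkedHashMap behavior (nothing Kafka-specific, just to make the ordering point concrete):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LinkedHashMapOrderDemo {
    public static void main(String[] args) {
        Map<String, Integer> states = new LinkedHashMap<>();
        states.put("tp-0", 1);
        states.put("tp-1", 1);
        states.put("tp-2", 1);

        // put() on an existing key overwrites the value but keeps the original
        // insertion order, so a plain update does not rotate the partitions.
        states.put("tp-0", 2);
        System.out.println(states.keySet()); // [tp-0, tp-1, tp-2]

        // Removing and re-inserting moves the key to the end, which is what the
        // updateAndMoveToEnd-style rotation relies on for fairness.
        Integer v = states.remove("tp-1");
        states.put("tp-1", v + 1);
        System.out.println(states.keySet()); // [tp-0, tp-2, tp-1]
    }
}
```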




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-13319) Do not send AddOffsetsToTxn/TxnOffsetCommit if offsets map is empty

2021-09-23 Thread Ryan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan reassigned KAFKA-13319:


Assignee: Ryan

> Do not send AddOffsetsToTxn/TxnOffsetCommit if offsets map is empty
> ---
>
> Key: KAFKA-13319
> URL: https://issues.apache.org/jira/browse/KAFKA-13319
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Ryan
>Priority: Major
>  Labels: newbie
>
> If a user calls `Producer.sendOffsetsToTransaction` with an empty map of 
> offsets, we can shortcut return and skip the logic to add the offsets topic 
> to the transaction. The main benefit is avoiding the unnecessary accumulation 
> of markers in __consumer_offsets.
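
Until that shortcut exists in the producer, a caller-side guard gives the same effect (sketch; {{producer}}, {{offsets}} and {{groupMetadata}} are assumed to be in scope):

{code:java}
if (!offsets.isEmpty()) {
    // Only touch the transaction when there is actually something to commit;
    // an empty map would otherwise still add the offsets topic to the transaction.
    producer.sendOffsetsToTransaction(offsets, groupMetadata);
}
{code}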



--
This message was sent by Atlassian Jira
(v8.3.4#803005)