[GitHub] [kafka] showuon commented on pull request #11613: MINOR: Update streamResetter option description
showuon commented on pull request #11613: URL: https://github.com/apache/kafka/pull/11613#issuecomment-997165583

@jeqo @mjsax, please take a look. Thanks.
[GitHub] [kafka] showuon opened a new pull request #11613: MINOR: Update streamResetter option description
showuon opened a new pull request #11613: URL: https://github.com/apache/kafka/pull/11613

In [KIP-171](https://cwiki.apache.org/confluence/display/KAFKA/KIP-171+-+Extend+Consumer+Group+Reset+Offset+for+Stream+Application), we added support for resetting offsets to a specific position, not only to the earliest offset. The tool description doesn't reflect this change, so this PR updates it.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] showuon closed pull request #11612: MINOR: Update stream resetter option description
showuon closed pull request #11612: URL: https://github.com/apache/kafka/pull/11612
[GitHub] [kafka] showuon opened a new pull request #11612: MINOR: Update stream resetter option description
showuon opened a new pull request #11612: URL: https://github.com/apache/kafka/pull/11612

In [KIP-171](https://cwiki.apache.org/confluence/display/KAFKA/KIP-171+-+Extend+Consumer+Group+Reset+Offset+for+Stream+Application), we added support for resetting offsets to a specific position, not only to the earliest offset. The tool description doesn't reflect this change, so this PR updates it.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Updated] (KAFKA-12399) Deprecate Log4J Appender
[ https://issues.apache.org/jira/browse/KAFKA-12399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjin Lee updated KAFKA-12399:
Description: As a follow-up to KAFKA-9366, we have to entirely remove the log4j 1.2.7 dependency from the classpath by removing dependencies on log4j-appender.
(was: As a follow-up to KAFKA-9366, we have to provide a log4j2 counterpart to log4j-appender.)

> Deprecate Log4J Appender
> Key: KAFKA-12399
> URL: https://issues.apache.org/jira/browse/KAFKA-12399
> Project: Kafka
> Issue Type: Improvement
> Components: logging
> Reporter: Dongjin Lee
> Assignee: Dongjin Lee
> Priority: Major
> Labels: needs-kip
>
> As a follow-up to KAFKA-9366, we have to entirely remove the log4j 1.2.7 dependency from the classpath by removing dependencies on log4j-appender.
[jira] [Updated] (KAFKA-12399) Deprecate Log4J Appender
[ https://issues.apache.org/jira/browse/KAFKA-12399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjin Lee updated KAFKA-12399:
Summary: Deprecate Log4J Appender (was: Add log4j2 Appender)

> Deprecate Log4J Appender
> Key: KAFKA-12399
> URL: https://issues.apache.org/jira/browse/KAFKA-12399
> Project: Kafka
> Issue Type: Improvement
> Components: logging
> Reporter: Dongjin Lee
> Assignee: Dongjin Lee
> Priority: Major
> Labels: needs-kip
>
> As a follow-up to KAFKA-9366, we have to provide a log4j2 counterpart to log4j-appender.
[GitHub] [kafka] showuon commented on pull request #11596: MINOR: bump version in kraft readme
showuon commented on pull request #11596: URL: https://github.com/apache/kafka/pull/11596#issuecomment-997159268

@cmccabe, thanks for your comment. I've updated the PR. Please check again. Thank you.
[jira] [Commented] (KAFKA-13549) Add "delete interval" config
[ https://issues.apache.org/jira/browse/KAFKA-13549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17461738#comment-17461738 ]

Matthias J. Sax commented on KAFKA-13549:
You could also just do the KIP. It should not be too controversial, IMHO. We just need one, because adding a new config is a public API change.

> Add "delete interval" config
> Key: KAFKA-13549
> URL: https://issues.apache.org/jira/browse/KAFKA-13549
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Matthias J. Sax
> Priority: Major
> Labels: needs-kip
>
> Kafka Streams uses "delete record" requests to aggressively purge data from repartition topics. Those requests are sent each time we commit.
> For at-least-once with a default commit interval of 30 seconds, this works fine. However, for exactly-once with a default commit interval of 100ms, it's very aggressive. The main issue is broker-side: the broker logs every "delete record" request, so broker logs are spammed if EOS is enabled.
> We should consider adding a new config (eg `delete.record.interval.ms` or similar) to have a dedicated config for "delete record" requests, to decouple it from the commit interval config and allow purging data less aggressively even when the commit interval is small, avoiding the broker-side log spamming.
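To make the proposal concrete, here is a minimal sketch of how such a config would be set, assuming the hypothetical name `delete.record.interval.ms` floated in the ticket (it is not an existing StreamsConfig constant):

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class EosPurgeConfigExample {
    public static Properties config() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-eos-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // EOS implies a small commit interval (100ms default), which currently
        // also drives the "delete record" request frequency.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
        // Hypothetical config from the ticket: purge repartition-topic data at
        // most every 30s, decoupled from the commit interval.
        props.put("delete.record.interval.ms", 30_000);
        return props;
    }
}
{code}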
[GitHub] [kafka] mjsax commented on pull request #11610: KAFKA-13549: Add delete.interval.ms to Streams
mjsax commented on pull request #11610: URL: https://github.com/apache/kafka/pull/11610#issuecomment-997113605

Thanks for the PR. Can you do a KIP for this? Happy to review the PR after the KIP is approved.
[jira] [Resolved] (KAFKA-13544) Deadlock during shutting down kafka broker because of connectivity problem with zookeeper
[ https://issues.apache.org/jira/browse/KAFKA-13544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao resolved KAFKA-13544.
Fix Version/s: 3.2.0
Resolution: Fixed

Merged the PR to trunk.

> Deadlock during shutting down kafka broker because of connectivity problem with zookeeper
> Key: KAFKA-13544
> URL: https://issues.apache.org/jira/browse/KAFKA-13544
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 2.8.1
> Reporter: Andrei Lakhmanets
> Priority: Major
> Fix For: 3.2.0
> Attachments: kafka_broker_logs.log, kafka_broker_stackdump.txt
>
> Hi team,
> *Kafka version:* 2.8.1
> *Configuration:* 3 kafka brokers in different availability zones and 3 zookeeper brokers in different availability zones.
> I hit a deadlock in Kafka. I've attached a stack dump of the Kafka state to this ticket. The locked threads are "feature-zk-node-event-process-thread" and "kafka-shutdown-hook".
> *Context:*
> My Kafka cluster had connectivity problems with ZooKeeper, and in the logs I saw the following exception:
> {code:java}
> [2021-12-06 18:31:14,629] WARN Unable to reconnect to ZooKeeper service, session 0x1039563000f has expired (org.apache.zookeeper.ClientCnxn)
> [2021-12-06 18:31:14,629] INFO Unable to reconnect to ZooKeeper service, session 0x1039563000f has expired, closing socket connection (org.apache.zookeeper.ClientCnxn)
> [2021-12-06 18:31:14,629] INFO EventThread shut down for session: 0x1039563000f (org.apache.zookeeper.ClientCnxn)
> [2021-12-06 18:31:14,631] INFO [ZooKeeperClient Kafka server] Session expired. (kafka.zookeeper.ZooKeeperClient)
> [2021-12-06 18:31:14,632] ERROR [feature-zk-node-event-process-thread]: Failed to process feature ZK node change event. The broker will eventually exit. (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread)
> kafka.zookeeper.ZooKeeperClientExpiredException: Session expired either before or while waiting for connection
>     at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:279)
>     at kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$1(ZooKeeperClient.scala:261)
>     at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:261)
>     at kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1797)
>     at kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1767)
>     at kafka.zk.KafkaZkClient.retryRequestUntilConnected(KafkaZkClient.scala:1762)
>     at kafka.zk.KafkaZkClient.getDataAndStat(KafkaZkClient.scala:771)
>     at kafka.zk.KafkaZkClient.getDataAndVersion(KafkaZkClient.scala:755)
>     at kafka.server.FinalizedFeatureChangeListener$FeatureCacheUpdater.updateLatestOrThrow(FinalizedFeatureChangeListener.scala:74)
>     at kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread.doWork(FinalizedFeatureChangeListener.scala:147)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) {code}
> The exception is thrown in the feature-zk-node-event-process-thread thread and caught in FinalizedFeatureChangeListener.ChangeNotificationProcessorThread.doWork, which then throws FatalExitError(1).
> The FatalExitError is caught in ShutdownableThread.run, which calls Exit.exit(e.statusCode()); that calls System.exit under the hood.
> The stackdump of the "feature-zk-node-event-process-thread" thread:
> {code:java}
> "feature-zk-node-event-process-thread" #23 prio=5 os_prio=0 cpu=163.19ms elapsed=1563046.32s tid=0x7fd0dcdec800 nid=0x2088 in Object.wait() [0x7fd07e2c1000]
> java.lang.Thread.State: WAITING (on object monitor)
>     at java.lang.Object.wait(java.base@11.0.11/Native Method)
>     - waiting on
>     at java.lang.Thread.join(java.base@11.0.11/Thread.java:1300)
>     - waiting to re-lock in wait() <0x88b9d3c8> (a org.apache.kafka.common.utils.KafkaThread)
>     at java.lang.Thread.join(java.base@11.0.11/Thread.java:1375)
>     at java.lang.ApplicationShutdownHooks.runHooks(java.base@11.0.11/ApplicationShutdownHooks.java:107)
>     at java.lang.ApplicationShutdownHooks$1.run(java.base@11.0.11/ApplicationShutdownHooks.java:46)
>     at java.lang.Shutdown.runHooks(java.base@11.0.11/Shutdown.java:130)
>     at java.lang.Shutdown.exit(java.base@11.0.11/Shutdown.java:174)
>     - locked <0x806872f8> (a java.lang.Class for java.lang.Shutdown)
>     at java.lang.Runtime.exit(java.base@11.0.11/Runtime.java:116)
>     at java.lang.System.exit(java.base@11.0.11/System.java:1752)
>     at org.apache.kafka.common.utils.Exit$2.execute(Exit.java:43)
>     at org.
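The circular wait described above can be reproduced outside Kafka. The following self-contained sketch (a hypothetical demo class, not Kafka code) hangs the JVM the same way: a worker thread calls System.exit(), which runs and joins the shutdown hooks, while a shutdown hook joins the worker.

{code:java}
public class ShutdownDeadlockDemo {
    public static void main(final String[] args) throws InterruptedException {
        final Thread worker = new Thread(() -> {
            // Like feature-zk-node-event-process-thread hitting FatalExitError:
            // System.exit() blocks until all shutdown hooks have finished.
            System.exit(1);
        }, "worker");
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                // Like kafka-shutdown-hook: waits for the worker, which is
                // itself blocked inside System.exit() -> circular wait.
                worker.join();
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "shutdown-hook"));
        worker.start();
        worker.join(); // never returns: the JVM deadlocks, mirroring the report
    }
}
{code}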
[jira] [Created] (KAFKA-13555) Consider number of input topic partitions for task assignment
Matthias J. Sax created KAFKA-13555:
Summary: Consider number of input topic partitions for task assignment
Key: KAFKA-13555
URL: https://issues.apache.org/jira/browse/KAFKA-13555
Project: Kafka
Issue Type: Improvement
Components: streams
Reporter: Matthias J. Sax

StreamsAssignor tries to distribute tasks evenly across all instances/threads of a Kafka Streams application. It knows about instances/threads (to give more capacity to instances with more threads), and it distinguishes between stateless and stateful tasks. We also try not to move state around but to use a sticky assignment if possible.

However, the assignment does not take the number of input topic partitions into account. For example, an upstream task could compute two joins and thus have 3 input partitions, while a downstream task computes a follow-up aggregation with a single input partition (from the repartition topic). One thread could get the 3-input-partition tasks assigned while the other thread gets the single-input-partition tasks, resulting in an uneven partition assignment across both threads.
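A back-of-the-envelope sketch of a partition-count-aware assignment (illustrative only, not the StreamsAssignor API): sort tasks by input-partition count and greedily place each on the least-loaded thread.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

public final class WeightedTaskAssignmentSketch {

    // taskInputPartitions: task name -> number of input topic partitions
    // returns: thread id -> assigned task names
    public static Map<Integer, List<String>> assign(final Map<String, Integer> taskInputPartitions,
                                                    final int numThreads) {
        // min-heap of {total partition weight, thread id}
        final PriorityQueue<int[]> threads = new PriorityQueue<>((a, b) -> a[0] - b[0]);
        for (int t = 0; t < numThreads; t++) {
            threads.add(new int[]{0, t});
        }
        final Map<Integer, List<String>> assignment = new HashMap<>();
        taskInputPartitions.entrySet().stream()
            .sorted((a, b) -> b.getValue() - a.getValue()) // heaviest tasks first
            .forEach(task -> {
                final int[] thread = threads.poll(); // least-loaded thread so far
                thread[0] += task.getValue();
                assignment.computeIfAbsent(thread[1], k -> new ArrayList<>()).add(task.getKey());
                threads.add(thread);
            });
        return assignment;
    }
}
{code}

With, e.g., two 3-partition join tasks and two 1-partition aggregation tasks on two threads, this yields a weight of 4 per thread rather than 6 versus 2.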
[GitHub] [kafka] junrao merged pull request #11607: KAFKA-13544: fix FinalizedFeatureChangeListener deadlock
junrao merged pull request #11607: URL: https://github.com/apache/kafka/pull/11607
[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
mjsax commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771749966

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
## @@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener listener,
        return false;
    }

    @SuppressWarnings("unchecked")
    @Override
    public <R> QueryResult<R> query(final Query<R> query,
                                    final PositionBound positionBound,
                                    final boolean collectExecutionInfo) {

        final long start = System.nanoTime();
        final QueryResult<R> result;

        final QueryHandler handler = queryHandlers.get(query.getClass());
        if (handler == null) {
            result = wrapped().query(query, positionBound, collectExecutionInfo);
            if (collectExecutionInfo) {

Review comment: Sure.
[GitHub] [kafka] junrao merged pull request #11605: MINOR: replace lastOption call in LocalLog#flush() to prevent NoSuchElementException
junrao merged pull request #11605: URL: https://github.com/apache/kafka/pull/11605
[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
mjsax commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771749813

## File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
## @@ -197,6 +197,18 @@ public R getResult() {
        return result;
    }

    @SuppressWarnings("unchecked")
    public <V> QueryResult<V> swapResult(final V value) {
        if (isFailure()) {
            return (QueryResult<V>) this;
        } else {
            final QueryResult<V> result = new QueryResult<>(value);

Review comment: I mean, if the upper layers need a `QueryResult<V>` with the proper type, and the inner layers need a `QueryResult<byte[]>`, we should just have two properly typed objects: instead of "swapping", take the `byte[]` from the `QueryResult<byte[]>`, deserialize it, and put the result into the `QueryResult<V>` object.

> but this is the API we agreed on in the KIP.

I did not read the KIP :D (maybe I should have). And we can always adjust it. To me it seems useless to have a generic type parameter if we don't obey it anyway and use casts. The purpose of generics is to avoid casts, and if they don't avoid casts, they seem pointless.
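A minimal sketch of the two-objects alternative described above, using a hypothetical stand-in class (`SimpleResult` is not org.apache.kafka.streams.query.QueryResult): deserialize the raw bytes and re-wrap, carrying the metadata over, with no unchecked cast.

```java
import java.util.List;
import org.apache.kafka.common.serialization.Deserializer;

final class SimpleResult<R> {
    final R result;
    final List<String> executionInfo;

    SimpleResult(final R result, final List<String> executionInfo) {
        this.result = result;
        this.executionInfo = executionInfo;
    }

    // Two properly typed objects: bytes in, typed value out; no cast needed.
    static <V> SimpleResult<V> deserialized(final SimpleResult<byte[]> raw,
                                            final Deserializer<V> deserializer,
                                            final String topic) {
        final V value = deserializer.deserialize(topic, raw.result);
        return new SimpleResult<>(value, raw.executionInfo);
    }
}
```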
[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
mjsax commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771733023

## File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
## @@ -197,6 +197,18 @@ public R getResult() {
        return result;
    }

    @SuppressWarnings("unchecked")
    public <V> QueryResult<V> swapResult(final V value) {
        if (isFailure()) {
            return (QueryResult<V>) this;

Review comment:
> I suppose we could throw an IllegalStateException, since the caller probably shouldn't be even trying to "swap" the result on a failed result to begin with, but this seems fine, too.

If there is no strict reason not to throw an `IllegalStateException`, I would strongly advocate throwing. It not only guards against potential bugs, but also expresses the semantics for developers (ie, us) much more cleanly and makes the code easier to read and reason about.
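A sketch of the throwing variant advocated here, on a hypothetical stand-in class (`MiniQueryResult` is illustrative, not the actual QueryResult):

```java
final class MiniQueryResult<R> {
    private final R result;        // null iff this is a failure
    private final String failure;  // null iff this is a success

    MiniQueryResult(final R result, final String failure) {
        this.result = result;
        this.failure = failure;
    }

    boolean isFailure() {
        return failure != null;
    }

    // Refuse to swap on a failed result instead of silently casting:
    // guards against bugs and makes the intended semantics explicit.
    <V> MiniQueryResult<V> swapResult(final V value) {
        if (isFailure()) {
            throw new IllegalStateException("Cannot swap result of failed query: " + failure);
        }
        return new MiniQueryResult<>(value, null);
    }
}
```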
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771746009

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
## @@ -267,17 +281,41 @@ public boolean setFlushListener(final CacheFlushListener listener,
        return result;
    }

    @SuppressWarnings("unchecked")
    private <R> QueryResult<R> runKeyQuery(final Query<R> query,
                                           final PositionBound positionBound,
                                           final boolean collectExecutionInfo) {
        final QueryResult<R> result;
        final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query;
        final KeyQuery<Bytes, byte[]> rawKeyQuery =
            KeyQuery.withKey(keyBytes(typedKeyQuery.getKey()));
        final QueryResult<byte[]> rawResult =
            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
        if (rawResult.isSuccess()) {
            final Deserializer<V> deserializer = getValueDeserializer();
            final V value = deserializer.deserialize(serdes.topic(), rawResult.getResult());
            final QueryResult<V> typedQueryResult =
                rawResult.swapResult(value);
            result = (QueryResult<R>) typedQueryResult;
        } else {
            // the generic type doesn't matter, since failed queries have no result set.
            result = (QueryResult<R>) rawResult;
        }
        return result;
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    private Deserializer<V> getValueDeserializer() {
        final Serde<V> valueSerde = serdes.valueSerde();
        final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());
        final Deserializer<V> deserializer;
        if (!timestamped && valueSerde instanceof ValueAndTimestampSerde) {

Review comment: I know it's weird, but it is correct. I would like to revisit it, but I think we really need to do that after the current round of queries is implemented.
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771745563

## File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
## @@ -677,18 +638,56 @@ public void shouldHandlePingQuery() {
        }
    }

    public <V> void shouldHandleKeyQuery(
        final Integer key,
        final Function<V, Integer> valueExtactor,
        final Integer expectedValue) {

        final KeyQuery<Integer, V> query = KeyQuery.withKey(key);
        final StateQueryRequest<V> request =
            inStore(STORE_NAME)
                .withQuery(query)
                .withPartitions(mkSet(0, 1))
                .withPositionBound(PositionBound.at(INPUT_POSITION));

        final StateQueryResult<V> result =
            IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);

        final QueryResult<V> queryResult =
            result.getGlobalResult() != null

Review comment: This line was written before I scratched global store support from the current scope. I'll drop the check from this test for now.
[jira] [Commented] (KAFKA-9234) Consider using @Nullable and @Nonnull annotations
[ https://issues.apache.org/jira/browse/KAFKA-9234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17461722#comment-17461722 ]

Lars Bodewig commented on KAFKA-9234:
Are contributions for this issue still welcome? As far as I can see, the linked PR was closed after months without activity.

> Consider using @Nullable and @Nonnull annotations
> Key: KAFKA-9234
> URL: https://issues.apache.org/jira/browse/KAFKA-9234
> Project: Kafka
> Issue Type: Improvement
> Components: admin, clients, consumer, KafkaConnect, producer, streams, streams-test-utils
> Reporter: Matthias J. Sax
> Assignee: Manasvi Gupta
> Priority: Minor
> Labels: beginner, newbie
>
> Java7 was dropped some time ago, and we might want to consider using Java8 `@Nullable` and `@Nonnull` annotations for all public-facing APIs instead of documenting nullability in JavaDocs only.
> This ticket should be broken down into a series of smaller PRs to keep the scope of each PR contained, allowing for more effective reviews.
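For illustration, a hedged sketch of what the ticket asks for, on a made-up interface (not an actual Kafka API), using the JSR-305 annotations from javax.annotation:

{code:java}
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

// Hypothetical interface: the nullability contract that JavaDoc used to carry
// becomes visible to IDEs and static-analysis tools.
interface KeyValueLookup<K, V> {

    /** @return the stored value, or {@code null} if the key is absent. */
    @Nullable
    V get(@Nonnull K key);
}
{code}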
[GitHub] [kafka] guozhangwang commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
guozhangwang commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771744164

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
## @@ -162,15 +163,39 @@ public static boolean isPermitted(
            }
            final R result = (R) iterator;
            return QueryResult.forResult(result);
        } catch (final Exception e) {
            final String message = parseStoreException(e, store, query);
            return QueryResult.forFailure(
                FailureReason.STORE_EXCEPTION,
                message
            );
        }
    }

    @SuppressWarnings("unchecked")
    private static <R> QueryResult<R> runKeyQuery(final Query<R> query,
                                                  final PositionBound positionBound,
                                                  final boolean collectExecutionInfo,
                                                  final StateStore store) {
        if (store instanceof KeyValueStore) {
            final KeyQuery<Bytes, byte[]> rawKeyQuery = (KeyQuery<Bytes, byte[]>) query;
            final KeyValueStore<Bytes, byte[]> keyValueStore =
                (KeyValueStore<Bytes, byte[]>) store;
            try {
                final byte[] bytes = keyValueStore.get(rawKeyQuery.getKey());
                return (QueryResult<R>) QueryResult.forResult(bytes);

Review comment: @vvcephei but in this PR at least: "Should we use swap here as well?"
[GitHub] [kafka] guozhangwang merged pull request #11451: KAFKA-13419: Only reset generation ID when ILLEGAL_GENERATION error
guozhangwang merged pull request #11451: URL: https://github.com/apache/kafka/pull/11451
[GitHub] [kafka] guozhangwang commented on a change in pull request #11451: KAFKA-13419: Only reset generation ID when ILLEGAL_GENERATION error
guozhangwang commented on a change in pull request #11451: URL: https://github.com/apache/kafka/pull/11451#discussion_r771742433

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
## @@ -705,13 +705,13 @@ protected void onJoinPrepare(int generation, String memberId) {
        // so that users can still access the previously owned partitions to commit offsets etc.
        Exception exception = null;
        final Set<TopicPartition> revokedPartitions;
-       if (generation == Generation.NO_GENERATION.generationId &&
+       if (generation == Generation.NO_GENERATION.generationId ||

Review comment: Thanks @dajac, I agree with that as well.
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771742317

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
## @@ -162,15 +163,39 @@ public static boolean isPermitted(
            }
            final R result = (R) iterator;
            return QueryResult.forResult(result);
        } catch (final Exception e) {
            final String message = parseStoreException(e, store, query);
            return QueryResult.forFailure(
                FailureReason.STORE_EXCEPTION,
                message
            );
        }
    }

    @SuppressWarnings("unchecked")
    private static <R> QueryResult<R> runKeyQuery(final Query<R> query,
                                                  final PositionBound positionBound,
                                                  final boolean collectExecutionInfo,
                                                  final StateStore store) {
        if (store instanceof KeyValueStore) {
            final KeyQuery<Bytes, byte[]> rawKeyQuery = (KeyQuery<Bytes, byte[]>) query;
            final KeyValueStore<Bytes, byte[]> keyValueStore =
                (KeyValueStore<Bytes, byte[]>) store;
            try {
                final byte[] bytes = keyValueStore.get(rawKeyQuery.getKey());
                return (QueryResult<R>) QueryResult.forResult(bytes);

Review comment: Thanks; let's keep that in mind as we tackle some of the API refactor tasks we've queued up. We started with the RawXQuery approach, then dropped it. Before we add it back, I think we'd better have a representative set of queries and also bear in mind all the other sharp edges we'd like to smooth over before release.
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771741301

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
## @@ -253,12 +262,17 @@ public boolean setFlushListener(final CacheFlushListener listener,
            rawRangeQuery = RangeQuery.withNoBounds();
        }
        final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
            wrapped().query(rawRangeQuery, positionBound, collectExecutionInfo);
        if (rawResult.isSuccess()) {
            final KeyValueIterator<Bytes, byte[]> iterator = rawResult.getResult();
            final KeyValueIterator<K, V> resultIterator = new MeteredKeyValueTimestampedIterator(
                iterator,
                getSensor,
                getValueDeserializer()
            );
            final QueryResult<KeyValueIterator<K, V>> typedQueryResult = rawResult.swapResult(
                resultIterator

Review comment: Probably autoformatted because the line was too long.
[jira] [Created] (KAFKA-13554) Rename RangeQuery to KeyRangeQuery
John Roesler created KAFKA-13554:
Summary: Rename RangeQuery to KeyRangeQuery
Key: KAFKA-13554
URL: https://issues.apache.org/jira/browse/KAFKA-13554
Project: Kafka
Issue Type: Sub-task
Reporter: John Roesler

Just to avoid confusion wrt WindowRangeQuery
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771741132

## File path: streams/src/main/java/org/apache/kafka/streams/query/RangeQuery.java
## @@ -34,7 +34,7 @@
  */
 @Evolving
-public class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {
+public final class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {

Review comment: https://issues.apache.org/jira/browse/KAFKA-13554
[jira] [Commented] (KAFKA-13077) Replication failing after unclean shutdown of ZK and all brokers
[ https://issues.apache.org/jira/browse/KAFKA-13077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17461720#comment-17461720 ] Jun Rao commented on KAFKA-13077: - [~shivakumar] : So, it seems the issue is unrelated to file corruption. Did you just restart ZK, but not Kafka brokers? Typically, rolling restarting ZK shouldn't affect the brokers. > Replication failing after unclean shutdown of ZK and all brokers > > > Key: KAFKA-13077 > URL: https://issues.apache.org/jira/browse/KAFKA-13077 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.8.0 >Reporter: Christopher Auston >Priority: Minor > > I am submitting this in the spirit of what can go wrong when an operator > violates the constraints Kafka depends on. I don't know if Kafka could or > should handle this more gracefully. I decided to file this issue because it > was easy to get the problem I'm reporting with Kubernetes StatefulSets (STS). > By "easy" I mean that I did not go out of my way to corrupt anything, I just > was not careful when restarting ZK and brokers. > I violated the constraints of keeping Zookeeper stable and at least one > running in-sync replica. > I am running the bitnami/kafka helm chart on Amazon EKS. > {quote}% kubectl get po kaf-kafka-0 -ojson |jq .spec.containers'[].image' > "docker.io/bitnami/kafka:2.8.0-debian-10-r43" > {quote} > I started with 3 ZK instances and 3 brokers (both STS). I changed the > cpu/memory requests on both STS and kubernetes proceeded to restart ZK and > kafka instances at the same time. If I recall correctly there were some > crashes and several restarts but eventually all the instances were running > again. It's possible all ZK nodes and all brokers were unavailable at various > points. > The problem I noticed was that two of the brokers were just continually > spitting out messages like: > {quote}% kubectl logs kaf-kafka-0 --tail 10 > [2021-07-13 14:26:08,871] INFO [ProducerStateManager > partition=__transaction_state-0] Loading producer state from snapshot file > 'SnapshotFile(/bitnami/kafka/data/__transaction_state-0/0001.snapshot,1)' > (kafka.log.ProducerStateManager) > [2021-07-13 14:26:08,871] WARN [Log partition=__transaction_state-0, > dir=/bitnami/kafka/data] *Non-monotonic update of high watermark from > (offset=2744 segment=[0:1048644]) to (offset=1 segment=[0:169])* > (kafka.log.Log) > [2021-07-13 14:26:08,874] INFO [Log partition=__transaction_state-10, > dir=/bitnami/kafka/data] Truncating to offset 2 (kafka.log.Log) > [2021-07-13 14:26:08,877] INFO [Log partition=__transaction_state-10, > dir=/bitnami/kafka/data] Loading producer state till offset 2 with message > format version 2 (kafka.log.Log) > [2021-07-13 14:26:08,877] INFO [ProducerStateManager > partition=__transaction_state-10] Loading producer state from snapshot file > 'SnapshotFile(/bitnami/kafka/data/__transaction_state-10/0002.snapshot,2)' > (kafka.log.ProducerStateManager) > [2021-07-13 14:26:08,877] WARN [Log partition=__transaction_state-10, > dir=/bitnami/kafka/data] Non-monotonic update of high watermark from > (offset=2930 segment=[0:1048717]) to (offset=2 segment=[0:338]) > (kafka.log.Log) > [2021-07-13 14:26:08,880] INFO [Log partition=__transaction_state-20, > dir=/bitnami/kafka/data] Truncating to offset 1 (kafka.log.Log) > [2021-07-13 14:26:08,882] INFO [Log partition=__transaction_state-20, > dir=/bitnami/kafka/data] Loading producer state till offset 1 with message > format version 2 (kafka.log.Log) > [2021-07-13 14:26:08,882] INFO 
[ProducerStateManager > partition=__transaction_state-20] Loading producer state from snapshot file > 'SnapshotFile(/bitnami/kafka/data/__transaction_state-20/0001.snapshot,1)' > (kafka.log.ProducerStateManager) > [2021-07-13 14:26:08,883] WARN [Log partition=__transaction_state-20, > dir=/bitnami/kafka/data] Non-monotonic update of high watermark from > (offset=2956 segment=[0:1048608]) to (offset=1 segment=[0:169]) > (kafka.log.Log) > {quote} > If I describe that topic I can see that several partitions have a leader of 2 > and the ISR is just 2 (NOTE I added two more brokers and tried to reassign > the topic onto brokers 2,3,4 which you can see below). The new brokers also > spit out the messages about "non-monotonic update" just like the original > followers. This describe output is from the following day. > {{% kafka-topics.sh ${=BS} -topic __transaction_state -describe}} > {{Topic: __transaction_state TopicId: i7bBNCeuQMWl-ZMpzrnMAw PartitionCount: > 50 ReplicationFactor: 3 Configs: > compression.type=uncompressed,min.insync.replicas=3,cleanup.policy=compact,flush.ms=1000,segment.bytes=104857600,flush.messages=1,max.message.bytes=112,unclean.leader.el
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771739597

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
## @@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener listener,
        return false;
    }

    @SuppressWarnings("unchecked")
    @Override
    public <R> QueryResult<R> query(final Query<R> query,
                                    final PositionBound positionBound,
                                    final boolean collectExecutionInfo) {

        final long start = System.nanoTime();
        final QueryResult<R> result;

        final QueryHandler handler = queryHandlers.get(query.getClass());
        if (handler == null) {
            result = wrapped().query(query, positionBound, collectExecutionInfo);
            if (collectExecutionInfo) {
                result.addExecutionInfo(
                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
            }
        } else {
            result = (QueryResult<R>) handler.apply(
                query,
                positionBound,
                collectExecutionInfo,
                this
            );
            if (collectExecutionInfo) {
                result.addExecutionInfo(
                    "Handled in " + getClass() + " with serdes "
                        + serdes + " in " + (System.nanoTime() - start) + "ns");
            }
        }
        return result;
    }

    @SuppressWarnings("unchecked")
    private <R> QueryResult<R> runKeyQuery(final Query<R> query,
                                           final PositionBound positionBound,
                                           final boolean collectExecutionInfo) {
        final QueryResult<R> result;
        final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;
        final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey()));
        final QueryResult<byte[]> rawResult =
            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
        if (rawResult.isSuccess()) {
            final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());
            final Serde<V> vSerde = serdes.valueSerde();
            final Deserializer<V> deserializer;
            if (!timestamped && vSerde instanceof ValueAndTimestampSerde) {
                final ValueAndTimestampDeserializer valueAndTimestampDeserializer =
                    (ValueAndTimestampDeserializer) ((ValueAndTimestampSerde) vSerde).deserializer();
                deserializer = (Deserializer<V>) valueAndTimestampDeserializer.valueDeserializer;
            } else {
                deserializer = vSerde.deserializer();
            }

Review comment: Sorry, I missed this thread before. I think these points are discussed on other threads in this PR, though. Tl;dr: I think we should aim to clean this up in https://issues.apache.org/jira/browse/KAFKA-13526

For now, I believe this logic is correct. However, it's good that you pointed out we're only testing all _dsl_ store combinations. I filed https://issues.apache.org/jira/browse/KAFKA-13553 to extend the IT to also test all _papi_ store combinations.
[jira] [Created] (KAFKA-13553) Add DSL stores to IQv2StoreIntegrationTest
John Roesler created KAFKA-13553:
Summary: Add DSL stores to IQv2StoreIntegrationTest
Key: KAFKA-13553
URL: https://issues.apache.org/jira/browse/KAFKA-13553
Project: Kafka
Issue Type: Sub-task
Reporter: John Roesler

Right now, we only test stores registered via the DSL. To be truly comprehensive, we must also test stores registered via the PAPI.
[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
mjsax commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771738427

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
## @@ -42,16 +102,21 @@ private StoreQueryUtils() {
        final int partition
    ) {
-       final QueryResult<R> result;
        final long start = collectExecutionInfo ? System.nanoTime() : -1L;
-       if (query instanceof PingQuery) {
-           if (!isPermitted(position, positionBound, partition)) {
-               result = QueryResult.notUpToBound(position, positionBound, partition);
-           } else {
-               result = (QueryResult<R>) QueryResult.forResult(true);
-           }
-       } else {
+       final QueryResult<R> result;
+
+       final QueryHandler handler = QUERY_HANDLER_MAP.get(query.getClass());

Review comment: 🤔
[GitHub] [kafka] guozhangwang commented on a change in pull request #11609: KAFKA-12648: fixes for query APIs and blocking calls
guozhangwang commented on a change in pull request #11609: URL: https://github.com/apache/kafka/pull/11609#discussion_r771737051

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
## @@ -349,8 +351,10 @@ private void rebuildMetadata(final Map> activePart
        final Map<String, List<String>> namedTopologyToStoreName = new HashMap<>();
        final Set<String> topologyNames = topologyMetadata.namedTopologiesView();
        topologyNames.forEach(topologyName -> {
-           final Collection<String> storesOnHostForTopologyName = getStoresOnHost(storeToSourceTopics, activePartitionHostMap.get(hostInfo), topologyName);
-           storesOnHostForTopologyName.addAll(getStoresOnHost(storeToSourceTopics, standbyPartitionHostMap.get(hostInfo), topologyName));
+           final Map<String, List<String>> topologyStoresToSourceTopics =

Review comment: What's the difference between `topologyStoresToSourceTopics` and `storeToSourceTopics` here?

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
## @@ -296,9 +297,10 @@ private boolean hasPartitionsForAnyTopics(final List topicNames, final S
    private Set<String> getStoresOnHost(final Map<String, List<String>> storeToSourceTopics,
                                        final Set<TopicPartition> sourceTopicPartitions,
                                        final String topologyName) {
        final InternalTopologyBuilder builder = topologyMetadata.lookupBuilderForNamedTopology(topologyName);
-       final Set<String> sourceTopicNames = builder.sourceTopicNames();
+       final Collection<String> sourceTopicNames = builder.sourceTopicCollection();

Review comment: Do we want the raw topic names (without the prefix) or the decorated ones here? BTW, the function/variable names are a bit confusing, but they store different things. Maybe we should just rename them to be clearer.
[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
mjsax commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771738126

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
## @@ -16,18 +16,78 @@
 package org.apache.kafka.streams.state.internals;

 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.query.FailureReason;
 import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.KeyValueStore;

 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.Map;

 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;

 public final class StoreQueryUtils {

    @SuppressWarnings("unchecked")
    private static final Map<Class<?>, QueryHandler> QUERY_HANDLER_MAP =

Review comment: Not sure if I fully understand, but might be less important.
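The pattern under discussion, in a stripped-down, self-contained form (names are illustrative, not the Kafka API): execution logic stored as map values keyed by query class, so dispatch is a lookup instead of an if/else chain.

```java
import java.util.Map;
import java.util.function.Function;

final class QueryDispatchSketch {

    // query class -> handler; mirrors the role of QUERY_HANDLER_MAP above
    private static final Map<Class<?>, Function<Object, String>> HANDLERS = Map.of(
        Integer.class, q -> "key query for key " + q,
        String.class, q -> "range query for range " + q
    );

    static String dispatch(final Object query) {
        final Function<Object, String> handler = HANDLERS.get(query.getClass());
        if (handler == null) {
            // unknown query type: delegate or fail, as the store layer does
            return "unknown query type: " + query.getClass().getSimpleName();
        }
        return handler.apply(query);
    }
}
```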
[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
mjsax commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771737792

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
## @@ -16,18 +16,78 @@
 public final class StoreQueryUtils {

    /**
     * a utility interface to facilitate stores' query dispatch logic,
     * allowing them to generically store query execution logic as the values
     * in a map.
     */
    @FunctionalInterface

Review comment: I see. So we should add it elsewhere, too (of course not as part of the IQ work).
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771737653

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
## @@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener listener,
        return false;
    }

    @SuppressWarnings("unchecked")
    @Override
    public <R> QueryResult<R> query(final Query<R> query,
                                    final PositionBound positionBound,
                                    final boolean collectExecutionInfo) {

        final long start = System.nanoTime();
        final QueryResult<R> result;

        final QueryHandler handler = queryHandlers.get(query.getClass());
        if (handler == null) {
            result = wrapped().query(query, positionBound, collectExecutionInfo);
            if (collectExecutionInfo) {

Review comment: I'm getting the impression that you're not a huge fan of the phrasing of these messages. :) Can we tackle this question in a follow-on fashion?
[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
mjsax commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771737576

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
## @@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener listener,

    @SuppressWarnings("unchecked")
    private <R> QueryResult<R> runKeyQuery(final Query<R> query,
                                           final PositionBound positionBound,
                                           final boolean collectExecutionInfo) {
        final QueryResult<R> result;
        final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;
        final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey()));
        final QueryResult<byte[]> rawResult =
            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
        if (rawResult.isSuccess()) {
            final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());

Review comment:
> Also, because we always wrap the non-timestamped store with the KeyValueToTimestampedKeyValueByteStoreAdapter, we also always pass through the MeteredTimestampedKeyValue store whether the inner store is really timestamped or not.

I don't think so. We only do this in the DSL, but not in the PAPI.
[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
mjsax commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771737277

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
## @@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener listener,

    @SuppressWarnings("unchecked")
    private <R> QueryResult<R> runKeyQuery(final Query<R> query,
                                           final PositionBound positionBound,
                                           final boolean collectExecutionInfo) {
        final QueryResult<R> result;
        final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;
        final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey()));
        final QueryResult<byte[]> rawResult =
            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
        if (rawResult.isSuccess()) {
            final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());
            final Serde<V> vSerde = serdes.valueSerde();
            final Deserializer<V> deserializer;
            if (!timestamped && vSerde instanceof ValueAndTimestampSerde) {

Review comment:
> The MeteredStore's serde is always a ValueAndTimestamp serde regardless of whether the inner store is Timestamped or not.

Is it? (1) We also have `MeteredTimestampStore` (of course it extends `MeteredStore`), but it seems better to split the logic and move everything timestamp-related into `MeteredTimestampStore`. (2) PAPI users can add a plain `KeyValueStore`, and we won't wrap it with the `TimestampedStore` facade, and the serdes won't be `ValueAndTimestamp` either.

> What we do is, when you have a non-timestamped store, we wrap it with an extra layer (org.apache.kafka.streams.state.internals.KeyValueToTimestampedKeyValueByteStoreAdapter) that pads the returned values with a fake timestamp (org.apache.kafka.streams.state.TimestampedBytesStore#convertToTimestampedFormat)

We only do this in the DSL, if the user gives us a non-timestamped store via `Materialized` -- but for PAPI users, we never do this and use whatever store is given to us as-is.

> so we did not implement the same padding logic for non-timestamped data and instead just bubble up to the MeteredStore

Not sure if I can follow? It should not be a concern for IQ? Also, the current conversion between plain/timestamped is really just a corner case (and a case that we want to deprecate anyway -- we just did not find a way to do so -- maybe we should add a runtime check at some point and WARN users if they provide a non-timestamped store, until we remove support for it and throw an exception instead...). It seems not worth adding more tech debt for this behavior that we only added to not break stuff.

> Which means that if we want to deserialize it, we need to know whether to use the ValueAndTimestamp deserializer or just the Value's deserializer.

Yes, but we should split this logic between the plain `MeteredStore` and the `MeteredTimestampStore`.
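The padding discussed above, sketched standalone (an illustration of the byte layout only, not the actual TimestampedBytesStore#convertToTimestampedFormat implementation): prefix the plain value with an 8-byte "unknown" timestamp so it matches the timestamped format.

```java
import java.nio.ByteBuffer;

final class TimestampPaddingSketch {

    // -1 is the conventional "no timestamp" sentinel in Kafka
    private static final long NO_TIMESTAMP = -1L;

    static byte[] toTimestampedFormat(final byte[] plainValue) {
        if (plainValue == null) {
            return null;
        }
        // 8-byte timestamp prefix + original value bytes
        return ByteBuffer.allocate(Long.BYTES + plainValue.length)
            .putLong(NO_TIMESTAMP)
            .put(plainValue)
            .array();
    }
}
```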
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771736938

## File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
## @@ -197,6 +197,18 @@ public R getResult() {
        return result;
    }

    @SuppressWarnings("unchecked")
    public <V> QueryResult<V> swapResult(final V value) {
        if (isFailure()) {
            return (QueryResult<V>) this;
        } else {
            final QueryResult<V> result = new QueryResult<>(value);

Review comment: Thanks, @mjsax, I'm not sure precisely what you mean. This does create a new object. If you think it would be clearer to add a constructor allowing people to set the result along with a pre-populated executionInfo and position instead, we could, but this is the API we agreed on in the KIP. I want this new API to have good ergonomics, so I do want to consider these, but I don't think we need to hold up the KeyQuery PR on it.
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771735849 ## File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java ## @@ -197,6 +197,18 @@ public R getResult() { return result; } +@SuppressWarnings("unchecked") +public QueryResult swapResult(final V value) { +if (isFailure()) { +return (QueryResult) this; +} else { +final QueryResult result = new QueryResult<>(value); Review comment: Thanks, @guozhangwang , I think something like that will be the outcome of this follow-on work: https://issues.apache.org/jira/browse/KAFKA-13526 We'll tackle that question before the first release of this new API. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771735440 ## File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java ## @@ -197,6 +197,18 @@ public R getResult() { return result; } +@SuppressWarnings("unchecked") +public QueryResult swapResult(final V value) { +if (isFailure()) { +return (QueryResult) this; Review comment: sounds good! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771735231 ## File path: streams/src/main/java/org/apache/kafka/streams/query/RangeQuery.java ## @@ -34,7 +34,7 @@ * */ @Evolving -public class RangeQuery implements Query> { +public final class RangeQuery implements Query> { Review comment: Good point. We can rename it to KeyRangeQuery in a follow-on PR. I'll file a Jira. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
mjsax commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771734218 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ## @@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener listener, return false; } +@SuppressWarnings("unchecked") +@Override +public QueryResult query(final Query query, +final PositionBound positionBound, +final boolean collectExecutionInfo) { + +final long start = System.nanoTime(); +final QueryResult result; + +final QueryHandler handler = queryHandlers.get(query.getClass()); +if (handler == null) { +result = wrapped().query(query, positionBound, collectExecutionInfo); +if (collectExecutionInfo) { Review comment: We can do this, but would it be better to say "passed down" rather than "handled" in the message? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
mjsax commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771733605 ## File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java ## @@ -197,6 +197,18 @@ public R getResult() { return result; } +@SuppressWarnings("unchecked") +public QueryResult swapResult(final V value) { +if (isFailure()) { +return (QueryResult) this; +} else { +final QueryResult result = new QueryResult<>(value); Review comment: Why not use two objects? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
mjsax commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771733023 ## File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java ## @@ -197,6 +197,18 @@ public R getResult() { return result; } +@SuppressWarnings("unchecked") +public QueryResult swapResult(final V value) { +if (isFailure()) { +return (QueryResult) this; Review comment: > I suppose we could throw an IllegalStateException, since the caller probably shouldn't be even trying to "swap" the result on a failed result to begin with, but this seems fine, too. If there is no strict reason not to throw an `IllegalStateException`, I would strongly advocate throwing. It not only guards against potential bugs, but also expresses the semantics for developers (i.e., us) much more cleanly and makes the code easier to read/reason about. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
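A sketch of the stricter variant advocated here (illustrative, not the committed code):

```java
// Fail fast instead of silently returning `this` when a caller tries to
// swap the result of a failed query.
@SuppressWarnings("unchecked")
public <V> QueryResult<V> swapResult(final V value) {
    if (isFailure()) {
        throw new IllegalStateException(
            "Cannot swap the result of a failed query result: " + this);
    }
    return new QueryResult<>(value);
}
```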
[jira] [Updated] (KAFKA-12495) Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor
[ https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-12495: -- Component/s: KafkaConnect > Unbalanced connectors/tasks distribution will happen in Connect's incremental > cooperative assignor > -- > > Key: KAFKA-12495 > URL: https://issues.apache.org/jira/browse/KAFKA-12495 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > Fix For: 3.2.0 > > Attachments: image-2021-03-18-15-04-57-854.png, > image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png > > > In Kafka Connect, we implement the incremental cooperative rebalance algorithm > based on KIP-415 > ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect)|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect]. > However, we have a bad assumption in the algorithm implementation, which is: > after the revoking rebalance completes, the member (worker) count will be the same > as in the previous round of rebalance. > > Let's take a look at the example in KIP-415: > !image-2021-03-18-15-07-27-103.png|width=441,height=556! > It works well for most cases. But what if W4 is added after the 1st rebalance > completes and before the 2nd rebalance starts? Let's see what happens in > this example (we'll use 10 tasks here): > > {code:java} > Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, > BT2, BT3, BT4, BT5]) > Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, > BT5 > W1 is current leader > W2 joins with assignment: [] > Rebalance is triggered > W3 joins while rebalance is still active with assignment: [] > W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, > BT4, BT5] > W1 becomes leader > W1 computes and sends assignments: > W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, > BT2, BT3, BT4, BT5]) > W2(delay: 0, assigned: [], revoked: []) > W3(delay: 0, assigned: [], revoked: []) > W1 stops revoked resources > W1 rejoins with assignment: [AC0, AT1, AT2, AT3] > Rebalance is triggered > W2 joins with assignment: [] > W3 joins with assignment: [] > // one more member joined > W4 joins with assignment: [] > W1 becomes leader > W1 computes and sends assignments: > // We assigned all the previously revoked Connectors/Tasks to the new member, > but we didn't revoke any more C/T in this round, which causes an unbalanced > distribution > W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: []) > W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: []) > W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: []) > W4(delay: 0, assigned: [BT4, BT5], revoked: []) > {code} > Because we don't allow consecutive revocations in two consecutive > rebalances (under the same leader), we end up with this uneven distribution > in this situation. We should allow a consecutive rebalance to perform another > round of revocation, moving C/T to the other members in this case. 
> expected: > {code:java} > Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, > BT2, BT3, BT4, BT5]) > Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, > BT5 > W1 is current leader > W2 joins with assignment: [] > Rebalance is triggered > W3 joins while rebalance is still active with assignment: [] > W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, > BT4, BT5] > W1 becomes leader > W1 computes and sends assignments: > W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, > BT2, BT3, BT4, BT5]) > W2(delay: 0, assigned: [], revoked: []) > W3(delay: 0, assigned: [], revoked: []) > W1 stops revoked resources > W1 rejoins with assignment: [AC0, AT1, AT2, AT3] > Rebalance is triggered > W2 joins with assignment: [] > W3 joins with assignment: [] > // one more member joined > W4 joins with assignment: [] > W1 becomes leader > W1 computes and sends assignments: > // We assigned all the previously revoked Connectors/Tasks to the new member, > **and also revoked some C/T** > W1(delay: 0, assigned: [AC0, AT1, AT2], revoked: [AT3]) > W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: []) > W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: []) > W4(delay: 0, assigned: [BT4, BT5], revoked: []) > // another round of rebalance to assign the new revoked C/T to the other > members > W1 rejoins with assignment: [AC0, AT1, AT2] > Rebalance is triggered > W2 joins with assignment: [AT4, AT5, BC0] > W3 joins with assignment: [BT1, BT2, BT3] > W4 joins with assignment: [BT4, BT5]
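To make the proposed fix concrete, here is a rough illustrative sketch (hypothetical names, not Connect's actual incremental cooperative assignor code) of revoking each worker's excess above the balanced target on every round, so a late joiner like W4 can be filled up in the follow-up rebalance:

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RevocationSketch {
    // Hypothetical helper: compute what each worker should give up this
    // round, even if a revocation already happened in the previous round.
    static Map<String, List<String>> computeRevocations(final Map<String, List<String>> current) {
        final int total = current.values().stream().mapToInt(List::size).sum();
        final int workers = current.size();
        final int floor = total / workers; // balanced minimum per worker
        int extras = total % workers;      // this many workers may keep one extra
        final Map<String, List<String>> revocations = new HashMap<>();
        for (final Map.Entry<String, List<String>> e : current.entrySet()) {
            final int target = floor + (extras > 0 ? 1 : 0);
            if (extras > 0) {
                extras--;
            }
            final List<String> owned = e.getValue();
            if (owned.size() > target) {
                // Revoke the excess above the balanced target.
                revocations.put(e.getKey(), new ArrayList<>(owned.subList(target, owned.size())));
            }
        }
        return revocations;
    }
}
{code}

With the 12 connectors/tasks and 4 workers above, the target is 3 per worker, so W1 (holding 4) revokes exactly one item (AT3), matching the expected trace.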
[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
mjsax commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r770076908 ## File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java ## @@ -197,6 +197,18 @@ public R getResult() { return result; } +@SuppressWarnings("unchecked") +public QueryResult swapResult(final V value) { +if (isFailure()) { +return (QueryResult) this; Review comment: Why do we not allow swapping the result if the current result has a failure? And if we don't want to allow swapping, why just return `this` rather than throw an exception? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #11609: KAFKA-12648: fixes for query APIs and blocking calls
guozhangwang commented on pull request #11609: URL: https://github.com/apache/kafka/pull/11609#issuecomment-997076409 @ableegoldman some related tests are failing, I will go ahead and review the code still. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
guozhangwang commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771731855 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ## @@ -267,17 +281,41 @@ public boolean setFlushListener(final CacheFlushListener listener, return result; } + @SuppressWarnings("unchecked") +private QueryResult runKeyQuery(final Query query, + final PositionBound positionBound, + final boolean collectExecutionInfo) { +final QueryResult result; +final KeyQuery typedKeyQuery = (KeyQuery) query; +final KeyQuery rawKeyQuery = +KeyQuery.withKey(keyBytes(typedKeyQuery.getKey())); +final QueryResult rawResult = +wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo); +if (rawResult.isSuccess()) { +final Deserializer deserializer = getValueDeserializer(); +final V value = deserializer.deserialize(serdes.topic(), rawResult.getResult()); +final QueryResult typedQueryResult = +rawResult.swapResult(value); +result = (QueryResult) typedQueryResult; +} else { +// the generic type doesn't matter, since failed queries have no result set. +result = (QueryResult) rawResult; +} +return result; +} + +@SuppressWarnings({"unchecked", "rawtypes"}) private Deserializer getValueDeserializer() { -final Serde vSerde = serdes.valueSerde(); +final Serde valueSerde = serdes.valueSerde(); final boolean timestamped = WrappedStateStore.isTimestamped(wrapped()); final Deserializer deserializer; -if (!timestamped && vSerde instanceof ValueAndTimestampSerde) { +if (!timestamped && valueSerde instanceof ValueAndTimestampSerde) { Review comment: This is the part that I'm not completely sure about either... maybe some quick sync on this would be more effective? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
guozhangwang commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771726705 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ## @@ -253,12 +262,17 @@ public boolean setFlushListener(final CacheFlushListener listener, rawRangeQuery = RangeQuery.withNoBounds(); } final QueryResult> rawResult = -wrapped().query(rawRangeQuery, positionBound, collectExecutionInfo); +wrapped().query(rawRangeQuery, positionBound, collectExecutionInfo); if (rawResult.isSuccess()) { final KeyValueIterator iterator = rawResult.getResult(); final KeyValueIterator resultIterator = new MeteredKeyValueTimestampedIterator( -iterator, getSensor, getValueDeserializer()); -final QueryResult> typedQueryResult = QueryResult.forResult(resultIterator); +iterator, +getSensor, +getValueDeserializer() +); +final QueryResult> typedQueryResult = rawResult.swapResult( +resultIterator Review comment: nit: why newline with just one parameter? ## File path: streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; + +/** + * Interactive query for retrieving a single record based on its key. + */ +@Evolving +public final class KeyQuery implements Query { + +private final K key; + +private KeyQuery(final K key) { +this.key = key; +} + +/** + * Creates a query that will retrieve the record identified by {@code key} if it exists + * (or {@code null} otherwise). + * @param key The key to retrieve + * @param The type of the key + * @param The type of the value that will be retrieved + */ +public static KeyQuery withKey(final K key) { +return new KeyQuery<>(key); Review comment: Should we check the `key` is not null here? Since in later callers e.g. `final KeyQuery rawKeyQuery = KeyQuery.withKey(keyBytes(typedKeyQuery.getKey()));` we do not check if `getKey()` is null or not, and `keyBytes` function could throw if it is. 
## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java ## @@ -162,15 +163,39 @@ public static boolean isPermitted( } final R result = (R) iterator; return QueryResult.forResult(result); -} catch (final Throwable t) { -final String message = parseStoreException(t, store, query); +} catch (final Exception e) { +final String message = parseStoreException(e, store, query); return QueryResult.forFailure( FailureReason.STORE_EXCEPTION, message ); } } +@SuppressWarnings("unchecked") +private static QueryResult runKeyQuery(final Query query, + final PositionBound positionBound, + final boolean collectExecutionInfo, + final StateStore store) { +if (store instanceof KeyValueStore) { +final KeyQuery rawKeyQuery = (KeyQuery) query; +final KeyValueStore keyValueStore = +(KeyValueStore) store; +try { +final byte[] bytes = keyValueStore.get(rawKeyQuery.getKey()); +return (QueryResult) QueryResult.forResult(bytes); Review comment: Should we use `swap` here as well? Also, I feel like we could introduce an internal class extending `KeyQuery` and only define the `swap` in that class (see my other comment above). ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java ## @@ -162,15 +163,39 @@ public static boolean isPermitted( }
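The null check suggested in the `KeyQuery.withKey` comment above is a one-line guard; a sketch (not the merged code):

```java
// Requires: import java.util.Objects;
// Objects.requireNonNull throws a NullPointerException with the given
// message if key is null, failing at construction rather than deep in
// the store's keyBytes() call.
public static <K, V> KeyQuery<K, V> withKey(final K key) {
    return new KeyQuery<>(Objects.requireNonNull(key, "key cannot be null"));
}
```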
[GitHub] [kafka] cmccabe commented on a change in pull request #11596: MINOR: bump version in kraft readme
cmccabe commented on a change in pull request #11596: URL: https://github.com/apache/kafka/pull/11596#discussion_r771719219 ## File path: config/kraft/README.md ## @@ -14,8 +14,8 @@ Most important of all, KRaft mode is more scalable. We expect to be able to [su # Quickstart ## Warning -KRaft mode in Kafka 3.0 is provided for testing only, *NOT* for production. We do not yet support upgrading existing ZooKeeper-based Kafka clusters into this mode. In fact, when Kafka 3.1 is released, -it may not be possible to upgrade your KRaft clusters from 3.0 to 3.1. There may be bugs, including serious ones. You should *assume that your data could be lost at any time* if you try the preview release of KRaft mode. +KRaft mode in Kafka 3.1 is provided for testing only, *NOT* for production. We do not yet support upgrading existing ZooKeeper-based Kafka clusters into this mode. In fact, when Kafka 3.2 is released, Review comment: Let's remove the part about "In fact, when Kafka 3.2 is released, it may not be possible to upgrade your KRaft clusters from 3.1 to 3.2." We will support this upgrade path. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #11596: MINOR: bump version in kraft readme
cmccabe commented on pull request #11596: URL: https://github.com/apache/kafka/pull/11596#issuecomment-997061036 Hi @showuon , thanks for the PR! I left one comment. LGTM after that is addressed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #11606: MINOR: Add shutdown tests for KRaft
rondagostino commented on a change in pull request #11606: URL: https://github.com/apache/kafka/pull/11606#discussion_r771695520 ## File path: core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala ## @@ -67,6 +68,14 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness { */ def generateConfigs: Seq[KafkaConfig] + /** + * It is sometimes useful to keep the same log.dirs configuration value; override this method if that is desired + * + * @param priorConfigs the prior configs + * @return the new generated configs + */ + def regenerateConfigs(priorConfigs: Seq[KafkaConfig]): Seq[KafkaConfig] = generateConfigs + Review comment: I was able to remove this new `regenerateConfigs()` method and just implement some logic in the `ServerShutdownTest` class to save away the prior configs and react accordingly. I'm fine with doing it this way given that reconfiguration is rarely used and this solution is pretty easy to implement in test classes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771687546 ## File path: streams/src/main/java/org/apache/kafka/streams/query/RangeQuery.java ## @@ -34,7 +34,7 @@ * */ @Evolving -public class RangeQuery implements Query> { +public final class RangeQuery implements Query> { Review comment: I had a review comment to add this to KeyQuery, so I added it to RangeQuery for exactly the same reason. ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/PingQuery.java ## @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.state.internals; - -import org.apache.kafka.streams.query.Query; - -/** - * A very simple query that all stores can handle to verify that the store is participating in the - * IQv2 framework properly. - * - * This is not a public API and may change without notice. - */ -public class PingQuery implements Query { Review comment: Removed, since it was only for validating the framework in the absence of any query implementations, and now we have query implementations. ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java ## @@ -54,22 +55,22 @@ ); } -private static final Map, QueryHandler> QUERY_HANDLER_MAP = +@SuppressWarnings("rawtypes") +private static final Map QUERY_HANDLER_MAP = mkMap( -mkEntry( -PingQuery.class, -(query, positionBound, collectExecutionInfo, store) -> QueryResult.forResult(true) -), Review comment: Removed; see the comment on the PingQuery class. ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java ## @@ -162,15 +163,39 @@ public static boolean isPermitted( } final R result = (R) iterator; return QueryResult.forResult(result); -} catch (final Throwable t) { -final String message = parseStoreException(t, store, query); +} catch (final Exception e) { Review comment: Changed from Throwable to Exception to avoid swallowing Errors. ## File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java ## @@ -527,10 +541,11 @@ public void verifyStore() { private void globalShouldRejectAllQueries() { // See KAFKA-13523 -final PingQuery query = new PingQuery(); -final StateQueryRequest request = inStore(STORE_NAME).withQuery(query); +final KeyQuery> query = KeyQuery.withKey(1); Review comment: Also replaced the PingQuery here. It also doesn't affect the evaluation. 
## File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java ## @@ -630,29 +609,11 @@ public void shouldHandlePingQuery() { .withQuery(query) .withPartitions(mkSet(0, 1)) .withPositionBound(PositionBound.at(INPUT_POSITION)); - final StateQueryResult> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); if (result.getGlobalResult() != null) { -final QueryResult> queryResult = result.getGlobalResult(); -final boolean failure = queryResult.isFailure(); -if (failure) { -throw new AssertionError(queryResult.toString()); -} -assertThat(queryResult.isSuccess(), is(true)); - -assertThrows(IllegalArgumentException.class, queryResult::getFailureReason); -assertThrows(IllegalArgumentException.class, -queryResult::getFailureMessage); - -final KeyValueIterator iterator = queryResult.getResult(); -final Set actualValue = new HashSet<>(); -while (iterator.hasNext()) { -actualValue.add(valueExtactor.apply(iterator.next().value)); -} -assertThat(actualValue, is(expectedValue)); -assertThat(queryResult.getExecutionInfo(), is(empty())); +
[GitHub] [kafka] wcarlson5 commented on pull request #11611: MINOR: prefix topics if internal config is set
wcarlson5 commented on pull request #11611: URL: https://github.com/apache/kafka/pull/11611#issuecomment-997025039 @ableegoldman This should make it so we can have topics prefixed with whatever is needed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 opened a new pull request #11611: MINOR: prefix topics if internal config is set
wcarlson5 opened a new pull request #11611: URL: https://github.com/apache/kafka/pull/11611 In order to move a topology to another runtime without having to copy over the internal topics, it would be good to have the option not to prefix the internal topics with the application ID. This change introduces a new config that, if set, will be used as the internal topic prefix. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
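A hypothetical sketch of the naming rule described above (field and method names assumed, not the PR's actual code):

```java
// Fall back to the application ID unless the internal prefix config is set.
String internalTopicName(final String suffix) {
    final String prefix = internalTopicPrefix != null ? internalTopicPrefix : applicationId;
    return prefix + "-" + suffix;
}
```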
[GitHub] [kafka] gwenshap closed pull request #11555: MINOR: Correct usage of ConfigException in file and directory config providers
gwenshap closed pull request #11555: URL: https://github.com/apache/kafka/pull/11555 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OneCricketeer commented on pull request #7965: KAFKA-9436: New Kafka Connect SMT for plainText => Struct(or Map)
OneCricketeer commented on pull request #7965: URL: https://github.com/apache/kafka/pull/7965#issuecomment-996944487 > suggest using grok patterns I knew I'd seen this somewhere before, but finally found it again https://github.com/streamthoughts/kafka-connect-transform-grok -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-13546) Explicitly specifying default topic creation groups should not make connector fail
[ https://issues.apache.org/jira/browse/KAFKA-13546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] venkat teki reassigned KAFKA-13546: --- Assignee: venkat teki > Explicitly specifying default topic creation groups should not make connector > fail > -- > > Key: KAFKA-13546 > URL: https://issues.apache.org/jira/browse/KAFKA-13546 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.6.0, 2.7.0, 2.6.1, 2.8.0, 2.7.1, 2.6.2, 2.6.3, 2.7.2, > 2.8.1 >Reporter: venkat teki >Assignee: venkat teki >Priority: Major > > [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics] > introduced support for the Connect worker to allow source connector > configurations to define topic creation settings. > A new source connector configuration {{topic.creation.groups}} was > introduced, which takes a list of groups. > *Expected behavior* > According to KIP-158, specifying the value "default" in the {{topic.creation.groups}} > configuration should produce a warning, but not make the connector fail. > *Actual behavior* > Specifying "default" makes the connector fail -- This message was sent by Atlassian Jira (v8.20.1#820001)
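For illustration, the expected behavior amounts to warning on the redundant group rather than failing validation; a sketch with hypothetical names, not Connect's actual validation code:

{code:java}
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;

final class TopicCreationGroupsSketch {
    // Warn on "default" and drop it, instead of failing the connector.
    static List<String> sanitizeGroups(final List<String> configured, final Logger log) {
        final List<String> groups = new ArrayList<>();
        for (final String group : configured) {
            if ("default".equals(group)) {
                log.warn("'default' is always included in topic.creation.groups and need not be listed");
            } else {
                groups.add(group);
            }
        }
        return groups;
    }
}
{code}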
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771530728 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java ## @@ -42,16 +102,21 @@ private StoreQueryUtils() { final int partition ) { -final QueryResult result; final long start = collectExecutionInfo ? System.nanoTime() : -1L; -if (query instanceof PingQuery) { -if (!isPermitted(position, positionBound, partition)) { -result = QueryResult.notUpToBound(position, positionBound, partition); -} else { -result = (QueryResult) QueryResult.forResult(true); -} -} else { +final QueryResult result; + +final QueryHandler handler = QUERY_HANDLER_MAP.get(query.getClass()); Review comment: Yep, that's accurate, but many of the stores will have the exact same logic as each other, so it made sense to consolidate it, which is what this util class is for. The function in the query map just checks the type of the store so that it can either cast it to execute the query or return "unknown query". That way, we can use the same dispatch map for all queries. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
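In sketch form, the dispatch described here looks roughly like the following (handler and map shapes follow the quoted diffs; the `forUnknownQueryType` factory name is an assumption on my part):

```java
// Generic dispatch inside the shared query-execution utility: look up
// execution logic by query class; fall back to an "unknown query" result
// when no handler is registered. The handler itself checks the concrete
// store type before casting, so the same map serves all store kinds.
final QueryHandler handler = QUERY_HANDLER_MAP.get(query.getClass());
if (handler == null) {
    return QueryResult.forUnknownQueryType(query, store); // assumed factory name
}
return (QueryResult<R>) handler.apply(query, positionBound, collectExecutionInfo, store);
```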
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771520958 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java ## @@ -16,18 +16,78 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.api.RecordMetadata; +import org.apache.kafka.streams.query.FailureReason; +import org.apache.kafka.streams.query.KeyQuery; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.query.PositionBound; import org.apache.kafka.streams.query.Query; import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.state.KeyValueStore; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.Map; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; + public final class StoreQueryUtils { +/** + * a utility interface to facilitate stores' query dispatch logic, + * allowing them to generically store query execution logic as the values + * in a map. + */ +@FunctionalInterface +public interface QueryHandler { +QueryResult apply( +final Query query, +final PositionBound positionBound, +final boolean collectExecutionInfo, +final StateStore store +); +} + + +@SuppressWarnings("unchecked") +private static final Map, QueryHandler> QUERY_HANDLER_MAP = +mkMap( +mkEntry( +PingQuery.class, +(query, positionBound, collectExecutionInfo, store) -> QueryResult.forResult(true) +), +mkEntry(KeyQuery.class, +(query, positionBound, collectExecutionInfo, store) -> { +if (store instanceof KeyValueStore) { +final KeyQuery rawKeyQuery = (KeyQuery) query; +final KeyValueStore keyValueStore = +(KeyValueStore) store; +try { +final byte[] bytes = keyValueStore.get(rawKeyQuery.getKey()); +return QueryResult.forResult(bytes); +} catch (final Throwable t) { Review comment: Good point. It's fine to catch Throwables, but it's not fine to swallow Errors, as I'm doing here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
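The resolution agreed here is visible in the later revision of StoreQueryUtils quoted in an earlier hunk: narrow the catch so Errors propagate instead of being swallowed into a STORE_EXCEPTION failure. Roughly:

```java
try {
    final byte[] bytes = keyValueStore.get(rawKeyQuery.getKey());
    return (QueryResult<R>) QueryResult.forResult(bytes);
} catch (final Exception e) { // was `catch (final Throwable t)`; Errors now propagate
    final String message = parseStoreException(e, store, query);
    return QueryResult.forFailure(FailureReason.STORE_EXCEPTION, message);
}
```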
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771517463 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java ## @@ -16,18 +16,78 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.api.RecordMetadata; +import org.apache.kafka.streams.query.FailureReason; +import org.apache.kafka.streams.query.KeyQuery; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.query.PositionBound; import org.apache.kafka.streams.query.Query; import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.state.KeyValueStore; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.Map; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; + public final class StoreQueryUtils { +/** + * a utility interface to facilitate stores' query dispatch logic, + * allowing them to generically store query execution logic as the values + * in a map. + */ +@FunctionalInterface +public interface QueryHandler { +QueryResult apply( +final Query query, +final PositionBound positionBound, +final boolean collectExecutionInfo, +final StateStore store +); +} + + +@SuppressWarnings("unchecked") +private static final Map, QueryHandler> QUERY_HANDLER_MAP = +mkMap( +mkEntry( +PingQuery.class, Review comment: Actually, the PingQuery isn't in the KIP at all. I added it (as an internal API) so that I could verify the stores work properly in the absence of any actual queries (because I implemented the framework before any real queries, to control the scope of the PR). Now that we have real queries, I don't think we need to keep Ping around. I'll remove it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771515988 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java ## @@ -16,18 +16,78 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.api.RecordMetadata; +import org.apache.kafka.streams.query.FailureReason; +import org.apache.kafka.streams.query.KeyQuery; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.query.PositionBound; import org.apache.kafka.streams.query.Query; import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.state.KeyValueStore; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.Map; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; + public final class StoreQueryUtils { +/** + * a utility interface to facilitate stores' query dispatch logic, + * allowing them to generically store query execution logic as the values + * in a map. + */ +@FunctionalInterface +public interface QueryHandler { +QueryResult apply( +final Query query, +final PositionBound positionBound, +final boolean collectExecutionInfo, +final StateStore store +); +} + + +@SuppressWarnings("unchecked") +private static final Map, QueryHandler> QUERY_HANDLER_MAP = Review comment: They both exist to dispatch query execution logic. The MeteredStores' logic is to translate results from the inner stores, and the inner stores' logic is to execute the query. Since we have a lot of functionally identical stores (i.e., many KeyValue stores, etc.), it made sense to consolidate their execution logic here instead of duplicating it in every store class. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771513674 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java ## @@ -16,18 +16,78 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.api.RecordMetadata; +import org.apache.kafka.streams.query.FailureReason; +import org.apache.kafka.streams.query.KeyQuery; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.query.PositionBound; import org.apache.kafka.streams.query.Query; import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.state.KeyValueStore; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.Map; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; + public final class StoreQueryUtils { +/** + * a utility interface to facilitate stores' query dispatch logic, + * allowing them to generically store query execution logic as the values + * in a map. + */ +@FunctionalInterface Review comment: The compiler is smart enough. It's just an informative annotation. Its only practical purpose is to raise a compilation error if you try to declare more than one abstract method in it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
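As a tiny illustration of that informative role, using the interface shape from the quoted diff:

```java
// The annotation adds no runtime behavior; it only makes the
// single-abstract-method contract explicit and enforced at compile time.
@FunctionalInterface
public interface QueryHandler {
    QueryResult apply(
        Query query,
        PositionBound positionBound,
        boolean collectExecutionInfo,
        StateStore store
    );
    // Declaring a second abstract method here would be a compilation error.
}
```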
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771512115 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ## @@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener listener, return false; } +@SuppressWarnings("unchecked") +@Override +public QueryResult query(final Query query, +final PositionBound positionBound, +final boolean collectExecutionInfo) { + +final long start = System.nanoTime(); +final QueryResult result; + +final QueryHandler handler = queryHandlers.get(query.getClass()); +if (handler == null) { +result = wrapped().query(query, positionBound, collectExecutionInfo); +if (collectExecutionInfo) { +result.addExecutionInfo( +"Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns"); +} +} else { +result = (QueryResult) handler.apply( +query, +positionBound, +collectExecutionInfo, +this +); +if (collectExecutionInfo) { +result.addExecutionInfo( +"Handled in " + getClass() + " with serdes " ++ serdes + " in " + (System.nanoTime() - start) + "ns"); +} +} +return result; +} + +@SuppressWarnings("unchecked") +private QueryResult runKeyQuery(final Query query, +final PositionBound positionBound, final boolean collectExecutionInfo) { +final QueryResult result; +final KeyQuery typedQuery = (KeyQuery) query; +final KeyQuery rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey())); +final QueryResult rawResult = +wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo); +if (rawResult.isSuccess()) { +final boolean timestamped = WrappedStateStore.isTimestamped(wrapped()); Review comment: I'd forgotten about MeteredTimestampedKeyValueStore, but now that I'm looking at it, what it does is extend the MeteredKeyValueStore, apparently specifically to pad the value serde with a ValueAndTimestamp serde. Otherwise, all the logic lives in MeteredKeyValueStore. Also, because we always wrap the non-timestamped store with the `KeyValueToTimestampedKeyValueByteStoreAdapter`, we also always pass through the MeteredTimestampedKeyValueStore whether the inner store is really timestamped or not. I think we could clean this whole hierarchy up a bit, but it's not necessary as part of this work. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771505654 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ## @@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener listener, return false; } +@SuppressWarnings("unchecked") +@Override +public QueryResult query(final Query query, +final PositionBound positionBound, +final boolean collectExecutionInfo) { + +final long start = System.nanoTime(); +final QueryResult result; + +final QueryHandler handler = queryHandlers.get(query.getClass()); +if (handler == null) { +result = wrapped().query(query, positionBound, collectExecutionInfo); +if (collectExecutionInfo) { +result.addExecutionInfo( +"Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns"); +} +} else { +result = (QueryResult) handler.apply( +query, +positionBound, +collectExecutionInfo, +this +); +if (collectExecutionInfo) { +result.addExecutionInfo( +"Handled in " + getClass() + " with serdes " ++ serdes + " in " + (System.nanoTime() - start) + "ns"); +} +} +return result; +} + +@SuppressWarnings("unchecked") +private QueryResult runKeyQuery(final Query query, +final PositionBound positionBound, final boolean collectExecutionInfo) { +final QueryResult result; +final KeyQuery typedQuery = (KeyQuery) query; +final KeyQuery rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey())); +final QueryResult rawResult = +wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo); +if (rawResult.isSuccess()) { +final boolean timestamped = WrappedStateStore.isTimestamped(wrapped()); +final Serde vSerde = serdes.valueSerde(); +final Deserializer deserializer; +if (!timestamped && vSerde instanceof ValueAndTimestampSerde) { Review comment: Yes, this is super weird, and I think that https://issues.apache.org/jira/browse/KAFKA-13526 will give us a more elegant way to correct it, but as it stands right now, this is necessary. The MeteredStore's serde is always a ValueAndTimestamp serde regardless of whether the inner store is Timestamped or not. This works because the normal execution flow actually converts the byte results from non-timestamped stores into the binary schema of a ValueAndTimestamp. What we do is, when you have a non-timestamped store, we wrap it with an extra layer (`org.apache.kafka.streams.state.internals.KeyValueToTimestampedKeyValueByteStoreAdapter`) that pads the returned values with a fake timestamp (`org.apache.kafka.streams.state.TimestampedBytesStore#convertToTimestampedFormat`). That makes sense when the store is used by processors (particularly the ones in the DSL) because it makes the store configuration orthogonal to the processor logic, but for IQ, it's just spending extra time and memory for no productive purpose. One of the primary design goals of IQv2 is to make query execution as lean as possible, so we did not implement the same padding logic for non-timestamped data and instead just bubble up to the MeteredStore the actual byte array returned from the BytesStore. Which means that if we want to deserialize it, we need to know whether to use the ValueAndTimestamp deserializer or just the Value's deserializer. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
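The padding described above can be pictured as follows; a sketch that assumes the timestamped binary layout is an 8-byte timestamp followed by the plain value bytes, with a sentinel standing in for the fake timestamp (the real helper is `TimestampedBytesStore#convertToTimestampedFormat`):

```java
import java.nio.ByteBuffer;

final class TimestampPaddingSketch {
    // Prefix a plain value with a placeholder timestamp so it matches the
    // <timestamp, value> schema that timestamped stores use.
    static byte[] convertToTimestampedFormat(final byte[] plainValue) {
        if (plainValue == null) {
            return null;
        }
        return ByteBuffer.allocate(Long.BYTES + plainValue.length)
            .putLong(-1L)       // assumed sentinel for "no real timestamp"
            .put(plainValue)    // original value bytes
            .array();
    }
}
```

This is exactly the extra work IQv2 skips by returning the raw bytes from the inner store, which in turn is why the metered layer must know which deserializer applies.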
[jira] [Commented] (KAFKA-13549) Add "delete interval" config
[ https://issues.apache.org/jira/browse/KAFKA-13549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17461528#comment-17461528 ] Nicholas Telford commented on KAFKA-13549: -- I drafted a patch for this and only noticed the "needs-kip" label after I submitted the PR. Hopefully it at least serves as a starting point :D > Add "delete interval" config > > > Key: KAFKA-13549 > URL: https://issues.apache.org/jira/browse/KAFKA-13549 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Major > Labels: needs-kip > > Kafka Streams uses "delete record" requests to aggressively purge data from > repartition topics. Those requests are sent each time we commit. > For at-least-once with a default commit interval of 30 seconds, this works > fine. However, for exactly-once with a default commit interval of 100ms, it's > very aggressive. The main issue is on the broker side, because the broker logs every > "delete record" request, and thus broker logs are spammed if EOS is enabled. > We should consider adding a new config (e.g., `delete.record.interval.ms` or > similar) to have a dedicated config for "delete record" requests, to decouple > it from the commit interval config and allow purging data less aggressively, > even if the commit interval is small, to avoid the broker-side log spamming. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771495764 ## File path: streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java ## @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; + +@Evolving +public class KeyQuery implements Query { Review comment: Good idea. I'm not sure whether it will ultimately be good to extend queries with other queries later, but it doesn't hurt to add this now so that we can make an explicit decision about it later. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771494393 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ## @@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener listener, return false; } +@SuppressWarnings("unchecked") +@Override +public QueryResult query(final Query query, +final PositionBound positionBound, +final boolean collectExecutionInfo) { + +final long start = System.nanoTime(); +final QueryResult result; + +final QueryHandler handler = queryHandlers.get(query.getClass()); +if (handler == null) { +result = wrapped().query(query, positionBound, collectExecutionInfo); +if (collectExecutionInfo) { +result.addExecutionInfo( +"Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns"); +} +} else { +result = (QueryResult) handler.apply( +query, +positionBound, +collectExecutionInfo, +this +); +if (collectExecutionInfo) { +result.addExecutionInfo( +"Handled in " + getClass() + " with serdes " ++ serdes + " in " + (System.nanoTime() - start) + "ns"); +} +} +return result; +} + +@SuppressWarnings("unchecked") +private QueryResult runKeyQuery(final Query query, +final PositionBound positionBound, final boolean collectExecutionInfo) { Review comment: Sorry about that; oversight. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771493968 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ## @@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener listener, return false; } +@SuppressWarnings("unchecked") +@Override +public QueryResult query(final Query query, +final PositionBound positionBound, +final boolean collectExecutionInfo) { + +final long start = System.nanoTime(); +final QueryResult result; + +final QueryHandler handler = queryHandlers.get(query.getClass()); +if (handler == null) { +result = wrapped().query(query, positionBound, collectExecutionInfo); +if (collectExecutionInfo) { +result.addExecutionInfo( +"Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns"); +} +} else { +result = (QueryResult) handler.apply( +query, +positionBound, +collectExecutionInfo, +this +); +if (collectExecutionInfo) { +result.addExecutionInfo( +"Handled in " + getClass() + " with serdes " ++ serdes + " in " + (System.nanoTime() - start) + "ns"); Review comment: Thanks; this seems about the same, and it would apply to all the other execution info messages we've got, so I think I'll keep it the same for now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771492764 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ## @@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener listener, return false; } +@SuppressWarnings("unchecked") +@Override +public QueryResult query(final Query query, +final PositionBound positionBound, +final boolean collectExecutionInfo) { + +final long start = System.nanoTime(); +final QueryResult result; + +final QueryHandler handler = queryHandlers.get(query.getClass()); +if (handler == null) { +result = wrapped().query(query, positionBound, collectExecutionInfo); +if (collectExecutionInfo) { Review comment: Yeah, the idea was to actually be able to see everything that happened during query execution, specifically to demystify what's going on when you're debugging. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771490884 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ## @@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener listener, return false; } +@SuppressWarnings("unchecked") +@Override +public QueryResult query(final Query query, +final PositionBound positionBound, +final boolean collectExecutionInfo) { + +final long start = System.nanoTime(); Review comment: Not a bad idea! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] nicktelford commented on pull request #11610: KAFKA-13549: Add delete.interval.ms to Streams
nicktelford commented on pull request #11610: URL: https://github.com/apache/kafka/pull/11610#issuecomment-996810481 @mjsax -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] nicktelford opened a new pull request #11610: KAFKA-13549: Add delete.interval.ms to Streams
nicktelford opened a new pull request #11610: URL: https://github.com/apache/kafka/pull/11610 Records are explicitly deleted once they have been fully consumed. Currently, this is done every time the Task is committed, resulting in "delete records" requests being sent every `commit.interval.ms` milliseconds. When `commit.interval.ms` is set very low, for example when `processing.guarantee` is set to `exactly_once_v2`, this causes delete records requests to be sent extremely frequently, potentially reducing throughput and causing the brokers to log a high volume of messages. Decoupling delete records requests from the commit interval resolves this problem. We now only explicitly delete records for a repartition topic when we commit, and only if it's been at least `delete.interval.ms` milliseconds since the last time we deleted records. Because we still require a commit to trigger record deletion, the effective lower bound of `delete.interval.ms` is `commit.interval.ms`. For compatibility, the default `delete.interval.ms` is set to 30 seconds, the same as the default `commit.interval.ms`. Users who have configured a different `commit.interval.ms` may need to review and change `delete.interval.ms`. Unlike `commit.interval.ms`, we don't dynamically change the default for `delete.interval.ms` when EOS processing is enabled, as it's important not to flood brokers with record deletions, and we want a sensible default. This code is my own work and is licensed to the Apache Kafka project under the terms of the same license (ASL 2) as the project itself. ### Committer Checklist (excluded from commit message) - [X] Verify design and implementation - [X] Verify test coverage and CI build status - [X] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
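A minimal sketch of the throttling described in this PR, under stated assumptions: `lastPurgeMs`, `deleteIntervalMs`, and `recordsToDelete` are illustrative names, not the PR's actual identifiers, and the real logic lives in the Streams task-commit path.

```java
// Hypothetical sketch: deletion is still only attempted from the commit path,
// but is skipped unless delete.interval.ms has elapsed since the last purge.
// Because commits gate this check, deletions can never fire more often than
// commit.interval.ms, which is why it acts as the effective lower bound.
private long lastPurgeMs = 0L;                  // illustrative bookkeeping field
private final long deleteIntervalMs = 30_000L;  // default mirrors commit.interval.ms

void maybePurgeOnCommit(final long nowMs) {
    if (nowMs - lastPurgeMs >= deleteIntervalMs) {
        adminClient.deleteRecords(recordsToDelete); // purge fully consumed repartition records
        lastPurgeMs = nowMs;
    }
}
```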
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771472556 ## File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java ## @@ -197,6 +197,18 @@ public R getResult() { return result; } +@SuppressWarnings("unchecked") +public <V> QueryResult<V> swapResult(final V value) { +if (isFailure()) { +return (QueryResult<V>) this; +} else { +final QueryResult<V> result = new QueryResult<>(value); Review comment: What's happening here is that we're turning a `QueryResult<R>` into a `QueryResult<V>`. A concrete example (in fact the only use case) of this is in the MeteredStore: we get back a raw result from the BytesStore and need to deserialize it, so we need to convert the `QueryResult<byte[]>` into a `QueryResult<V>` or something. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
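Concretely, the MeteredStore step described here might look like the following sketch; `serdes.valueFrom` is assumed as the deserializer hook and the surrounding names are illustrative, not the PR's exact internals.

```java
// Sketch: the bytes store answers with a raw byte[] result; swapResult replaces
// the payload with the deserialized value while keeping the execution info and
// position that the inner store attached to the result.
final QueryResult<byte[]> rawResult =
    wrapped().query(rawQuery, positionBound, collectExecutionInfo);
final V value = serdes.valueFrom(rawResult.getResult()); // assumed deserializer hook
final QueryResult<V> typedResult = rawResult.swapResult(value);
```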
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771470456 ## File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java ## @@ -197,6 +197,18 @@ public R getResult() { return result; } +@SuppressWarnings("unchecked") +public <V> QueryResult<V> swapResult(final V value) { +if (isFailure()) { +return (QueryResult<V>) this; Review comment: In the case of a failure, there is no result, just the failure message. I wanted to maintain an invariant that there is always either a failure or a result, but not both or neither. I also didn't think it would be right to allow accidentally converting a failure to a successful result via this method. I suppose we could throw an IllegalStateException, since the caller probably shouldn't even be trying to "swap" the result on a failed result to begin with, but this seems fine, too. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771466959 ## File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java ## @@ -29,10 +29,10 @@ */ public final class QueryResult<R> { -private final List<String> executionInfo = new LinkedList<>(); private final FailureReason failureReason; private final String failure; private final R result; +private List<String> executionInfo = new LinkedList<>(); Review comment: Thanks; that would be another way to do it. I'm not sure if that would be clearly better or not, though. It's for getting more details about how the query was actually executed inside of Streams. Right now, if you request it as part of the query, each store layer will report what it did and how long it took. For runtime queries, you wouldn't want to use it, but I wanted to enable debugging in cases where query execution seems like it's taking longer than expected. Also, it could be used for tracing, in which every Nth query is run with the execution info on. It's a list of Strings so that each store layer / operation can just add one "line" of info (like a stack trace), but we don't waste time and memory actually concatenating them with newlines. We considered adding more structure (such as having a field for execution time), but kept it as a string so as not to restrict the kind of "execution information" we might find useful to add in the future. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
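To illustrate the intended debugging flow, a hedged sketch of opting into execution info for one query and reading the collected lines back; `enableExecutionInfo` and `getExecutionInfo` are assumed from the IQv2 design, not guaranteed by this PR.

```java
// Assumed flow: turn execution info on per request (e.g. for every Nth query
// when tracing), then log the one-line report each store layer appended.
final StateQueryRequest<Integer> request = StateQueryRequest
    .inStore("my-store")
    .withQuery(KeyQuery.<String, Integer>withKey("key-1"))
    .enableExecutionInfo();
final StateQueryResult<Integer> result = kafkaStreams.query(request);
result.getOnlyPartitionResult().getExecutionInfo().forEach(System.out::println);
```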
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771459374 ## File path: streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java ## @@ -52,5 +52,12 @@ * The requested store partition does not exist at all. For example, partition 4 was requested, * but the store in question only has 4 partitions (0 through 3). */ -DOES_NOT_EXIST; +DOES_NOT_EXIST, + +/** + * The store that handled the query got an exception during query execution. The message + * will contain the exception details. Depending on the nature of the exception, the caller + * may be able to retry this instance or may need to try a different instance. + */ +STORE_EXCEPTION; Review comment: Thanks! @guozhangwang reminded me during the discussion to make sure that all the cases in that KIP were accounted for. Some are still exceptions, and some are now FailureReasons: https://lists.apache.org/thread/brvwvpvsbsfvqpqg6jvry5hqny0vm2tr -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2
vvcephei commented on a change in pull request #11582: URL: https://github.com/apache/kafka/pull/11582#discussion_r771454005 ## File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java ## @@ -197,6 +197,18 @@ public R getResult() { return result; } +@SuppressWarnings("unchecked") +public <V> QueryResult<V> swapResult(final V value) { Review comment: The purpose of this method is to allow the `MeteredKeyValueStore` to deserialize the result without wiping out the execution info or position that it got back from the bytes store. I missed that while reviewing your PR, so I went ahead and added a fix for it to this one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13502) Support configuring BROKER_LOGGER on controller-only KRaft nodes
[ https://issues.apache.org/jira/browse/KAFKA-13502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17461478#comment-17461478 ] Ron Dagostino commented on KAFKA-13502: --- This is one aspect of the broader problem as described in https://issues.apache.org/jira/browse/KAFKA-13552 > Support configuring BROKER_LOGGER on controller-only KRaft nodes > > > Key: KAFKA-13502 > URL: https://issues.apache.org/jira/browse/KAFKA-13502 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Priority: Major > Labels: kip-500 > -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (KAFKA-13552) Unable to dynamically change broker log levels on KRaft
[ https://issues.apache.org/jira/browse/KAFKA-13552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17461476#comment-17461476 ] Ron Dagostino commented on KAFKA-13552: --- [~dengziming] Thanks for pointing that out. Although there is not much in that ticket, it appears to address controller-only nodes, whereas this ticket indicates that no KRaft node (broker-only, controller-only, or combined broker+controller) supports dynamic changes to the log levels. I updated the description of this ticket to point to that one since it is just one aspect of the problem. > Unable to dynamically change broker log levels on KRaft > --- > > Key: KAFKA-13552 > URL: https://issues.apache.org/jira/browse/KAFKA-13552 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.1.0, 3.0.0 >Reporter: Ron Dagostino >Priority: Major > > It is currently not possible to dynamically change the log level in KRaft. > For example: > kafka-configs.sh --bootstrap-server <host:port> --alter --add-config > "kafka.server.ReplicaManager=DEBUG" --entity-type broker-loggers > --entity-name 0 > Results in: > org.apache.kafka.common.errors.InvalidRequestException: Unexpected resource > type BROKER_LOGGER. > The code to process this request is in ZkAdminManager.alterLogLevelConfigs(). > This needs to be moved out of there, and the functionality has to be > processed locally on the broker instead of being forwarded to the KRaft > controller. > It is also an open question as to how we can dynamically alter log levels for > a remote KRaft controller. Connecting directly to it is one possible > solution, but that may not be desirable since generally connecting directly > to the controller is not necessary. The ticket for this particular aspect of > the issue is https://issues.apache.org/jira/browse/KAFKA-13502 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (KAFKA-13552) Unable to dynamically change broker log levels on KRaft
[ https://issues.apache.org/jira/browse/KAFKA-13552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ron Dagostino updated KAFKA-13552: -- Description: It is currently not possible to dynamically change the log level in KRaft. For example: kafka-configs.sh --bootstrap-server <host:port> --alter --add-config "kafka.server.ReplicaManager=DEBUG" --entity-type broker-loggers --entity-name 0 Results in: org.apache.kafka.common.errors.InvalidRequestException: Unexpected resource type BROKER_LOGGER. The code to process this request is in ZkAdminManager.alterLogLevelConfigs(). This needs to be moved out of there, and the functionality has to be processed locally on the broker instead of being forwarded to the KRaft controller. It is also an open question as to how we can dynamically alter log levels for a remote KRaft controller. Connecting directly to it is one possible solution, but that may not be desirable since generally connecting directly to the controller is not necessary. The ticket for this particular aspect of the issue is https://issues.apache.org/jira/browse/KAFKA-13502 was: It is currently not possible to dynamically change the log level in KRaft. For example: kafka-configs.sh --bootstrap-server <host:port> --alter --add-config "kafka.server.ReplicaManager=DEBUG" --entity-type broker-loggers --entity-name 0 Results in: org.apache.kafka.common.errors.InvalidRequestException: Unexpected resource type BROKER_LOGGER. The code to process this request is in ZkAdminManager.alterLogLevelConfigs(). This needs to be moved out of there, and the functionality has to be processed locally on the broker instead of being forwarded to the KRaft controller. It is also an open question as to how we can dynamically alter log levels for a remote KRaft controller. Connecting directly to it is one possible solution, but that may not be desirable since generally connecting directly to the controller is not necessary. > Unable to dynamically change broker log levels on KRaft > --- > > Key: KAFKA-13552 > URL: https://issues.apache.org/jira/browse/KAFKA-13552 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.1.0, 3.0.0 >Reporter: Ron Dagostino >Priority: Major > > It is currently not possible to dynamically change the log level in KRaft. > For example: > kafka-configs.sh --bootstrap-server <host:port> --alter --add-config > "kafka.server.ReplicaManager=DEBUG" --entity-type broker-loggers > --entity-name 0 > Results in: > org.apache.kafka.common.errors.InvalidRequestException: Unexpected resource > type BROKER_LOGGER. > The code to process this request is in ZkAdminManager.alterLogLevelConfigs(). > This needs to be moved out of there, and the functionality has to be > processed locally on the broker instead of being forwarded to the KRaft > controller. > It is also an open question as to how we can dynamically alter log levels for > a remote KRaft controller. Connecting directly to it is one possible > solution, but that may not be desirable since generally connecting directly > to the controller is not necessary. The ticket for this particular aspect of > the issue is https://issues.apache.org/jira/browse/KAFKA-13502 -- This message was sent by Atlassian Jira (v8.20.1#820001)
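For reference, the same log-level change expressed through the standard Admin API; these `AdminClient` calls exist today, and per this ticket a KRaft node rejects the request with the `InvalidRequestException` quoted above. The bootstrap address is a placeholder.

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public final class BrokerLoggerExample {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        try (Admin admin = Admin.create(props)) {
            // Raise kafka.server.ReplicaManager to DEBUG on broker 0 at runtime.
            final ConfigResource logger =
                new ConfigResource(ConfigResource.Type.BROKER_LOGGER, "0");
            final AlterConfigOp op = new AlterConfigOp(
                new ConfigEntry("kafka.server.ReplicaManager", "DEBUG"),
                AlterConfigOp.OpType.SET);
            // On a KRaft node this call is where the request is rejected today.
            admin.incrementalAlterConfigs(
                Collections.singletonMap(logger, Collections.singletonList(op))).all().get();
        }
    }
}
```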
[jira] [Commented] (KAFKA-13547) Kafka - 1.0.0 | Remove log4j.jar
[ https://issues.apache.org/jira/browse/KAFKA-13547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17461472#comment-17461472 ] Dongjin Lee commented on KAFKA-13547: - [~masood31] I am currently working on a preview version based on AK 2.8.1 and 3.0.0, and I will complete it this weekend. It will replace all log4j 1.x dependencies with 2.x, preserving backward compatibility of the logger configuration. +1. I can't guarantee when it will be merged into an official release. > Kafka - 1.0.0 | Remove log4j.jar > > > Key: KAFKA-13547 > URL: https://issues.apache.org/jira/browse/KAFKA-13547 > Project: Kafka > Issue Type: Bug >Reporter: masood >Priority: Blocker > > We wanted to remove the log4j.jar but ended up with a dependency on the > kafka.producer.ProducerConfig. > Caused by: java.lang.NoClassDefFoundError: org/apache/log4j/Logger > at kafka.utils.Logging.logger(Logging.scala:24) > at kafka.utils.Logging.logger$(Logging.scala:24) > at > kafka.utils.VerifiableProperties.logger$lzycompute(VerifiableProperties.scala:27) > at kafka.utils.VerifiableProperties.logger(VerifiableProperties.scala:27) > at kafka.utils.Logging.info(Logging.scala:71) > at kafka.utils.Logging.info$(Logging.scala:70) > at kafka.utils.VerifiableProperties.info(VerifiableProperties.scala:27) > at kafka.utils.VerifiableProperties.verify(VerifiableProperties.scala:218) > at kafka.producer.ProducerConfig.<init>(ProducerConfig.scala:61) > Is there any configuration available which can resolve this error? > Please note we are not using log4j.properties or any other log4j logging > mechanism for Kafka connection in the application. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] showuon commented on a change in pull request #11566: KAFKA-13495: add reason to JoinGroupRequest
showuon commented on a change in pull request #11566: URL: https://github.com/apache/kafka/pull/11566#discussion_r771395371 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ## @@ -467,6 +468,7 @@ boolean joinGroupIfNeeded(final Timer timer) { final RuntimeException exception = future.exception(); resetJoinGroupFuture(); +rejoinReason = "rebalance failed due to " + exception.getClass() + " error: " + exception.getMessage(); Review comment: +1 for `getSimpleName` for the class. In addition to David's suggestion, I think we should also remove the second `due to`, because there is already one `due to` in the sentence. ex: `rebalance failed: '$message' ($class)` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
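Put together, the client-side format being converged on in this thread would come out roughly like the following sketch, not the committed code.

```java
// Sketch of the suggested format: quote the exception message, append the
// simple class name in parentheses, and avoid repeating "due to" inside the
// reason string itself.
final String rejoinReason = String.format(
    "rebalance failed: '%s' (%s)",
    exception.getMessage(),
    exception.getClass().getSimpleName());
```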
[GitHub] [kafka] showuon commented on a change in pull request #11566: KAFKA-13495: add reason to JoinGroupRequest
showuon commented on a change in pull request #11566: URL: https://github.com/apache/kafka/pull/11566#discussion_r771384033 ## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ## @@ -1249,7 +1260,7 @@ class GroupCoordinator(val brokerId: Int, // for new members. If the new member is still there, we expect it to retry. completeAndScheduleNextExpiration(group, member, NewMemberJoinTimeoutMs) -maybePrepareRebalance(group, s"Adding new member $memberId with group instance id $groupInstanceId") +maybePrepareRebalance(group, s"Adding new member $memberId with group instance id $groupInstanceId. Member joined due to $reason") Review comment: +1 for `; client reason: $reason` ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ## @@ -467,6 +468,7 @@ boolean joinGroupIfNeeded(final Timer timer) { final RuntimeException exception = future.exception(); resetJoinGroupFuture(); +rejoinReason = "rebalance failed due to " + exception.getClass() + " error: " + exception.getMessage(); Review comment: +1 for `getSimpleName` for the class. In addition to David's suggestion, I think we should also remove the `due to`, because there is already one `due to` in the sentence. ex: `rebalance failed: '$message' ($class)` ## File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java ## @@ -486,6 +487,43 @@ public void testRetainMemberIdAfterSyncGroupDisconnect() { ensureActiveGroup(rejoinedGeneration, memberId); } +@Test +public void testRejoinReason() { +setupCoordinator(); + +String memberId = "memberId"; +int generation = 5; + +// test initial reason +mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); +expectJoinGroup("", "initialized abstract coordinator", generation, memberId); + +// successful sync group response should reset reason +expectSyncGroup(generation, memberId); +ensureActiveGroup(generation, memberId); +assertEquals("", coordinator.rejoinReason()); + +// Force a rebalance +expectJoinGroup(memberId, "Manual test trigger", generation, memberId); +expectSyncGroup(generation, memberId); +coordinator.requestRejoin("Manual test trigger"); +ensureActiveGroup(generation, memberId); +assertEquals("", coordinator.rejoinReason()); + +// max group size reached + mockClient.prepareResponse(joinGroupFollowerResponse(defaultGeneration, memberId, JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.GROUP_MAX_SIZE_REACHED)); +coordinator.requestRejoin("Manual test trigger 2"); +try { +coordinator.joinGroupIfNeeded(mockTime.timer(100L)); +} catch (GroupMaxSizeReachedException e) { Review comment: Actually, you can achieve what you want by `assertThrows` as below: ```java Throwable e = assertThrows(GroupMaxSizeReachedException.class, () -> coordinator.joinGroupIfNeeded(mockTime.timer(100L))); // next join group request should contain exception message expectJoinGroup(memberId, e.getMessage(), generation, memberId); expectSyncGroup(generation, memberId); ensureActiveGroup(generation, memberId); assertEquals("", coordinator.rejoinReason()); ``` Basically, `assertThrows` does the same thing as your try/catch here. It's recommended to use `assertThrows` because it makes clear that the thrown exception is itself part of what this test verifies. Otherwise, if someone someday breaks the logic so that `coordinator.joinGroupIfNeeded(mockTime.timer(100L));` completes without an exception being thrown, your current test can't catch that error, right?
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11571: KAFKA-13496: add reason to LeaveGroupRequest
dajac commented on a change in pull request #11571: URL: https://github.com/apache/kafka/pull/11571#discussion_r771376907 ## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ## @@ -617,9 +617,10 @@ class GroupCoordinator(val brokerId: Int, leavingMembers: List[MemberIdentity], responseCallback: LeaveGroupResult => Unit): Unit = { -def removeCurrentMemberFromGroup(group: GroupMetadata, memberId: String): Unit = { +def removeCurrentMemberFromGroup(group: GroupMetadata, memberId: String, reason: Option[String]): Unit = { val member = group.get(memberId) - removeMemberAndUpdateGroup(group, member, s"Removing member $memberId on LeaveGroup") + val leaveReason = reason.getOrElse("unknown reason") + removeMemberAndUpdateGroup(group, member, s"Removing member $memberId on LeaveGroup due to: $leaveReason") Review comment: I think that we should use the same pattern for both join reasons and leave reasons. See https://github.com/apache/kafka/pull/11566#discussion_r771371068. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11566: KAFKA-13495: add reason to JoinGroupRequest
dajac commented on a change in pull request #11566: URL: https://github.com/apache/kafka/pull/11566#discussion_r771371068 ## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ## @@ -1249,7 +1260,7 @@ class GroupCoordinator(val brokerId: Int, // for new members. If the new member is still there, we expect it to retry. completeAndScheduleNextExpiration(group, member, NewMemberJoinTimeoutMs) -maybePrepareRebalance(group, s"Adding new member $memberId with group instance id $groupInstanceId") +maybePrepareRebalance(group, s"Adding new member $memberId with group instance id $groupInstanceId. Member joined due to $reason") Review comment: The output log is quite hard to follow at the moment. Example: ``` [2021-12-17 11:29:16,061] INFO [GroupCoordinator 0]: Preparing to rebalance group test in state PreparingRebalance with old generation 1 (__consumer_offsets-48) (reason: Adding new member console-consumer-1d5a9905-c271-4700-a817-62fc9b9f28fc with group instance id None. Member joined due to rebalance failed due to class org.apache.kafka.common.errors.MemberIdRequiredException error: The group member needs to have a valid member id before actually entering a consumer group.) (kafka.coordinator.group.GroupCoordinator) ``` How about doing the following? For each reason, we could add `; client reason: $reason`. With this, we will always have `(reason: <reason>; client reason: <client reason>)` in each rebalance log. It might be clearer. What do you think? ## File path: clients/src/main/resources/common/message/JoinGroupResponse.json ## @@ -31,7 +31,9 @@ // Version 6 is the first flexible version. // // Starting from version 7, the broker sends back the Protocol Type to the client (KIP-559). - "validVersions": "0-7", + // + // Version 8 adds the Reason field (KIP-800). Review comment: nit: Should we rather say `Version 8 is the same as version 7.` here? ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ## @@ -467,6 +468,7 @@ boolean joinGroupIfNeeded(final Timer timer) { final RuntimeException exception = future.exception(); resetJoinGroupFuture(); +rejoinReason = "rebalance failed due to " + exception.getClass() + " error: " + exception.getMessage(); Review comment: Example on the broker side: ``` [2021-12-17 11:29:16,061] INFO [GroupCoordinator 0]: Preparing to rebalance group test in state PreparingRebalance with old generation 1 (__consumer_offsets-48) (reason: Adding new member console-consumer-1d5a9905-c271-4700-a817-62fc9b9f28fc with group instance id None. Member joined due to rebalance failed due to class org.apache.kafka.common.errors.MemberIdRequiredException error: The group member needs to have a valid member id before actually entering a consumer group.) (kafka.coordinator.group.GroupCoordinator) ``` * Should we only get the `getSimpleName` of the class? * There are many `:` in the log. I wonder if we could remove the one we've put here. Perhaps we could use the following pattern: `rebalance failed due to '$message' ($class)`. What do you think? ## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ## @@ -181,6 +182,7 @@ class GroupCoordinator(val brokerId: Int, responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID)) case Some(group) => group.inLock { +val joinReason = reason.getOrElse("unknown reason") Review comment: If we do this, it might be better to not use an `Option` after all. We could simply provide the default reason to `handleJoinGroup` if none is provided. 
Also, how about using `not provided` instead of `unknown reason`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11571: KAFKA-13496: add reason to LeaveGroupRequest
dajac commented on a change in pull request #11571: URL: https://github.com/apache/kafka/pull/11571#discussion_r771248687 ## File path: clients/src/main/resources/common/message/LeaveGroupResponse.json ## @@ -24,7 +24,9 @@ // Starting in version 3, we will make leave group request into batch mode and add group.instance.id. // // Version 4 is the first flexible version. - "validVersions": "0-4", + // + // Version 5 adds the Reason field (KIP-800). Review comment: nit: Should we rather say `Version 5 is the same as version 4.` here? ## File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ## @@ -1837,7 +1837,12 @@ private DescribeGroupsResponse createDescribeGroupResponse() { } private LeaveGroupRequest createLeaveGroupRequest(short version) { -return new LeaveGroupRequest.Builder("group1", singletonList(new MemberIdentity().setMemberId("consumer1"))) +MemberIdentity member = new MemberIdentity() +.setMemberId("consumer1"); Review comment: nit: I think that we can keep this one on the previous line. ## File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ## @@ -1837,7 +1837,12 @@ private DescribeGroupsResponse createDescribeGroupResponse() { } private LeaveGroupRequest createLeaveGroupRequest(short version) { -return new LeaveGroupRequest.Builder("group1", singletonList(new MemberIdentity().setMemberId("consumer1"))) +MemberIdentity member = new MemberIdentity() +.setMemberId("consumer1"); +if (version >= 5) { +member.setMemberId("reason: test"); Review comment: This is not correct. ## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ## @@ -617,9 +617,10 @@ class GroupCoordinator(val brokerId: Int, leavingMembers: List[MemberIdentity], responseCallback: LeaveGroupResult => Unit): Unit = { -def removeCurrentMemberFromGroup(group: GroupMetadata, memberId: String): Unit = { +def removeCurrentMemberFromGroup(group: GroupMetadata, memberId: String, reason: Option[String]): Unit = { val member = group.get(memberId) - removeMemberAndUpdateGroup(group, member, s"Removing member $memberId on LeaveGroup") + val leaveReason = reason.getOrElse("unknown reason") + removeMemberAndUpdateGroup(group, member, s"Removing member $memberId on LeaveGroup due to: $leaveReason") removeHeartbeatForLeavingMember(group, member.memberId) info(s"Member $member has left group $groupId through explicit `LeaveGroup` request") Review comment: I am also tempted to add the reason here. What do you think? 
## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ## @@ -3907,6 +3909,42 @@ public void testRemoveMembersFromGroup() throws Exception { } } +@Test +public void testRemoveMembersFromGroupReason() throws Exception { +final Cluster cluster = mockCluster(3, 0); +final Time time = new MockTime(); + +try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster)) { + +env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); +env.kafkaClient().prepareResponse(body -> { +if (!(body instanceof LeaveGroupRequest)) { +return false; +} +LeaveGroupRequestData leaveGroupRequest = ((LeaveGroupRequest) body).data(); + +return leaveGroupRequest.members().stream().allMatch(member -> member.reason().equals("testing remove members reason")); +}, new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code()).setMembers( +Arrays.asList( +new MemberResponse().setGroupInstanceId("instance-1"), +new MemberResponse().setGroupInstanceId("instance-2") +)) Review comment: nit: Indentation of those lines seems to be off here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on pull request #11600: KAFKA-12648: handle MissingSourceTopicException for named topologies
cadonna commented on pull request #11600: URL: https://github.com/apache/kafka/pull/11600#issuecomment-996549823 @ableegoldman There are checkstyle errors in the builds. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] tombentley commented on a change in pull request #11560: KAFKA-7589: Allow configuring network threads per listener
tombentley commented on a change in pull request #11560: URL: https://github.com/apache/kafka/pull/11560#discussion_r771207226 ## File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala ## @@ -920,7 +919,7 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi def immutableListenerConfigs(kafkaConfig: KafkaConfig, prefix: String): Map[String, AnyRef] = { kafkaConfig.originalsWithPrefix(prefix, true).asScala.filter { case (key, _) => // skip the reconfigurable configs - !DynamicSecurityConfigs.contains(key) && !SocketServer.ListenerReconfigurableConfigs.contains(key) + !DynamicSecurityConfigs.contains(key) && !SocketServer.ListenerReconfigurableConfigs.contains(key) && !DataPlaneAcceptor.ListenerReconfigurableConfigs.contains(key) Review comment: Oh, of course it's single threaded! Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] richard-axual commented on a change in pull request #11535: KAFKA-13476: Increase resilience timestamp decoding Kafka Streams
richard-axual commented on a change in pull request #11535: URL: https://github.com/apache/kafka/pull/11535#discussion_r771190189 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -1097,15 +1097,20 @@ long decodeTimestamp(final String encryptedString) { if (encryptedString.isEmpty()) { return RecordQueue.UNKNOWN; } -final ByteBuffer buffer = ByteBuffer.wrap(Base64.getDecoder().decode(encryptedString)); -final byte version = buffer.get(); -switch (version) { -case LATEST_MAGIC_BYTE: -return buffer.getLong(); -default: -log.warn("Unsupported offset metadata version found. Supported version {}. Found version {}.", - LATEST_MAGIC_BYTE, version); -return RecordQueue.UNKNOWN; +try { +final ByteBuffer buffer = ByteBuffer.wrap(Base64.getDecoder().decode(encryptedString)); +final byte version = buffer.get(); +switch (version) { +case LATEST_MAGIC_BYTE: +return buffer.getLong(); +default: +log.warn("Unsupported offset metadata version found. Supported version {}. Found version {}.", +LATEST_MAGIC_BYTE, version); +return RecordQueue.UNKNOWN; +} +} catch (final IllegalArgumentException argumentException) { +log.warn("Unsupported offset metadata found {}", encryptedString); Review comment: Very good point about logging the encryptedString; I've removed it from the log statement. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
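For context, the encode side this decoder pairs with writes a one-byte version followed by the 8-byte timestamp. The following is a sketch mirroring `decodeTimestamp`, not the exact Streams source.

```java
// Sketch of the matching encoder: version byte + 8-byte timestamp, Base64-encoded
// so it can travel inside the offset-commit metadata string that decodeTimestamp reads.
private String encodeTimestamp(final long partitionTime) {
    final ByteBuffer buffer = ByteBuffer.allocate(9);
    buffer.put(LATEST_MAGIC_BYTE);
    buffer.putLong(partitionTime);
    return Base64.getEncoder().encodeToString(buffer.array());
}
```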
[GitHub] [kafka] showuon commented on a change in pull request #11451: KAFKA-13419: Only reset generation ID when ILLEGAL_GENERATION error
showuon commented on a change in pull request #11451: URL: https://github.com/apache/kafka/pull/11451#discussion_r771181554 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ## @@ -705,13 +705,13 @@ protected void onJoinPrepare(int generation, String memberId) { // so that users can still access the previously owned partitions to commit offsets etc. Exception exception = null; final Set revokedPartitions; -if (generation == Generation.NO_GENERATION.generationId && +if (generation == Generation.NO_GENERATION.generationId || Review comment: Yes, I agree that it doesn't hurt to have it. Thank you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #11451: KAFKA-13419: Only reset generation ID when ILLEGAL_GENERATION error
showuon commented on a change in pull request #11451: URL: https://github.com/apache/kafka/pull/11451#discussion_r771181062 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ## @@ -755,18 +755,17 @@ protected void onJoinPrepare(int generation, String memberId) { @Override public void onLeavePrepare() { -// Save the current Generation and use that to get the memberId, as the hb thread can change it at any time +// Save the current Generation, as the hb thread can change it at any time final Generation currentGeneration = generation(); -final String memberId = currentGeneration.memberId; -log.debug("Executing onLeavePrepare with generation {} and memberId {}", currentGeneration, memberId); +log.debug("Executing onLeavePrepare with generation {}", currentGeneration); // we should reset assignment and trigger the callback before leaving group Set droppedPartitions = new HashSet<>(subscriptions.assignedPartitions()); if (subscriptions.hasAutoAssignedPartitions() && !droppedPartitions.isEmpty()) { final Exception e; -if (generation() == Generation.NO_GENERATION || rebalanceInProgress()) { +if (currentGeneration.equals(Generation.NO_GENERATION) || rebalanceInProgress()) { Review comment: David, you're right! I was focusing on fixing the `==` error here. Yes, we should be consistent with `onJoinPrepare` [here](https://github.com/apache/kafka/blob/1beb3bd5160c9cc950a541b02684f2fd53ea8da2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L708-L718), to invoke PartitionsLost when ``` generation == Generation.NO_GENERATION.generationId || memberId.equals(Generation.NO_GENERATION.memberId) ``` Otherwise, invoke PartitionsRevoked. I'll update it later. Thank you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org