[GitHub] [kafka] showuon commented on pull request #11613: MINOR: Update streamResetter option description

2021-12-17 Thread GitBox


showuon commented on pull request #11613:
URL: https://github.com/apache/kafka/pull/11613#issuecomment-997165583


   @jeqo @mjsax, please take a look. Thanks.






[GitHub] [kafka] showuon opened a new pull request #11613: MINOR: Update streamResetter option description

2021-12-17 Thread GitBox


showuon opened a new pull request #11613:
URL: https://github.com/apache/kafka/pull/11613


   In 
[KIP-171](https://cwiki.apache.org/confluence/display/KAFKA/KIP-171+-+Extend+Consumer+Group+Reset+Offset+for+Stream+Application),
 we added support for resetting offsets to a specific position, not only to 
the earliest. However, the tool's option description doesn't reflect this 
change. This PR updates it.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] showuon closed pull request #11612: MINOR: Update stream resetter option description

2021-12-17 Thread GitBox


showuon closed pull request #11612:
URL: https://github.com/apache/kafka/pull/11612


   






[GitHub] [kafka] showuon opened a new pull request #11612: MINOR: Update stream resetter option description

2021-12-17 Thread GitBox


showuon opened a new pull request #11612:
URL: https://github.com/apache/kafka/pull/11612


   In 
[KIP-171](https://cwiki.apache.org/confluence/display/KAFKA/KIP-171+-+Extend+Consumer+Group+Reset+Offset+for+Stream+Application),
 we added support for resetting offsets to a specific position, not only to 
the earliest. However, the tool's option description doesn't reflect this 
change. This PR updates it.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[jira] [Updated] (KAFKA-12399) Deprecate Log4J Appender

2021-12-17 Thread Dongjin Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjin Lee updated KAFKA-12399:

Description: As a follow-up to KAFKA-9366, we have to entirely remove 
the log4j 1.2.7 dependency from the classpath by removing dependencies on 
log4j-appender.  (was: As a follow-up to KAFKA-9366, we have to provide a 
log4j2 counterpart to log4j-appender.)

> Deprecate Log4J Appender
> 
>
> Key: KAFKA-12399
> URL: https://issues.apache.org/jira/browse/KAFKA-12399
> Project: Kafka
>  Issue Type: Improvement
>  Components: logging
>Reporter: Dongjin Lee
>Assignee: Dongjin Lee
>Priority: Major
>  Labels: needs-kip
>
> As a follow-up to KAFKA-9366, we have to entirely remove the log4j 1.2.7 
> dependency from the classpath by removing dependencies on log4j-appender.





[jira] [Updated] (KAFKA-12399) Deprecate Log4J Appender

2021-12-17 Thread Dongjin Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjin Lee updated KAFKA-12399:

Summary: Deprecate Log4J Appender  (was: Add log4j2 Appender)

> Deprecate Log4J Appender
> 
>
> Key: KAFKA-12399
> URL: https://issues.apache.org/jira/browse/KAFKA-12399
> Project: Kafka
>  Issue Type: Improvement
>  Components: logging
>Reporter: Dongjin Lee
>Assignee: Dongjin Lee
>Priority: Major
>  Labels: needs-kip
>
> As a follow-up to KAFKA-9366, we have to provide a log4j2 counterpart to 
> log4j-appender.





[GitHub] [kafka] showuon commented on pull request #11596: MINOR: bump version in kraft readme

2021-12-17 Thread GitBox


showuon commented on pull request #11596:
URL: https://github.com/apache/kafka/pull/11596#issuecomment-997159268


   @cmccabe, thanks for your comment. I've updated the PR. Please check again. 
Thank you.






[jira] [Commented] (KAFKA-13549) Add "delete interval" config

2021-12-17 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17461738#comment-17461738
 ] 

Matthias J. Sax commented on KAFKA-13549:
-

You could also just do the KIP. Should not be too controversial IMHO. – We just 
need one, because adding a new config is a public API change.

> Add "delete interval" config
> 
>
> Key: KAFKA-13549
> URL: https://issues.apache.org/jira/browse/KAFKA-13549
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
>
> Kafka Streams uses "delete record" requests to aggressively purge data from 
> repartition topics. Those requests are sent each time we commit.
> For at-least-once with a default commit interval of 30 seconds, this works 
> fine. However, for exactly-once with a default commit interval of 100ms, it's 
> very aggressive. The main issue is on the broker side: the broker logs every 
> "delete record" request, so broker logs are spammed if EOS is enabled.
> We should consider adding a new config (eg `delete.record.interval.ms` or 
> similar) as a dedicated config for "delete record" requests, to decouple it 
> from the commit interval config and allow purging data less aggressively, 
> even when the commit interval is small, avoiding the broker-side log spam.
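
A minimal sketch of what using such a config could look like, assuming the
proposed name `delete.record.interval.ms` were adopted (it is hypothetical and
does not exist in any released Kafka version; the other `StreamsConfig` keys
shown are real):

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class DeleteIntervalConfigSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-eos-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // EOS lowers the default commit interval to 100ms, which is what
        // makes the per-commit "delete record" requests so aggressive.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        // Proposed (hypothetical) config: purge repartition-topic records at
        // most every 30 seconds, decoupled from the commit interval.
        props.put("delete.record.interval.ms", "30000");
    }
}
{code}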





[GitHub] [kafka] mjsax commented on pull request #11610: KAFKA-13549: Add delete.interval.ms to Streams

2021-12-17 Thread GitBox


mjsax commented on pull request #11610:
URL: https://github.com/apache/kafka/pull/11610#issuecomment-997113605


   Thanks for the PR. Can you do a KIP for this? Happy to review the PR after 
the KIP is approved.






[jira] [Resolved] (KAFKA-13544) Deadlock during shutting down kafka broker because of connectivity problem with zookeeper

2021-12-17 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-13544.
-
Fix Version/s: 3.2.0
   Resolution: Fixed

Merged the PR to trunk.

> Deadlock during shutting down kafka broker because of connectivity problem 
> with zookeeper 
> --
>
> Key: KAFKA-13544
> URL: https://issues.apache.org/jira/browse/KAFKA-13544
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.8.1
>Reporter: Andrei Lakhmanets
>Priority: Major
> Fix For: 3.2.0
>
> Attachments: kafka_broker_logs.log, kafka_broker_stackdump.txt
>
>
> Hi team,
> *Kafka version:* 2.8.1
> *Configuration:* 3 kafka brokers in different availability zones and 3 
> zookeeper brokers in different availability zones.
> I faced a deadlock in Kafka. I've attached a stack dump of the Kafka state 
> to this ticket. The locked threads are "feature-zk-node-event-process-thread" 
> and "kafka-shutdown-hook".
> *Context:*
> My Kafka cluster had connectivity problems with ZooKeeper, and in the logs I 
> saw the following exception:
> The stacktrace:
> {code:java}
> [2021-12-06 18:31:14,629] WARN Unable to reconnect to ZooKeeper service, 
> session 0x1039563000f has expired (org.apache.zookeeper.ClientCnxn)
> [2021-12-06 18:31:14,629] INFO Unable to reconnect to ZooKeeper service, 
> session 0x1039563000f has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
> [2021-12-06 18:31:14,629] INFO EventThread shut down for session: 
> 0x1039563000f (org.apache.zookeeper.ClientCnxn)
> [2021-12-06 18:31:14,631] INFO [ZooKeeperClient Kafka server] Session 
> expired. (kafka.zookeeper.ZooKeeperClient)
> [2021-12-06 18:31:14,632] ERROR [feature-zk-node-event-process-thread]: 
> Failed to process feature ZK node change event. The broker will eventually 
> exit. 
> (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread)
> kafka.zookeeper.ZooKeeperClientExpiredException: Session expired either 
> before or while waiting for connection
>     at 
> kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:279)
>     at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$1(ZooKeeperClient.scala:261)
>     at 
> kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:261)
>     at 
> kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1797)
>     at 
> kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1767)
>     at 
> kafka.zk.KafkaZkClient.retryRequestUntilConnected(KafkaZkClient.scala:1762)
>     at kafka.zk.KafkaZkClient.getDataAndStat(KafkaZkClient.scala:771)
>     at kafka.zk.KafkaZkClient.getDataAndVersion(KafkaZkClient.scala:755)
>     at 
> kafka.server.FinalizedFeatureChangeListener$FeatureCacheUpdater.updateLatestOrThrow(FinalizedFeatureChangeListener.scala:74)
>     at 
> kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread.doWork(FinalizedFeatureChangeListener.scala:147)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) {code}
> The exception is thrown in the feature-zk-node-event-process-thread thread and 
> is caught in the 
> FinalizedFeatureChangeListener.ChangeNotificationProcessorThread.doWork method, 
> which then throws FatalExitError(1).
> The FatalExitError is caught in the ShutdownableThread.run method, which calls 
> Exit.exit(e.statusCode()), which calls System.exit under the hood.
> The stackdump of "feature-zk-node-event-process-thread" thread:
> {code:java}
> "feature-zk-node-event-process-thread" #23 prio=5 os_prio=0 cpu=163.19ms 
> elapsed=1563046.32s tid=0x7fd0dcdec800 nid=0x2088 in Object.wait()  
> [0x7fd07e2c1000]
>    java.lang.Thread.State: WAITING (on object monitor)
>     at java.lang.Object.wait(java.base@11.0.11/Native Method)
>     - waiting on 
>     at java.lang.Thread.join(java.base@11.0.11/Thread.java:1300)
>     - waiting to re-lock in wait() <0x88b9d3c8> (a 
> org.apache.kafka.common.utils.KafkaThread)
>     at java.lang.Thread.join(java.base@11.0.11/Thread.java:1375)
>     at 
> java.lang.ApplicationShutdownHooks.runHooks(java.base@11.0.11/ApplicationShutdownHooks.java:107)
>     at 
> java.lang.ApplicationShutdownHooks$1.run(java.base@11.0.11/ApplicationShutdownHooks.java:46)
>     at java.lang.Shutdown.runHooks(java.base@11.0.11/Shutdown.java:130)
>     at java.lang.Shutdown.exit(java.base@11.0.11/Shutdown.java:174)
>     - locked <0x806872f8> (a java.lang.Class for java.lang.Shutdown)
>     at java.lang.Runtime.exit(java.base@11.0.11/Runtime.java:116)
>     at java.lang.System.exit(java.base@11.0.11/System.java:1752)
>     at org.apache.kafka.common.utils.Exit$2.execute(Exit.java:43)
>     at org.
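
To illustrate the deadlock pattern described in this report, a minimal
standalone sketch (not Kafka code): a background thread calls System.exit(),
which blocks until all shutdown hooks complete, while a JVM shutdown hook
join()s that same thread, so neither can proceed.

{code:java}
public class ShutdownDeadlockSketch {
    public static void main(final String[] args) {
        final Thread worker = new Thread(() -> {
            // Fatal-error path, analogous to ShutdownableThread handling
            // FatalExitError: exits the JVM from a background thread.
            System.exit(1); // blocks until all shutdown hooks complete
        }, "feature-zk-node-event-process-thread");
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                // Analogous to kafka-shutdown-hook joining Kafka threads:
                // waits forever, because the worker is stuck inside System.exit.
                worker.join();
            } catch (final InterruptedException ignored) {
            }
        }, "kafka-shutdown-hook"));
        worker.start();
    }
}
{code}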

[jira] [Created] (KAFKA-13555) Consider number of input topic partitions for task assignment

2021-12-17 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-13555:
---

 Summary: Consider number of input topic partitions for task 
assignment
 Key: KAFKA-13555
 URL: https://issues.apache.org/jira/browse/KAFKA-13555
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


StreamsAssignor tries to distribute tasks evenly across all instances/threads 
of a Kafka Streams application. It knows about instances/threads (to give more 
capacity to instances with more threads), and it distinguishes between stateless 
and stateful tasks. We also try not to move state around, but to use a sticky 
assignment if possible. However, the assignment does not take the number of 
input topic partitions into account.

For example, an upstream task could compute two joins and thus have 3 input 
partitions, while a downstream task computes a follow-up aggregation with a 
single input partition (from the repartition topic). It could happen that one 
thread gets the 3-input-partition task assigned while the other thread gets 
the single-input-partition task assigned, resulting in an uneven partition 
assignment across both threads.
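
A toy illustration of the gap (hypothetical code, not the StreamsAssignor):
weighting tasks by their number of input partitions before assigning them to
threads avoids the skew described above.

{code:java}
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class WeightedAssignmentSketch {
    public static void main(final String[] args) {
        // task -> number of input partitions (the join task has 3, the agg task 1)
        final Map<String, Integer> taskWeights = Map.of("join-task", 3, "agg-task", 1);
        final Map<String, Integer> threadLoad = new HashMap<>(Map.of("thread-1", 0, "thread-2", 0));
        taskWeights.entrySet().stream()
            .sorted(Map.Entry.<String, Integer>comparingByValue().reversed()) // heaviest first
            .forEach(task -> {
                // always give the next task to the currently least-loaded thread
                final String thread =
                    Collections.min(threadLoad.entrySet(), Map.Entry.comparingByValue()).getKey();
                threadLoad.merge(thread, task.getValue(), Integer::sum);
                System.out.println(task.getKey() + " -> " + thread);
            });
    }
}
{code}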





[GitHub] [kafka] junrao merged pull request #11607: KAFKA-13544: fix FinalizedFeatureChangeListener deadlock

2021-12-17 Thread GitBox


junrao merged pull request #11607:
URL: https://github.com/apache/kafka/pull/11607


   






[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771749966



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {

Review comment:
   Sure.








[GitHub] [kafka] junrao merged pull request #11605: MINOR: replace lastOption call in LocalLog#flush() to prevent NoSuchElementException

2021-12-17 Thread GitBox


junrao merged pull request #11605:
URL: https://github.com/apache/kafka/pull/11605


   






[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771749813



##
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##
@@ -197,6 +197,18 @@ public R getResult() {
         return result;
     }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult<V>) this;
+        } else {
+            final QueryResult<V> result = new QueryResult<>(value);

Review comment:
   I mean, if the upper layers need `QueryResult<V>` with the proper type, and 
the inner layers need `QueryResult<byte[]>`, we should just have two properly 
typed objects, and instead of "swapping", just take the `byte[]` from 
`QueryResult<byte[]>`, deserialize it, and put the result into the 
`QueryResult<V>` object.
   
   > but this is the API we agreed on in the KIP.
   
   I did not read the KIP :D (maybe I should have). And we can always adjust 
it. To me it seems useless to have a generic type parameter if we don't obey it 
anyway and use casts. The purpose of generics is to avoid casts, and if it 
does not avoid casts, it seems pointless to have.
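
   A sketch of that alternative (hypothetical helper, not the PR's code): build a fresh, properly typed result from the raw one instead of casting across type parameters:

```java
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.streams.query.QueryResult;

final class TypedResults {
    // Deserialize the raw byte[] result into a fresh QueryResult<V>;
    // execution info and position would still need to be carried over.
    static <V> QueryResult<V> typed(final QueryResult<byte[]> raw,
                                    final Deserializer<V> deserializer,
                                    final String topic) {
        final V value = deserializer.deserialize(topic, raw.getResult());
        return QueryResult.forResult(value);
    }
}
```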








[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771733023



##
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##
@@ -197,6 +197,18 @@ public R getResult() {
         return result;
     }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult<V>) this;

Review comment:
   > I suppose we could throw an IllegalStateException, since the caller 
probably shouldn't be even trying to "swap" the result on a failed result to 
begin with, but this seems fine, too.
   
   If there is no strict reason not to throw an `IllegalStateException`, I 
would strongly advocate throwing. It not only guards against potential bugs, 
but also expresses the semantics for developers (i.e., us) much more clearly 
and makes the code easier to read and reason about.
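
   A sketch of the throwing variant (based on the diff above; the copy-over of execution info and position is elided in the archived diff):

```java
@SuppressWarnings("unchecked")
public <V> QueryResult<V> swapResult(final V value) {
    if (isFailure()) {
        // fail fast: swapping the result of a failed query is a caller bug
        throw new IllegalStateException("Cannot swap the result of a failed query");
    }
    final QueryResult<V> result = new QueryResult<>(value);
    // ... copy execution info and position from this result, as in the PR ...
    return result;
}
```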









[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771746009



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -267,17 +281,41 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return result;
     }
 
+
     @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query<R> query,
+                                           final PositionBound positionBound,
+                                           final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery =
+            KeyQuery.withKey(keyBytes(typedKeyQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final Deserializer<V> deserializer = getValueDeserializer();
+            final V value = deserializer.deserialize(serdes.topic(), rawResult.getResult());
+            final QueryResult<V> typedQueryResult = rawResult.swapResult(value);
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
     private Deserializer<V> getValueDeserializer() {
-        final Serde<V> vSerde = serdes.valueSerde();
+        final Serde<V> valueSerde = serdes.valueSerde();
         final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());
         final Deserializer<V> deserializer;
-        if (!timestamped && vSerde instanceof ValueAndTimestampSerde) {
+        if (!timestamped && valueSerde instanceof ValueAndTimestampSerde) {

Review comment:
   I know it's weird, but it is correct. I would like to revisit it, but I 
think we really need to do that after the current round of queries are 
implemented.








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771745563



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##
@@ -677,18 +638,56 @@ public void shouldHandlePingQuery() {
         }
     }
 
+    public <V> void shouldHandleKeyQuery(
+        final Integer key,
+        final Function<V, Integer> valueExtactor,
+        final Integer expectedValue) {
+
+        final KeyQuery<Integer, V> query = KeyQuery.withKey(key);
+        final StateQueryRequest<V> request =
+            inStore(STORE_NAME)
+                .withQuery(query)
+                .withPartitions(mkSet(0, 1))
+                .withPositionBound(PositionBound.at(INPUT_POSITION));
+
+        final StateQueryResult<V> result =
+            IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+
+        final QueryResult<V> queryResult =
+            result.getGlobalResult() != null

Review comment:
   This line was written before I scratched global store support from the 
current scope. I'll drop the check from this test for now.








[jira] [Commented] (KAFKA-9234) Consider using @Nullable and @Nonnull annotations

2021-12-17 Thread Lars Bodewig (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17461722#comment-17461722
 ] 

Lars Bodewig commented on KAFKA-9234:
-

Are contributions to this issue still welcome? As far as I can see, the linked 
PR was closed after months without activity.

> Consider using @Nullable and @Nonnull annotations
> -
>
> Key: KAFKA-9234
> URL: https://issues.apache.org/jira/browse/KAFKA-9234
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin, clients, consumer, KafkaConnect, producer , 
> streams, streams-test-utils
>Reporter: Matthias J. Sax
>Assignee: Manasvi Gupta
>Priority: Minor
>  Labels: beginner, newbie
>
> Java 7 was dropped some time ago, and we might want to consider using Java 8 
> `@Nullable` and `@Nonnull` annotations for all public-facing APIs instead of 
> documenting nullability in JavaDocs only.
> This ticket should be broken down into a series of smaller PRs to keep the 
> scope of each PR contained, allowing for more effective reviews.
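
A minimal illustration of the idea (hypothetical, not actual Kafka code; the
`javax.annotation` annotations come from the JSR-305 jar rather than Java 8
itself):

{code:java}
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public interface ValueStore {

    /**
     * The nullability contract is visible in the signature,
     * not only in the JavaDoc.
     *
     * @return the value for the key, or {@code null} if absent
     */
    @Nullable
    String get(@Nonnull String key);
}
{code}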





[GitHub] [kafka] guozhangwang commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


guozhangwang commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771744164



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##
@@ -162,15 +163,39 @@ public static boolean isPermitted(
             }
             final R result = (R) iterator;
             return QueryResult.forResult(result);
-        } catch (final Throwable t) {
-            final String message = parseStoreException(t, store, query);
+        } catch (final Exception e) {
+            final String message = parseStoreException(e, store, query);
             return QueryResult.forFailure(
                 FailureReason.STORE_EXCEPTION,
                 message
             );
         }
     }
 
+    @SuppressWarnings("unchecked")
+    private static <R> QueryResult<R> runKeyQuery(final Query<R> query,
+                                                  final PositionBound positionBound,
+                                                  final boolean collectExecutionInfo,
+                                                  final StateStore store) {
+        if (store instanceof KeyValueStore) {
+            final KeyQuery<Bytes, byte[]> rawKeyQuery = (KeyQuery<Bytes, byte[]>) query;
+            final KeyValueStore<Bytes, byte[]> keyValueStore =
+                (KeyValueStore<Bytes, byte[]>) store;
+            try {
+                final byte[] bytes = keyValueStore.get(rawKeyQuery.getKey());
+                return (QueryResult<R>) QueryResult.forResult(bytes);

Review comment:
   @vvcephei but in this PR at least: should we use swap here as well?








[GitHub] [kafka] guozhangwang merged pull request #11451: KAFKA-13419: Only reset generation ID when ILLEGAL_GENERATION error

2021-12-17 Thread GitBox


guozhangwang merged pull request #11451:
URL: https://github.com/apache/kafka/pull/11451


   






[GitHub] [kafka] guozhangwang commented on a change in pull request #11451: KAFKA-13419: Only reset generation ID when ILLEGAL_GENERATION error

2021-12-17 Thread GitBox


guozhangwang commented on a change in pull request #11451:
URL: https://github.com/apache/kafka/pull/11451#discussion_r771742433



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -705,13 +705,13 @@ protected void onJoinPrepare(int generation, String memberId) {
         // so that users can still access the previously owned partitions to commit offsets etc.
         Exception exception = null;
         final Set<TopicPartition> revokedPartitions;
-        if (generation == Generation.NO_GENERATION.generationId &&
+        if (generation == Generation.NO_GENERATION.generationId ||

Review comment:
   Thanks @dajac , I agree with that as well.








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771742317



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##
@@ -162,15 +163,39 @@ public static boolean isPermitted(
             }
             final R result = (R) iterator;
             return QueryResult.forResult(result);
-        } catch (final Throwable t) {
-            final String message = parseStoreException(t, store, query);
+        } catch (final Exception e) {
+            final String message = parseStoreException(e, store, query);
             return QueryResult.forFailure(
                 FailureReason.STORE_EXCEPTION,
                 message
             );
         }
     }
 
+    @SuppressWarnings("unchecked")
+    private static <R> QueryResult<R> runKeyQuery(final Query<R> query,
+                                                  final PositionBound positionBound,
+                                                  final boolean collectExecutionInfo,
+                                                  final StateStore store) {
+        if (store instanceof KeyValueStore) {
+            final KeyQuery<Bytes, byte[]> rawKeyQuery = (KeyQuery<Bytes, byte[]>) query;
+            final KeyValueStore<Bytes, byte[]> keyValueStore =
+                (KeyValueStore<Bytes, byte[]>) store;
+            try {
+                final byte[] bytes = keyValueStore.get(rawKeyQuery.getKey());
+                return (QueryResult<R>) QueryResult.forResult(bytes);

Review comment:
   Thanks; let's keep that in mind as we tackle some of the API refactor 
tasks we've queued up. We started with the RawXQuery approach, then dropped it. 
Before we add it back, I think we'd better have a representative set of queries 
and also bear in mind all the other sharp edges we'd like to smooth over before 
release.








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771741301



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -253,12 +262,17 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
             rawRangeQuery = RangeQuery.withNoBounds();
         }
         final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
-                wrapped().query(rawRangeQuery, positionBound, collectExecutionInfo);
+            wrapped().query(rawRangeQuery, positionBound, collectExecutionInfo);
         if (rawResult.isSuccess()) {
             final KeyValueIterator<Bytes, byte[]> iterator = rawResult.getResult();
             final KeyValueIterator<K, V> resultIterator = new MeteredKeyValueTimestampedIterator(
-                iterator, getSensor, getValueDeserializer());
-            final QueryResult<KeyValueIterator<K, V>> typedQueryResult = QueryResult.forResult(resultIterator);
+                iterator,
+                getSensor,
+                getValueDeserializer()
+            );
+            final QueryResult<KeyValueIterator<K, V>> typedQueryResult = rawResult.swapResult(
+                resultIterator

Review comment:
   Probably autoformatted because the line was too long.








[jira] [Created] (KAFKA-13554) Rename RangeQuery to KeyRangeQuery

2021-12-17 Thread John Roesler (Jira)
John Roesler created KAFKA-13554:


 Summary: Rename RangeQuery to KeyRangeQuery
 Key: KAFKA-13554
 URL: https://issues.apache.org/jira/browse/KAFKA-13554
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler


Just to avoid confusion wrt WindowRangeQuery





[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771741132



##
File path: streams/src/main/java/org/apache/kafka/streams/query/RangeQuery.java
##
@@ -34,7 +34,7 @@
  *
  */
 @Evolving
-public class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {
+public final class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {

Review comment:
   https://issues.apache.org/jira/browse/KAFKA-13554








[jira] [Commented] (KAFKA-13077) Replication failing after unclean shutdown of ZK and all brokers

2021-12-17 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17461720#comment-17461720
 ] 

Jun Rao commented on KAFKA-13077:
-

[~shivakumar]: So, it seems the issue is unrelated to file corruption. Did you 
just restart ZK, but not the Kafka brokers? Typically, a rolling restart of ZK 
shouldn't affect the brokers.

> Replication failing after unclean shutdown of ZK and all brokers
> 
>
> Key: KAFKA-13077
> URL: https://issues.apache.org/jira/browse/KAFKA-13077
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0
>Reporter: Christopher Auston
>Priority: Minor
>
> I am submitting this in the spirit of what can go wrong when an operator 
> violates the constraints Kafka depends on. I don't know if Kafka could or 
> should handle this more gracefully. I decided to file this issue because it 
> was easy to get the problem I'm reporting with Kubernetes StatefulSets (STS). 
> By "easy" I mean that I did not go out of my way to corrupt anything, I just 
> was not careful when restarting ZK and brokers.
> I violated the constraints of keeping Zookeeper stable and at least one 
> running in-sync replica. 
> I am running the bitnami/kafka helm chart on Amazon EKS.
> {quote}% kubectl get po kaf-kafka-0 -ojson |jq .spec.containers'[].image'
> "docker.io/bitnami/kafka:2.8.0-debian-10-r43"
> {quote}
> I started with 3 ZK instances and 3 brokers (both STS). I changed the 
> cpu/memory requests on both STS and kubernetes proceeded to restart ZK and 
> kafka instances at the same time. If I recall correctly there were some 
> crashes and several restarts but eventually all the instances were running 
> again. It's possible all ZK nodes and all brokers were unavailable at various 
> points.
> The problem I noticed was that two of the brokers were just continually 
> spitting out messages like:
> {quote}% kubectl logs kaf-kafka-0 --tail 10
> [2021-07-13 14:26:08,871] INFO [ProducerStateManager 
> partition=__transaction_state-0] Loading producer state from snapshot file 
> 'SnapshotFile(/bitnami/kafka/data/__transaction_state-0/0001.snapshot,1)'
>  (kafka.log.ProducerStateManager)
> [2021-07-13 14:26:08,871] WARN [Log partition=__transaction_state-0, 
> dir=/bitnami/kafka/data] *Non-monotonic update of high watermark from 
> (offset=2744 segment=[0:1048644]) to (offset=1 segment=[0:169])* 
> (kafka.log.Log)
> [2021-07-13 14:26:08,874] INFO [Log partition=__transaction_state-10, 
> dir=/bitnami/kafka/data] Truncating to offset 2 (kafka.log.Log)
> [2021-07-13 14:26:08,877] INFO [Log partition=__transaction_state-10, 
> dir=/bitnami/kafka/data] Loading producer state till offset 2 with message 
> format version 2 (kafka.log.Log)
> [2021-07-13 14:26:08,877] INFO [ProducerStateManager 
> partition=__transaction_state-10] Loading producer state from snapshot file 
> 'SnapshotFile(/bitnami/kafka/data/__transaction_state-10/0002.snapshot,2)'
>  (kafka.log.ProducerStateManager)
> [2021-07-13 14:26:08,877] WARN [Log partition=__transaction_state-10, 
> dir=/bitnami/kafka/data] Non-monotonic update of high watermark from 
> (offset=2930 segment=[0:1048717]) to (offset=2 segment=[0:338]) 
> (kafka.log.Log)
> [2021-07-13 14:26:08,880] INFO [Log partition=__transaction_state-20, 
> dir=/bitnami/kafka/data] Truncating to offset 1 (kafka.log.Log)
> [2021-07-13 14:26:08,882] INFO [Log partition=__transaction_state-20, 
> dir=/bitnami/kafka/data] Loading producer state till offset 1 with message 
> format version 2 (kafka.log.Log)
> [2021-07-13 14:26:08,882] INFO [ProducerStateManager 
> partition=__transaction_state-20] Loading producer state from snapshot file 
> 'SnapshotFile(/bitnami/kafka/data/__transaction_state-20/0001.snapshot,1)'
>  (kafka.log.ProducerStateManager)
> [2021-07-13 14:26:08,883] WARN [Log partition=__transaction_state-20, 
> dir=/bitnami/kafka/data] Non-monotonic update of high watermark from 
> (offset=2956 segment=[0:1048608]) to (offset=1 segment=[0:169]) 
> (kafka.log.Log)
> {quote}
> If I describe that topic I can see that several partitions have a leader of 2 
> and the ISR is just 2 (NOTE I added two more brokers and tried to reassign 
> the topic onto brokers 2,3,4 which you can see below). The new brokers also 
> spit out the messages about "non-monotonic update" just like the original 
> followers. This describe output is from the following day.
> {{% kafka-topics.sh ${=BS} -topic __transaction_state -describe}}
> {{Topic: __transaction_state TopicId: i7bBNCeuQMWl-ZMpzrnMAw PartitionCount: 
> 50 ReplicationFactor: 3 Configs: 
> compression.type=uncompressed,min.insync.replicas=3,cleanup.policy=compact,flush.ms=1000,segment.bytes=104857600,flush.messages=1,max.message.bytes=112,unclean.leader.el

[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771739597



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query<R> query,
+        final PositionBound positionBound, final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());
+            final Serde<V> vSerde = serdes.valueSerde();
+            final Deserializer<V> deserializer;
+            if (!timestamped && vSerde instanceof ValueAndTimestampSerde) {
+                final ValueAndTimestampDeserializer valueAndTimestampDeserializer =
+                    (ValueAndTimestampDeserializer) ((ValueAndTimestampSerde) vSerde).deserializer();
+                deserializer = (Deserializer<V>) valueAndTimestampDeserializer.valueDeserializer;
+            } else {
+                deserializer = vSerde.deserializer();
+            }

Review comment:
   Sorry, I missed this thread before. I think these points are discussed 
on other threads in this PR, though. Tl;dr: I think we should aim to clean this 
up in https://issues.apache.org/jira/browse/KAFKA-13526
   
   For now, I believe this logic is correct. However, it's good that you 
pointed out we're only testing all _dsl_ store combinations. I filed 
https://issues.apache.org/jira/browse/KAFKA-13553 to extend the IT to also test 
all _papi_ store combinations.








[jira] [Created] (KAFKA-13553) Add PAPI stores to IQv2StoreIntegrationTest

2021-12-17 Thread John Roesler (Jira)
John Roesler created KAFKA-13553:


 Summary: Add PAPI stores to IQv2StoreIntegrationTest
 Key: KAFKA-13553
 URL: https://issues.apache.org/jira/browse/KAFKA-13553
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler


Right now, we only test stores registered via the DSL. To be truly 
comprehensive, we must also test stores registered via the PAPI.





[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771738427



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##
@@ -42,16 +102,21 @@ private StoreQueryUtils() {
         final int partition
     ) {
 
-        final QueryResult<R> result;
         final long start = collectExecutionInfo ? System.nanoTime() : -1L;
-        if (query instanceof PingQuery) {
-            if (!isPermitted(position, positionBound, partition)) {
-                result = QueryResult.notUpToBound(position, positionBound, partition);
-            } else {
-                result = (QueryResult<R>) QueryResult.forResult(true);
-            }
-        } else {
+        final QueryResult<R> result;
+
+        final QueryHandler handler = QUERY_HANDLER_MAP.get(query.getClass());

Review comment:
   🤔 








[GitHub] [kafka] guozhangwang commented on a change in pull request #11609: KAFKA-12648: fixes for query APIs and blocking calls

2021-12-17 Thread GitBox


guozhangwang commented on a change in pull request #11609:
URL: https://github.com/apache/kafka/pull/11609#discussion_r771737051



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##
@@ -349,8 +351,10 @@ private void rebuildMetadata(final Map<HostInfo, Set<TopicPartition>> activePart
         final Map<String, Collection<String>> namedTopologyToStoreName = new HashMap<>();
         final Set<String> topologyNames = topologyMetadata.namedTopologiesView();
         topologyNames.forEach(topologyName -> {
-            final Collection<String> storesOnHostForTopologyName = getStoresOnHost(storeToSourceTopics, activePartitionHostMap.get(hostInfo), topologyName);
-            storesOnHostForTopologyName.addAll(getStoresOnHost(storeToSourceTopics, standbyPartitionHostMap.get(hostInfo), topologyName));
+            final Map<String, List<String>> topologyStoresToSourceTopics =

Review comment:
   What's the difference between `topologyStoresToSourceTopics` and 
`storeToSourceTopics` here?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##
@@ -296,9 +297,10 @@ private boolean hasPartitionsForAnyTopics(final List<String> topicNames, final S
     }
 
     private Set<String> getStoresOnHost(final Map<String, List<String>> storeToSourceTopics,
-                                        final Set<TopicPartition> sourceTopicPartitions, final String topologyName) {
+                                        final Set<TopicPartition> sourceTopicPartitions,
+                                        final String topologyName) {
         final InternalTopologyBuilder builder = topologyMetadata.lookupBuilderForNamedTopology(topologyName);
-        final Set<String> sourceTopicNames = builder.sourceTopicNames();
+        final Collection<String> sourceTopicNames = builder.sourceTopicCollection();

Review comment:
   Do we want the raw topic names (without the prefix) or the decorated 
ones here? 
   
   BTW, the function/variable names are a bit confusing, but they store 
different things. Maybe we should just rename them to be clearer.








[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771738126



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##
@@ -16,18 +16,78 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.Map;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
 public final class StoreQueryUtils {
 
+    /**
+     * a utility interface to facilitate stores' query dispatch logic,
+     * allowing them to generically store query execution logic as the values
+     * in a map.
+     */
+    @FunctionalInterface
+    public interface QueryHandler {
+        QueryResult<?> apply(
+            final Query<?> query,
+            final PositionBound positionBound,
+            final boolean collectExecutionInfo,
+            final StateStore store
+        );
+    }
+
+
+    @SuppressWarnings("unchecked")
+    private static final Map<Class<?>, QueryHandler> QUERY_HANDLER_MAP =

Review comment:
   Not sure if I fully understand, but might be less important.
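
   For reference, a sketch of how such a dispatch map can be populated with `mkMap`/`mkEntry` (names follow the diff above; the handler bodies are illustrative):

```java
private static final Map<Class<?>, QueryHandler> QUERY_HANDLER_MAP =
    mkMap(
        mkEntry(
            PingQuery.class,
            (query, positionBound, collectExecutionInfo, store) ->
                QueryResult.forResult(true)
        ),
        mkEntry(KeyQuery.class, StoreQueryUtils::runKeyQuery)
    );
```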








[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771737792



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##
@@ -16,18 +16,78 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.Map;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
 public final class StoreQueryUtils {
 
+/**
+ * a utility interface to facilitate stores' query dispatch logic,
+ * allowing them to generically store query execution logic as the values
+ * in a map.
+ */
+@FunctionalInterface

Review comment:
   I see. So we should add it elsewhere, too (of course not as part of the 
IQ work).








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771737653



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {

Review comment:
   I'm getting the impression that you're not a huge fan of the phrasing of 
these messages. :) Can we tackle this question in a follow-on fashion?








[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771737576



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query<R> query,
+        final PositionBound positionBound, final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());

Review comment:
   > Also, because we always wrap the non-timestamped store with the 
KeyValueToTimestampedKeyValueByteStoreAdapter, we also always pass through the 
MeteredTimestampedKeyValue store whether the inner store is really timestamped 
or not.
   
   I don't think so. We only do this in the DSL, but not the PAPI.








[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771737277



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query<R> query,
+        final PositionBound positionBound, final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());
+            final Serde<V> vSerde = serdes.valueSerde();
+            final Deserializer<V> deserializer;
+            if (!timestamped && vSerde instanceof ValueAndTimestampSerde) {

Review comment:
   > The MeteredStore's serde is always a ValueAndTimestamp serde 
regardless of whether the inner store is Timestamped or not. 
   
   Is it? (1) We also have `MeteredTimestampStore` (of course is extends 
`MeteredStore`) but it seems better to split the logic and move everything 
timestamp related into `MeteredTimestampStore`. (2) For PAPI users, they can 
add a plain `KeyValueStore` and we won't wrap it with the `TimestampedStore` 
face and the serdes won't be `ValueAndTimestamp` either.
   
   > What we do is, when you have a non-timestamped store, we wrap it with an 
extra layer 
(org.apache.kafka.streams.state.internals.KeyValueToTimestampedKeyValueByteStoreAdapter)
 that pads the returned values with a fake timestamp 
(org.apache.kafka.streams.state.TimestampedBytesStore#convertToTimestampedFormat
   
   We only do this in the DSL, if the user gives as a non-timestamped store via 
`Materialized` -- but for PAPI users, we never do this but use whatever store 
is given to use as-is.
   
   > so we did not implement the same padding logic for non-timestamped data 
and instead just bubble up to the MeteredStore
   
   Not sure I can follow? It should not be a concern for IQ? Also, the 
current conversion between plain/timestamped is really just a corner case (and 
a case that we want to deprecate anyway -- we just did not find a way to do so 
-- maybe we should add a runtime check at some point and WARN users if they 
provide a non-timestamped store, until we remove support for it and throw an 
exception instead...). It seems not worth adding more tech debt for behavior 
that we only added to avoid breaking stuff.
   
   > Which means that if we want to deserialize it, we need to know whether to 
use the ValueAndTimestamp deserializer or just the Value's deserializer.
   
   Yes, but we should split this logic between the plain `MeteredStore` and the 
`MeteredTimestampStore`.
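
   For illustration, one hypothetical shape of that split (a sketch only; `deserializerForQuery` is an invented hook, not code from this PR):

       // In the plain MeteredKeyValueStore (sketch): plain values only.
       protected Deserializer<V> deserializerForQuery() {
           return serdes.valueSerde().deserializer();
       }

       // In MeteredTimestampStore (sketch): the timestamp-related checks live
       // here, so the base class never needs the instanceof ValueAndTimestampSerde test.
       @Override
       protected Deserializer<V> deserializerForQuery() {
           if (!WrappedStateStore.isTimestamped(wrapped())) {
               return plainValueDeserializer; // hypothetical field for the un-padded format
           }
           return serdes.valueSerde().deserializer();
       }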








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771736938



##
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##
@@ -197,6 +197,18 @@ public R getResult() {
         return result;
     }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult<V>) this;
+        } else {
+            final QueryResult<V> result = new QueryResult<>(value);

Review comment:
   Thanks, @mjsax , I'm not sure precisely what you mean. This does create 
a new object. If you think it would be clearer to add a constructor allowing 
people to set the result along with a pre-populated executionInfo and position 
instead, we could, but this is the API we agreed on in the KIP.
   
   I want this new API to have good ergonomics, so I do want to consider these, 
but I don't think we need to hold up the KeyQuery PR on it.








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771735849



##
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##
@@ -197,6 +197,18 @@ public R getResult() {
         return result;
     }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult<V>) this;
+        } else {
+            final QueryResult<V> result = new QueryResult<>(value);

Review comment:
   Thanks, @guozhangwang , I think something like that will be the outcome 
of this follow-on work: https://issues.apache.org/jira/browse/KAFKA-13526
   
   We'll tackle that question before the first release of this new API.








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771735440



##
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##
@@ -197,6 +197,18 @@ public R getResult() {
         return result;
     }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult<V>) this;

Review comment:
   sounds good!








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771735231



##
File path: streams/src/main/java/org/apache/kafka/streams/query/RangeQuery.java
##
@@ -34,7 +34,7 @@
  * 
  */
 @Evolving
-public class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {
+public final class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {

Review comment:
   Good point. We can rename it to KeyRangeQuery in a follow-on PR. I'll 
file a Jira.








[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771734218



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {

Review comment:
   We can do this, but wouldn't it be better to say "passed down" rather than 
"handled" in the message?








[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771733605



##
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##
@@ -197,6 +197,18 @@ public R getResult() {
         return result;
     }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult<V>) this;
+        } else {
+            final QueryResult<V> result = new QueryResult<>(value);

Review comment:
   Why not use two objects?








[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771733023



##
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##
@@ -197,6 +197,18 @@ public R getResult() {
         return result;
     }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult<V>) this;

Review comment:
   > I suppose we could throw an IllegalStateException, since the caller 
probably shouldn't be even trying to "swap" the result on a failed result to 
begin with, but this seems fine, too.
   
   If there is no strict reason not to throw an `IllegalStateException`, I 
would strongly advocate throwing. It not only guards against potential bugs, but 
also expresses the semantics much more clearly for developers (i.e., us) and 
makes the code easier to read/reason about.
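
   For illustration, a minimal sketch of the throwing variant (the field copying mirrors the quoted code; the exact fields and message are assumptions, not the actual patch):

       @SuppressWarnings("unchecked")
       public <V> QueryResult<V> swapResult(final V value) {
           if (isFailure()) {
               // Fail fast: swapping the result of a failed query is a caller bug.
               throw new IllegalStateException("Cannot swap the result of a failed query");
           } else {
               final QueryResult<V> result = new QueryResult<>(value);
               result.executionInfo = executionInfo; // carry over collected diagnostics
               result.position = position;           // and the recorded position
               return result;
           }
       }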








[jira] [Updated] (KAFKA-12495) Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor

2021-12-17 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-12495:
--
Component/s: KafkaConnect

> Unbalanced connectors/tasks distribution will happen in Connect's incremental 
> cooperative assignor
> --
>
> Key: KAFKA-12495
> URL: https://issues.apache.org/jira/browse/KAFKA-12495
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.2.0
>
> Attachments: image-2021-03-18-15-04-57-854.png, 
> image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png
>
>
> In Kafka Connect, we implement incremental cooperative rebalance algorithm 
> based on KIP-415 
> ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect)|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect].
> However, we have a bad assumption in the algorithm implementation, which is: 
> after the revoking rebalance completes, the member (worker) count will be the 
> same as in the previous round of rebalance.
>  
> Let's take a look at the example in the KIP-415:
> !image-2021-03-18-15-07-27-103.png|width=441,height=556!
> It works well for most cases. But what if W4 added after 1st rebalance 
> completed and before 2nd rebalance started? Let's see what will happened? 
> Let's see this example: (we'll use 10 tasks here):
>  
> {code:java}
> Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, 
> BT2, BT4, BT4, BT5])
> Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, 
> BT5
> W1 is current leader
> W2 joins with assignment: []
> Rebalance is triggered
> W3 joins while rebalance is still active with assignment: []
> W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, 
> BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, 
> BT2, BT4, BT4, BT5])
> W2(delay: 0, assigned: [], revoked: [])
> W3(delay: 0, assigned: [], revoked: [])
> W1 stops revoked resources
> W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
> Rebalance is triggered
> W2 joins with assignment: []
> W3 joins with assignment: []
> // one more member joined
> W4 joins with assignment: []
> W1 becomes leader
> W1 computes and sends assignments:
> // We assigned all the previous revoked Connectors/Tasks to the new member, 
> but we didn't revoke any more C/T in this round, which cause unbalanced 
> distribution
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [])
> W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
> W3(delay: 0, assigned: [BT1, BT2, BT4], revoked: [])
> W4(delay: 0, assigned: [BT4, BT5], revoked: [])
> {code}
> Because we don't allow consecutive revocations in two consecutive 
> rebalances (under the same leader), we end up with this uneven distribution 
> in this situation. We should allow a consecutive rebalance to have another 
> round of revocation that moves C/T to the other members in this case (see the 
> small sketch appended after the expected trace below).
> expected:
> {code:java}
> Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, 
> BT2, BT4, BT4, BT5])
> Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, 
> BT5
> W1 is current leader
> W2 joins with assignment: []
> Rebalance is triggered
> W3 joins while rebalance is still active with assignment: []
> W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, 
> BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, 
> BT2, BT4, BT4, BT5])
> W2(delay: 0, assigned: [], revoked: [])
> W3(delay: 0, assigned: [], revoked: [])
> W1 stops revoked resources
> W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
> Rebalance is triggered
> W2 joins with assignment: []
> W3 joins with assignment: []
> // one more member joined
> W4 joins with assignment: []
> W1 becomes leader
> W1 computes and sends assignments:
> // We assigned all the previous revoked Connectors/Tasks to the new member, 
> **and also revoke some C/T** 
> W1(delay: 0, assigned: [AC0, AT1, AT2], revoked: [AT3])
> W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
> W3(delay: 0, assigned: [BT1, BT2, BT4], revoked: [])
> W4(delay: 0, assigned: [BT4, BT5], revoked: [])
> // another round of rebalance to assign the new revoked C/T to the other 
> members
> W1 rejoins with assignment: [AC0, AT1, AT2] 
> Rebalance is triggered 
> W2 joins with assignment: [AT4, AT5, BC0] 
> W3 joins with assignment: [BT1, BT2, BT4]
> W4 joins with assignment: [BT4, BT5]
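> 
> // For illustration (not part of the original report): the balanced target per
> // worker is ceil(total / memberCount); a hypothetical helper the leader could
> // use to compute the surplus to revoke from an over-loaded worker:
> static int toRevoke(final int owned, final int totalResources, final int memberCount) {
>     final int ceiling = (totalResources + memberCount - 1) / memberCount;
>     // e.g. toRevoke(4, 12, 4) == 1, matching W1 revoking AT3 above
>     return Math.max(0, owned - ceiling);
> }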

[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r770076908



##
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##
@@ -197,6 +197,18 @@ public R getResult() {
         return result;
     }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult<V>) this;

Review comment:
   Why do we not allow to swap the result if the current result has a 
failure? And if we don't want to allow swapping, why just return `this` but not 
throw an exception?








[GitHub] [kafka] guozhangwang commented on pull request #11609: KAFKA-12648: fixes for query APIs and blocking calls

2021-12-17 Thread GitBox


guozhangwang commented on pull request #11609:
URL: https://github.com/apache/kafka/pull/11609#issuecomment-997076409


   @ableegoldman some related tests are failing, I will go ahead and review the 
code still.






[GitHub] [kafka] guozhangwang commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


guozhangwang commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771731855



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -267,17 +281,41 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return result;
     }
 
+
     @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query<R> query,
+                                           final PositionBound positionBound,
+                                           final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery =
+            KeyQuery.withKey(keyBytes(typedKeyQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final Deserializer<V> deserializer = getValueDeserializer();
+            final V value = deserializer.deserialize(serdes.topic(), rawResult.getResult());
+            final QueryResult<V> typedQueryResult =
+                rawResult.swapResult(value);
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
     private Deserializer<V> getValueDeserializer() {
-        final Serde<V> vSerde = serdes.valueSerde();
+        final Serde<V> valueSerde = serdes.valueSerde();
         final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());
         final Deserializer<V> deserializer;
-        if (!timestamped && vSerde instanceof ValueAndTimestampSerde) {
+        if (!timestamped && valueSerde instanceof ValueAndTimestampSerde) {

Review comment:
   This is the part that I'm not completely sure about either... maybe some 
quick sync on this would be more effective?








[GitHub] [kafka] guozhangwang commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


guozhangwang commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771726705



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -253,12 +262,17 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
             rawRangeQuery = RangeQuery.withNoBounds();
         }
         final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
-            wrapped().query(rawRangeQuery, positionBound, collectExecutionInfo);
+                wrapped().query(rawRangeQuery, positionBound, collectExecutionInfo);
         if (rawResult.isSuccess()) {
             final KeyValueIterator<Bytes, byte[]> iterator = rawResult.getResult();
             final KeyValueIterator<K, V> resultIterator = new MeteredKeyValueTimestampedIterator(
-                iterator, getSensor, getValueDeserializer());
-            final QueryResult<KeyValueIterator<K, V>> typedQueryResult = QueryResult.forResult(resultIterator);
+                iterator,
+                getSensor,
+                getValueDeserializer()
+            );
+            final QueryResult<KeyValueIterator<K, V>> typedQueryResult = rawResult.swapResult(
+                resultIterator

Review comment:
   nit: why newline with just one parameter?

##
File path: streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+/**
+ * Interactive query for retrieving a single record based on its key.
+ */
+@Evolving
+public final class KeyQuery<K, V> implements Query<V> {
+
+    private final K key;
+
+    private KeyQuery(final K key) {
+        this.key = key;
+    }
+
+    /**
+     * Creates a query that will retrieve the record identified by {@code key} if it exists
+     * (or {@code null} otherwise).
+     * @param key The key to retrieve
+     * @param <K> The type of the key
+     * @param <V> The type of the value that will be retrieved
+     */
+    public static <K, V> KeyQuery<K, V> withKey(final K key) {
+        return new KeyQuery<>(key);

Review comment:
   Should we check that the `key` is not null here? In later callers, e.g. 
`final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedKeyQuery.getKey()));`, 
we do not check whether `getKey()` is null, and the `keyBytes` function could 
throw if it is.
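
   For illustration, a minimal sketch of such a guard (using `java.util.Objects.requireNonNull`; the message text is an assumption):

       public static <K, V> KeyQuery<K, V> withKey(final K key) {
           // Reject null keys up front, instead of letting a serializer throw a
           // less obvious NullPointerException further down the call chain.
           Objects.requireNonNull(key, "key cannot be null");
           return new KeyQuery<>(key);
       }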

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##
@@ -162,15 +163,39 @@ public static boolean isPermitted(
             }
             final R result = (R) iterator;
             return QueryResult.forResult(result);
-        } catch (final Throwable t) {
-            final String message = parseStoreException(t, store, query);
+        } catch (final Exception e) {
+            final String message = parseStoreException(e, store, query);
             return QueryResult.forFailure(
                 FailureReason.STORE_EXCEPTION,
                 message
             );
         }
     }
 
+    @SuppressWarnings("unchecked")
+    private static <R> QueryResult<R> runKeyQuery(final Query<R> query,
+                                                  final PositionBound positionBound,
+                                                  final boolean collectExecutionInfo,
+                                                  final StateStore store) {
+        if (store instanceof KeyValueStore) {
+            final KeyQuery<Bytes, byte[]> rawKeyQuery = (KeyQuery<Bytes, byte[]>) query;
+            final KeyValueStore<Bytes, byte[]> keyValueStore =
+                (KeyValueStore<Bytes, byte[]>) store;
+            try {
+                final byte[] bytes = keyValueStore.get(rawKeyQuery.getKey());
+                return (QueryResult<R>) QueryResult.forResult(bytes);

Review comment:
   Should we use `swap` here as well?
   
   Also, I'm feeling maybe we can introduce an internal class extending on 
`KeyQuery` and only define the `swap` in that class (see my other 
comment above).

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##
@@ -162,15 +163,39 @@ public static boolean isPermitted(
 }

[GitHub] [kafka] cmccabe commented on a change in pull request #11596: MINOR: bump version in kraft readme

2021-12-17 Thread GitBox


cmccabe commented on a change in pull request #11596:
URL: https://github.com/apache/kafka/pull/11596#discussion_r771719219



##
File path: config/kraft/README.md
##
@@ -14,8 +14,8 @@ Most important of all, KRaft mode is more scalable.  We 
expect to be able to [su
 # Quickstart
 
 ## Warning
-KRaft mode in Kafka 3.0 is provided for testing only, *NOT* for production.  
We do not yet support upgrading existing ZooKeeper-based Kafka clusters into 
this mode.  In fact, when Kafka 3.1 is released,
-it may not be possible to upgrade your KRaft clusters from 3.0 to 3.1.  There 
may be bugs, including serious ones.  You should *assume that your data could 
be lost at any time* if you try the preview release of KRaft mode.
+KRaft mode in Kafka 3.1 is provided for testing only, *NOT* for production.  
We do not yet support upgrading existing ZooKeeper-based Kafka clusters into 
this mode.  In fact, when Kafka 3.2 is released,

Review comment:
   Let's remove the part about "In fact, when Kafka 3.2 is released, it may 
not be possible to upgrade your KRaft clusters from 3.1 to 3.2." We will 
support this upgrade path.








[GitHub] [kafka] cmccabe commented on pull request #11596: MINOR: bump version in kraft readme

2021-12-17 Thread GitBox


cmccabe commented on pull request #11596:
URL: https://github.com/apache/kafka/pull/11596#issuecomment-997061036


   Hi @showuon , thanks for the PR! I left one comment. LGTM after that is 
addressed.






[GitHub] [kafka] rondagostino commented on a change in pull request #11606: MINOR: Add shutdown tests for KRaft

2021-12-17 Thread GitBox


rondagostino commented on a change in pull request #11606:
URL: https://github.com/apache/kafka/pull/11606#discussion_r771695520



##
File path: 
core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
##
@@ -67,6 +68,14 @@ abstract class KafkaServerTestHarness extends 
QuorumTestHarness {
*/
   def generateConfigs: Seq[KafkaConfig]
 
+  /**
+   * It is sometimes useful to keep the same log.dirs configuration value; 
override this method if that is desired
+   *
+   * @param priorConfigs the prior configs
+   * @return the new generated configs
+   */
+  def regenerateConfigs(priorConfigs: Seq[KafkaConfig]): Seq[KafkaConfig] = 
generateConfigs
+

Review comment:
   I was able to remove this new `regenerateConfigs()` method and just 
implement some logic in the `ServerShutdownTest` class to save away the prior 
configs and react accordingly.  I'm fine with doing it this way given that 
reconfiguration is rarely used and this solution is pretty easy to implement in 
test classes.








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771687546



##
File path: streams/src/main/java/org/apache/kafka/streams/query/RangeQuery.java
##
@@ -34,7 +34,7 @@
  * 
  */
 @Evolving
-public class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {
+public final class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {

Review comment:
   I had a review comment to add this to KeyQuery, so I added it to 
RangeQuery for exactly the same reason.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/PingQuery.java
##
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.streams.query.Query;
-
-/**
- * A very simple query that all stores can handle to verify that the store is 
participating in the
- * IQv2 framework properly.
- * 
- * This is not a public API and may change without notice.
- */
-public class PingQuery implements Query<Boolean> {

Review comment:
   Removed, since it was only for validating the framework in the absence 
of any query implementations, and now we have query implementations.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##
@@ -54,22 +55,22 @@
 );
 }
 
-    private static final Map<Class<?>, QueryHandler> QUERY_HANDLER_MAP =
+    @SuppressWarnings("rawtypes")
+    private static final Map<Class, QueryHandler> QUERY_HANDLER_MAP =
         mkMap(
-            mkEntry(
-                PingQuery.class,
-                (query, positionBound, collectExecutionInfo, store) -> QueryResult.forResult(true)
-            ),

Review comment:
   Removed; see the comment on the PingQuery class.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##
@@ -162,15 +163,39 @@ public static boolean isPermitted(
 }
 final R result = (R) iterator;
 return QueryResult.forResult(result);
-} catch (final Throwable t) {
-final String message = parseStoreException(t, store, query);
+} catch (final Exception e) {

Review comment:
   Changed from Throwable to Exception to avoid swallowing Errors

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##
@@ -527,10 +541,11 @@ public void verifyStore() {
 private void globalShouldRejectAllQueries() {
 // See KAFKA-13523
 
-        final PingQuery query = new PingQuery();
-        final StateQueryRequest<Boolean> request = inStore(STORE_NAME).withQuery(query);
+        final KeyQuery<Integer, ValueAndTimestamp<Integer>> query = KeyQuery.withKey(1);

Review comment:
   Also replaced the PingQuery here. It also doesn't affect the evaluation.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##
@@ -630,29 +609,11 @@ public void shouldHandlePingQuery() {
                 .withQuery(query)
                 .withPartitions(mkSet(0, 1))
                 .withPositionBound(PositionBound.at(INPUT_POSITION));
-
         final StateQueryResult<KeyValueIterator<Integer, V>> result =
             IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
 
         if (result.getGlobalResult() != null) {
-            final QueryResult<KeyValueIterator<Integer, V>> queryResult = result.getGlobalResult();
-            final boolean failure = queryResult.isFailure();
-            if (failure) {
-                throw new AssertionError(queryResult.toString());
-            }
-            assertThat(queryResult.isSuccess(), is(true));
-
-            assertThrows(IllegalArgumentException.class, queryResult::getFailureReason);
-            assertThrows(IllegalArgumentException.class,
-                queryResult::getFailureMessage);
-
-            final KeyValueIterator<Integer, V> iterator = queryResult.getResult();
-            final Set<Integer> actualValue = new HashSet<>();
-            while (iterator.hasNext()) {
-                actualValue.add(valueExtactor.apply(iterator.next().value));
-            }
-            assertThat(actualValue, is(expectedValue));
-            assertThat(queryResult.getExecutionInfo(), is(empty()));
+

[GitHub] [kafka] wcarlson5 commented on pull request #11611: MINOR: prefix topics if internal config is set

2021-12-17 Thread GitBox


wcarlson5 commented on pull request #11611:
URL: https://github.com/apache/kafka/pull/11611#issuecomment-997025039


   @ableegoldman This should make it so we can have topics prefixed with 
whatever is needed


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 opened a new pull request #11611: MINOR: prefix topics if internal config is set

2021-12-17 Thread GitBox


wcarlson5 opened a new pull request #11611:
URL: https://github.com/apache/kafka/pull/11611


   In order to move a topology to another runtime without having to copy over 
the internal topics, it would be good to have the option to not prefix the 
internal topics with the application ID. This change introduces a new config 
that, if set, will be used as the internal topic prefix.
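
   As a rough illustration of the intended behavior (a sketch under assumed names; `internal.topic.prefix` is an invented key, not the actual config):

       // Hypothetical sketch: fall back to the application ID unless the new
       // internal config is set (config key name invented for illustration).
       static String internalTopicPrefix(final Map<String, Object> configs,
                                         final String applicationId) {
           final Object override = configs.get("internal.topic.prefix");
           return override != null ? (String) override : applicationId;
       }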
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] gwenshap closed pull request #11555: MINOR: Correct usage of ConfigException in file and directory config providers

2021-12-17 Thread GitBox


gwenshap closed pull request #11555:
URL: https://github.com/apache/kafka/pull/11555


   






[GitHub] [kafka] OneCricketeer commented on pull request #7965: KAFKA-9436: New Kafka Connect SMT for plainText => Struct(or Map)

2021-12-17 Thread GitBox


OneCricketeer commented on pull request #7965:
URL: https://github.com/apache/kafka/pull/7965#issuecomment-996944487


   > suggest using grok patterns
   
   I knew I'd seen this somewhere before, but finally found it again
   
   https://github.com/streamthoughts/kafka-connect-transform-grok






[jira] [Assigned] (KAFKA-13546) Explicitly specifying default topic creation groups should not make connector fail

2021-12-17 Thread venkat teki (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

venkat teki reassigned KAFKA-13546:
---

Assignee: venkat teki

> Explicitly specifying default topic creation groups should not make connector 
> fail
> --
>
> Key: KAFKA-13546
> URL: https://issues.apache.org/jira/browse/KAFKA-13546
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.6.0, 2.7.0, 2.6.1, 2.8.0, 2.7.1, 2.6.2, 2.6.3, 2.7.2, 
> 2.8.1
>Reporter: venkat teki
>Assignee: venkat teki
>Priority: Major
>
> [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics]
>  introduced support for Connect worker to allow source connector 
> configurations to define topic creation settings.
> A new source connector configuration {{topic.creation.groups}} was 
> introduced, which takes a list of groups. 
> *Expected behavior*
> According to KIP-158, specifying the value "default" in the 
> {{topic.creation.groups}} configuration should log a warning, but not make the 
> connector fail.
> *Actual behavior*
> Specifying "default" makes the connector fail.





[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771530728



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##
@@ -42,16 +102,21 @@ private StoreQueryUtils() {
         final int partition
     ) {
 
-        final QueryResult<R> result;
         final long start = collectExecutionInfo ? System.nanoTime() : -1L;
-        if (query instanceof PingQuery) {
-            if (!isPermitted(position, positionBound, partition)) {
-                result = QueryResult.notUpToBound(position, positionBound, partition);
-            } else {
-                result = (QueryResult<R>) QueryResult.forResult(true);
-            }
-        } else {
+        final QueryResult<R> result;
+
+        final QueryHandler handler = QUERY_HANDLER_MAP.get(query.getClass());

Review comment:
   Yep, that's accurate, but many of the stores will have the exact same 
logic as each other, so it made sense to consolidate it, which is what this 
util class is for.
   
   The function in the query map just checks the type of the store so that it 
can either cast it to execute the query or return "unknown query". That way, we 
can use the same dispatch map for all queries.
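
   As a self-contained illustration of that dispatch pattern (simplified, hypothetical stand-ins -- not the actual Kafka classes):

       import java.util.Map;
       import java.util.function.BiFunction;

       interface Query<R> { }
       final class KeyQuery implements Query<byte[]> {
           final byte[] key;
           KeyQuery(final byte[] key) { this.key = key; }
       }
       interface StateStore { }
       interface KeyValueStore extends StateStore { byte[] get(byte[] key); }

       final class DispatchSketch {
           // One shared map: query class -> execution logic. Each handler checks the
           // concrete store type and either casts-and-executes or reports "unknown".
           static final Map<Class<?>, BiFunction<Query<?>, StateStore, Object>> HANDLERS =
               Map.<Class<?>, BiFunction<Query<?>, StateStore, Object>>of(
                   KeyQuery.class, (query, store) -> {
                       if (store instanceof KeyValueStore) {
                           return ((KeyValueStore) store).get(((KeyQuery) query).key);
                       }
                       return "unknown query for " + store.getClass().getSimpleName();
                   });

           static Object execute(final Query<?> query, final StateStore store) {
               final BiFunction<Query<?>, StateStore, Object> handler = HANDLERS.get(query.getClass());
               return handler == null ? "unknown query type" : handler.apply(query, store);
           }
       }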








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771520958



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##
@@ -16,18 +16,78 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.Map;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
 public final class StoreQueryUtils {
 
+    /**
+     * a utility interface to facilitate stores' query dispatch logic,
+     * allowing them to generically store query execution logic as the values
+     * in a map.
+     */
+    @FunctionalInterface
+    public interface QueryHandler {
+        QueryResult<?> apply(
+            final Query<?> query,
+            final PositionBound positionBound,
+            final boolean collectExecutionInfo,
+            final StateStore store
+        );
+    }
+
+
+    @SuppressWarnings("unchecked")
+    private static final Map<Class<?>, QueryHandler> QUERY_HANDLER_MAP =
+        mkMap(
+            mkEntry(
+                PingQuery.class,
+                (query, positionBound, collectExecutionInfo, store) -> QueryResult.forResult(true)
+            ),
+            mkEntry(KeyQuery.class,
+                (query, positionBound, collectExecutionInfo, store) -> {
+                    if (store instanceof KeyValueStore) {
+                        final KeyQuery<Bytes, byte[]> rawKeyQuery =
+                            (KeyQuery<Bytes, byte[]>) query;
+                        final KeyValueStore<Bytes, byte[]> keyValueStore =
+                            (KeyValueStore<Bytes, byte[]>) store;
+                        try {
+                            final byte[] bytes = keyValueStore.get(rawKeyQuery.getKey());
+                            return QueryResult.forResult(bytes);
+                        } catch (final Throwable t) {

Review comment:
   Good point. It's fine to catch Throwables, but it's not fine to swallow 
Errors, as I'm doing here.
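
   In other words (an illustrative sketch reusing the names from the quoted diff):

       try {
           final byte[] bytes = keyValueStore.get(rawKeyQuery.getKey());
           return QueryResult.forResult(bytes);
       } catch (final Exception e) {
           // Recoverable store failures become a failed QueryResult...
           return QueryResult.forFailure(FailureReason.STORE_EXCEPTION, e.getMessage());
       }
       // ...while JVM-level Errors (OutOfMemoryError, StackOverflowError, etc.)
       // are no longer swallowed and can propagate to the caller.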








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771517463



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##
@@ -16,18 +16,78 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.Map;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
 public final class StoreQueryUtils {
 
+/**
+ * a utility interface to facilitate stores' query dispatch logic,
+ * allowing them to generically store query execution logic as the values
+ * in a map.
+ */
+@FunctionalInterface
+public interface QueryHandler {
+QueryResult apply(
+final Query query,
+final PositionBound positionBound,
+final boolean collectExecutionInfo,
+final StateStore store
+);
+}
+
+
+@SuppressWarnings("unchecked")
+private static final Map, QueryHandler> QUERY_HANDLER_MAP =
+mkMap(
+mkEntry(
+PingQuery.class,

Review comment:
   Actually, the PingQuery isn't in the KIP at all. I added it (as an 
internal API) so that I could verify the stores work properly in the absence of 
any actual queries (because I implemented the framework before any real 
queries, to control the scope of the PR).
   
   Now that we have real queries, I don't think we need to keep Ping around. 
I'll remove it.








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771515988



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##
@@ -16,18 +16,78 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.Map;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
 public final class StoreQueryUtils {
 
+    /**
+     * a utility interface to facilitate stores' query dispatch logic,
+     * allowing them to generically store query execution logic as the values
+     * in a map.
+     */
+    @FunctionalInterface
+    public interface QueryHandler {
+        QueryResult<?> apply(
+            final Query<?> query,
+            final PositionBound positionBound,
+            final boolean collectExecutionInfo,
+            final StateStore store
+        );
+    }
+
+
+    @SuppressWarnings("unchecked")
+    private static final Map<Class<?>, QueryHandler> QUERY_HANDLER_MAP =

Review comment:
   They both exist to dispatch query execution logic. The MeteredStores' 
logic is to translate results from the inner stores, and the inner stores' 
logic is to execute the query. Since we have a lot of functionally identical 
stores (i.e., many KeyValue stores, etc.), it made sense to consolidate their 
execution logic here instead of duplicating it in every store class.








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771513674



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##
@@ -16,18 +16,78 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.Map;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
 public final class StoreQueryUtils {
 
+/**
+ * a utility interface to facilitate stores' query dispatch logic,
+ * allowing them to generically store query execution logic as the values
+ * in a map.
+ */
+@FunctionalInterface

Review comment:
   The compiler is smart enough. It's just an informative annotation. Its 
only practical purpose is to raise a compilation error if you try to declare 
more than one abstract method in it.
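
   A tiny illustration of that enforcement (a hypothetical interface, not from this PR):

       @FunctionalInterface
       interface Handler {
           String handle(String input);              // the single abstract method

           // A second abstract method would fail to compile:
           // String handleAgain(String input);      // error: not a functional interface

           default String twice(final String input) { // default methods are still allowed
               return handle(handle(input));
           }
       }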








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771512115



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query<R> query,
+        final PositionBound positionBound, final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());

Review comment:
   I'd forgotten about MeteredTimestampedKeyValueStore, but now that I'm 
looking at it, what it does is extend the MeteredKeyValueStore, apparently 
specifically to pad the value serde with a ValueAndTimestamp serde. Otherwise, 
all the logic lives in MeteredKeyValueStore.
   
   Also, because we always wrap the non-timestamped store with the 
`KeyValueToTimestampedKeyValueByteStoreAdapter`, we also always pass through 
the MeteredTimestampedKeyValue store whether the inner store is really 
timestamped or not.
   
   I think we could clean this whole hierarchy up a bit, but it's not necessary 
as part of this work.








[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771505654



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query<R> query,
+        final PositionBound positionBound, final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());
+            final Serde<V> vSerde = serdes.valueSerde();
+            final Deserializer<V> deserializer;
+            if (!timestamped && vSerde instanceof ValueAndTimestampSerde) {

Review comment:
   Yes, this is super weird, and I think that 
https://issues.apache.org/jira/browse/KAFKA-13526 will give us a more elegant 
way to correct it, but as it stands right now, this is necessary.
   
   The MeteredStore's serde is always a ValueAndTimestamp serde regardless of 
whether the inner store is Timestamped or not. This works because the normal 
execution flow actually converts the byte results from non-timestamped stores 
into the binary schema of a ValueAndTimestamp.
   
   What we do is, when you have a non-timestamped store, we wrap it with an 
extra layer 
(`org.apache.kafka.streams.state.internals.KeyValueToTimestampedKeyValueByteStoreAdapter`)
 that pads the returned values with a fake timestamp 
(`org.apache.kafka.streams.state.TimestampedBytesStore#convertToTimestampedFormat`).
 That makes sense when the store is used by processors (particularly the ones 
in the DSL) because it makes the store configuration orthogonal to the 
processor logic, but for IQ, it's just spending extra time and memory for no 
productive purpose.
   
   One of the primary design goals of IQv2 is to make query execution as lean 
as possible, so we did not implement the same padding logic for non-timestamped 
data and instead just bubble up to the MeteredStore the actual byte array 
returned from the BytesStore. Which means that if we want to deserialize it, we 
need to know whether to use the ValueAndTimestamp deserializer or just the 
Value's deserializer.
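
   As a rough sketch of that padding idea (illustrative only, not the exact Kafka implementation -- the adapter prepends an 8-byte timestamp to the plain value bytes):

       import java.nio.ByteBuffer;

       final class TimestampPaddingSketch {
           // Pad a plain value with a fake timestamp (-1 standing in for "no
           // timestamp"), mirroring the idea behind convertToTimestampedFormat.
           static byte[] toTimestampedFormat(final byte[] plainValue) {
               return ByteBuffer.allocate(8 + plainValue.length)
                   .putLong(-1L)       // 8-byte timestamp prefix
                   .put(plainValue)    // original value bytes
                   .array();
           }

           // Reading it back requires knowing which binary schema you hold:
           // timestamped payloads must be split, plain payloads deserialized directly.
           static byte[] stripTimestamp(final byte[] timestampedValue) {
               final byte[] plain = new byte[timestampedValue.length - 8];
               ByteBuffer.wrap(timestampedValue, 8, plain.length).get(plain);
               return plain;
           }
       }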








[jira] [Commented] (KAFKA-13549) Add "delete interval" config

2021-12-17 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17461528#comment-17461528
 ] 

Nicholas Telford commented on KAFKA-13549:
--

I drafted a patch for this and only noticed the "needs-kip" label after I 
submitted the PR. Hopefully it at least serves as a starting point :D

> Add "delete interval" config
> 
>
> Key: KAFKA-13549
> URL: https://issues.apache.org/jira/browse/KAFKA-13549
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
>
> Kafka Streams uses "delete record" requests to aggressively purge data from 
> repartition topics. Those requests are sent each time we commit.
> For at-least-once with a default commit interval of 30 seconds, this works 
> fine. However, for exactly-once with a default commit interval of 100ms, it is 
> very aggressive. The main issue is on the broker side: the broker logs every 
> "delete record" request, so the broker logs are spammed if EOS is enabled.
> We should consider adding a new config (e.g., `delete.record.interval.ms` or 
> similar) dedicated to "delete record" requests, to decouple them from the 
> commit interval config and allow purging data less aggressively, even if the 
> commit interval is small, to avoid the broker-side log spamming.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771495764



##
File path: streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java
##
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+@Evolving
+public class KeyQuery<K, V> implements Query<V> {

Review comment:
   Good idea. I'm not sure whether it will ultimately be good to extend queries with other queries later, but it doesn't hurt to add this now so that we can make an explicit decision about it later.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771494393



##
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
 return false;
 }
 
+@SuppressWarnings("unchecked")
+@Override
+public <R> QueryResult<R> query(final Query<R> query,
+final PositionBound positionBound,
+final boolean collectExecutionInfo) {
+
+final long start = System.nanoTime();
+final QueryResult<R> result;
+
+final QueryHandler handler = queryHandlers.get(query.getClass());
+if (handler == null) {
+result = wrapped().query(query, positionBound, collectExecutionInfo);
+if (collectExecutionInfo) {
+result.addExecutionInfo(
+"Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+}
+} else {
+result = (QueryResult<R>) handler.apply(
+query,
+positionBound,
+collectExecutionInfo,
+this
+);
+if (collectExecutionInfo) {
+result.addExecutionInfo(
+"Handled in " + getClass() + " with serdes "
++ serdes + " in " + (System.nanoTime() - start) + "ns");
+}
+}
+return result;
+}
+
+@SuppressWarnings("unchecked")
+private <R> QueryResult<R> runKeyQuery(final Query<R> query,
+final PositionBound positionBound, final boolean collectExecutionInfo) {

Review comment:
   Sorry about that; oversight.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771493968



##
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
 return false;
 }
 
+@SuppressWarnings("unchecked")
+@Override
+public <R> QueryResult<R> query(final Query<R> query,
+final PositionBound positionBound,
+final boolean collectExecutionInfo) {
+
+final long start = System.nanoTime();
+final QueryResult<R> result;
+
+final QueryHandler handler = queryHandlers.get(query.getClass());
+if (handler == null) {
+result = wrapped().query(query, positionBound, collectExecutionInfo);
+if (collectExecutionInfo) {
+result.addExecutionInfo(
+"Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+}
+} else {
+result = (QueryResult<R>) handler.apply(
+query,
+positionBound,
+collectExecutionInfo,
+this
+);
+if (collectExecutionInfo) {
+result.addExecutionInfo(
+"Handled in " + getClass() + " with serdes "
++ serdes + " in " + (System.nanoTime() - start) + "ns");

Review comment:
   Thanks; this seems about the same, and it would apply to all the other 
execution info messages we've got, so I think I'll keep it the same for now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771492764



##
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
 return false;
 }
 
+@SuppressWarnings("unchecked")
+@Override
+public <R> QueryResult<R> query(final Query<R> query,
+final PositionBound positionBound,
+final boolean collectExecutionInfo) {
+
+final long start = System.nanoTime();
+final QueryResult<R> result;
+
+final QueryHandler handler = queryHandlers.get(query.getClass());
+if (handler == null) {
+result = wrapped().query(query, positionBound, collectExecutionInfo);
+if (collectExecutionInfo) {

Review comment:
   Yeah, the idea was to actually be able to see everything that happened 
during query execution, specifically to demystify what's going on when you're 
debugging.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771490884



##
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
 return false;
 }
 
+@SuppressWarnings("unchecked")
+@Override
+public <R> QueryResult<R> query(final Query<R> query,
+final PositionBound positionBound,
+final boolean collectExecutionInfo) {
+
+final long start = System.nanoTime();

Review comment:
   Not a bad idea!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] nicktelford commented on pull request #11610: KAFKA-13549: Add delete.interval.ms to Streams

2021-12-17 Thread GitBox


nicktelford commented on pull request #11610:
URL: https://github.com/apache/kafka/pull/11610#issuecomment-996810481


   @mjsax 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] nicktelford opened a new pull request #11610: KAFKA-13549: Add delete.interval.ms to Streams

2021-12-17 Thread GitBox


nicktelford opened a new pull request #11610:
URL: https://github.com/apache/kafka/pull/11610


   Records are explicitly deleted once they have been fully consumed.
   Currently, this is done every time the Task is committed, resulting in
   "delete records" requests being sent every `commit.interval.ms`
   milliseconds.
   
   When `commit.interval.ms` is set very low, for example when
   `processing.guarantee` is set to `exactly_once_v2`, this causes delete
   records requests to be sent extremely frequently, potentially reducing
   throughput and causing a high volume of log messages to be logged by the
   brokers.
   
   Disconnecting delete records requests from the commit interval resolves
   this problem.
   
   We now only explicitly delete records for a repartition topic when we
   commit, if it's been at least `delete.interval.ms` milliseconds since
   the last time we deleted records.
   
   Because we still require a commit to trigger record deletion, the
   lower-bound of `delete.interval.ms` is effectively capped at the
   `commit.interval.ms`.
   
   For compatibility, the default `delete.interval.ms` is set to 30
   seconds, the same as the default `commit.interval.ms`. Users who have
   configured a different `commit.interval.ms` may need to review and
   change `delete.interval.ms`.
   
   Unlike `commit.interval.ms`, we don't dynamically change the default for
   `delete.interval.ms` when EOS processing is enabled, as it's important
   not to flood brokers with the record deletions, and we want a sensible
   default.
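
   As a sketch, a Streams configuration using the proposed setting might look like the following (the `delete.interval.ms` key is the one proposed in this PR and may still change during review):

   ```java
   import java.util.Properties;
   import org.apache.kafka.streams.StreamsConfig;

   public class DeleteIntervalConfigSketch {
       public static Properties streamsProps() {
           final Properties props = new Properties();
           props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
           props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           // EOS v2 lowers the default commit interval to 100ms.
           props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
           // Proposed key (KAFKA-13549): purge fully-consumed repartition records
           // at most once a minute, decoupled from the commit interval.
           props.put("delete.interval.ms", 60_000);
           return props;
       }
   }
   ```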
   
   
   This code is my own work and is licensed to the Apache Kafka project under 
the terms of the same license (ASL 2) as the project itself.
   
   ### Committer Checklist (excluded from commit message)
   - [X] Verify design and implementation 
   - [X] Verify test coverage and CI build status
   - [X] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771472556



##
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##
@@ -197,6 +197,18 @@ public R getResult() {
 return result;
 }
 
+@SuppressWarnings("unchecked")
+public <V> QueryResult<V> swapResult(final V value) {
+if (isFailure()) {
+return (QueryResult<V>) this;
+} else {
+final QueryResult<V> result = new QueryResult<>(value);

Review comment:
   What's happening here is that we're turning a `QueryResult<byte[]>` into a `QueryResult<V>`. A concrete example (in fact the only use case) of this is in the MeteredStore: we get back a raw result from the BytesStore and need to deserialize it, so we need to convert the `QueryResult<byte[]>` into a `QueryResult<ValueAndTimestamp<V>>` or something.
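
   For illustration, here is a pared-down sketch of the "swap" semantics described above (a toy class, not the actual Kafka `QueryResult`):

   ```java
   // Sketch only: a minimal stand-in for QueryResult showing how swapResult
   // re-types the container (e.g. byte[] -> deserialized V) while a failure
   // keeps carrying no result at all.
   final class SketchQueryResult<R> {
       private final R result;       // null iff this is a failure
       private final String failure; // null iff this is a success

       private SketchQueryResult(final R result, final String failure) {
           this.result = result;
           this.failure = failure;
       }

       static <R> SketchQueryResult<R> success(final R result) {
           return new SketchQueryResult<>(result, null);
       }

       static <R> SketchQueryResult<R> failure(final String message) {
           return new SketchQueryResult<>(null, message);
       }

       @SuppressWarnings("unchecked")
       <V> SketchQueryResult<V> swapResult(final V value) {
           return failure != null
               ? (SketchQueryResult<V>) this           // failures have no result to swap
               : new SketchQueryResult<>(value, null); // success: re-typed container
       }
   }
   ```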




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771470456



##
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##
@@ -197,6 +197,18 @@ public R getResult() {
 return result;
 }
 
+@SuppressWarnings("unchecked")
+public <V> QueryResult<V> swapResult(final V value) {
+if (isFailure()) {
+return (QueryResult<V>) this;

Review comment:
   In the case of a failure, there is no result, just the failure message. 
I wanted to maintain an invariant that there is always either a failure or a 
result, but not both or neither. I also didn't think it would be right to allow 
accidentally converting a failure to a successful result via this method.
   
   I suppose we could throw an IllegalStateException, since the caller probably shouldn't even be trying to "swap" the result on a failed result to begin with, but this seems fine, too.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771466959



##
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##
@@ -29,10 +29,10 @@
  */
 public final class QueryResult<R> {
 
-private final List<String> executionInfo = new LinkedList<>();
 private final FailureReason failureReason;
 private final String failure;
 private final R result;
+private List<String> executionInfo = new LinkedList<>();

Review comment:
   Thanks; that would be another way to do it. I'm not sure if that would 
be clearly better or not, though.
   
   It's for getting more details about how the query was actually executed inside of Streams. Right now, if you request it as part of the query, each store layer will report what it did and how long it took. For runtime queries, you wouldn't want to use it, but I wanted to enable debugging in the cases where query execution seems like it's taking longer than expected. Also, it could be used for tracing, in which every Nth query is run with the execution info on.
   
   It's a list of Strings so that each store layer / operation can just add one 
"line" of info (like a stack trace), but we don't waste time and memory 
actually concatenating them with newlines. We considered adding more structure 
(such as having a field for execution time), but kept it as a string so as not 
to restrict the kind of "execution information" we might find useful to add in 
the future.
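
   As a usage sketch, requesting execution info through the IQv2 API from this PR series might look like the following (method names as proposed in KIP-796, so treat them as indicative rather than final):

   ```java
   import org.apache.kafka.streams.KafkaStreams;
   import org.apache.kafka.streams.query.KeyQuery;
   import org.apache.kafka.streams.query.StateQueryRequest;
   import org.apache.kafka.streams.query.StateQueryResult;

   public class ExecutionInfoSketch {
       // Run a single debug query with execution info collection turned on and
       // print one line per store layer / operation, as described above.
       static void debugQuery(final KafkaStreams kafkaStreams) {
           final StateQueryRequest<Long> request =
               StateQueryRequest.inStore("my-store")
                   .withQuery(KeyQuery.<String, Long>withKey("key"))
                   .enableExecutionInfo();
           final StateQueryResult<Long> result = kafkaStreams.query(request);
           result.getPartitionResults().forEach((partition, queryResult) ->
               queryResult.getExecutionInfo().forEach(line ->
                   System.out.println("partition " + partition + ": " + line)));
       }
   }
   ```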




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771459374



##
File path: 
streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java
##
@@ -52,5 +52,12 @@
  * The requested store partition does not exist at all. For example, 
partition 4 was requested,
  * but the store in question only has 4 partitions (0 through 3).
  */
-DOES_NOT_EXIST;
+DOES_NOT_EXIST,
+
+/**
+ * The store that handled the query got an exception during query 
execution. The message
+ * will contain the exception details. Depending on the nature of the 
exception, the caller
+ * may be able to retry this instance or may need to try a different 
instance.
+ */
+STORE_EXCEPTION;

Review comment:
   Thanks! @guozhangwang reminded me during the discussion to make sure 
that all the cases in that KIP were accounted for. Some are still exceptions, 
and some are now FailureReasons: 
https://lists.apache.org/thread/brvwvpvsbsfvqpqg6jvry5hqny0vm2tr




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-17 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r771454005



##
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##
@@ -197,6 +197,18 @@ public R getResult() {
 return result;
 }
 
+@SuppressWarnings("unchecked")
+public <V> QueryResult<V> swapResult(final V value) {

Review comment:
   The purpose of this method is to allow `MeteredKeyValue` store to 
deserialize the result without wiping out the execution info or position that 
it got back from the bytes store. I missed that while reviewing your PR, so I 
went ahead and added a fix for it to this one.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13502) Support configuring BROKER_LOGGER on controller-only KRaft nodes

2021-12-17 Thread Ron Dagostino (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17461478#comment-17461478
 ] 

Ron Dagostino commented on KAFKA-13502:
---

This is one aspect of the broader problem as described in 
https://issues.apache.org/jira/browse/KAFKA-13552

> Support configuring BROKER_LOGGER on controller-only KRaft nodes
> 
>
> Key: KAFKA-13502
> URL: https://issues.apache.org/jira/browse/KAFKA-13502
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Priority: Major
>  Labels: kip-500
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13552) Unable to dynamically change broker log levels on KRaft

2021-12-17 Thread Ron Dagostino (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17461476#comment-17461476
 ] 

Ron Dagostino commented on KAFKA-13552:
---

[~dengziming] Thanks for pointing that out.  Although there is not much in that ticket, it appears to address controller-only nodes, whereas this ticket indicates that no KRaft node (broker-only, controller-only, or combined broker+controller) supports dynamic changes to the log levels.  I updated the description of this ticket to point to that one, since it is just one aspect of the problem.

> Unable to dynamically change broker log levels on KRaft
> ---
>
> Key: KAFKA-13552
> URL: https://issues.apache.org/jira/browse/KAFKA-13552
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.1.0, 3.0.0
>Reporter: Ron Dagostino
>Priority: Major
>
> It is currently not possible to dynamically change the log level in KRaft.  
> For example:
> kafka-configs.sh --bootstrap-server  --alter --add-config 
> "kafka.server.ReplicaManager=DEBUG" --entity-type broker-loggers 
> --entity-name 0
> Results in:
> org.apache.kafka.common.errors.InvalidRequestException: Unexpected resource 
> type BROKER_LOGGER.
> The code to process this request is in ZkAdminManager.alterLogLevelConfigs(). 
>  This needs to be moved out of there, and the functionality has to be 
> processed locally on the broker instead of being forwarded to the KRaft 
> controller.
> It is also an open question as to how we can dynamically alter log levels for 
> a remote KRaft controller.  Connecting directly to it is one possible 
> solution, but that may not be desirable since generally connecting directly 
> to the controller is not necessary.  The ticket for this particular aspect of 
> the issue is https://issues.apache.org/jira/browse/KAFKA-13502



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13552) Unable to dynamically change broker log levels on KRaft

2021-12-17 Thread Ron Dagostino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino updated KAFKA-13552:
--
Description: 
It is currently not possible to dynamically change the log level in KRaft.  For 
example:

kafka-configs.sh --bootstrap-server  --alter --add-config 
"kafka.server.ReplicaManager=DEBUG" --entity-type broker-loggers --entity-name 0

Results in:

org.apache.kafka.common.errors.InvalidRequestException: Unexpected resource 
type BROKER_LOGGER.

The code to process this request is in ZkAdminManager.alterLogLevelConfigs().  
This needs to be moved out of there, and the functionality has to be processed 
locally on the broker instead of being forwarded to the KRaft controller.

It is also an open question as to how we can dynamically alter log levels for a 
remote KRaft controller.  Connecting directly to it is one possible solution, 
but that may not be desirable since generally connecting directly to the 
controller is not necessary.  The ticket for this particular aspect of the issue 
is https://issues.apache.org/jira/browse/KAFKA-13502

  was:
It is currently not possible to dynamically change the log level in KRaft.  For 
example:

kafka-configs.sh --bootstrap-server  --alter --add-config 
"kafka.server.ReplicaManager=DEBUG" --entity-type broker-loggers --entity-name 0

Results in:

org.apache.kafka.common.errors.InvalidRequestException: Unexpected resource 
type BROKER_LOGGER.

The code to process this request is in ZkAdminManager.alterLogLevelConfigs().  
This needs to be moved out of there, and the functionality has to be processed 
locally on the broker instead of being forwarded to the KRaft controller.

It is also an open question as to how we can dynamically alter log levels for a 
remote KRaft controller.  Connecting directly to it is one possible solution, 
but that may not be desirable since generally connecting directly to the 
controller is not necessary.


> Unable to dynamically change broker log levels on KRaft
> ---
>
> Key: KAFKA-13552
> URL: https://issues.apache.org/jira/browse/KAFKA-13552
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.1.0, 3.0.0
>Reporter: Ron Dagostino
>Priority: Major
>
> It is currently not possible to dynamically change the log level in KRaft.  
> For example:
> kafka-configs.sh --bootstrap-server  --alter --add-config 
> "kafka.server.ReplicaManager=DEBUG" --entity-type broker-loggers 
> --entity-name 0
> Results in:
> org.apache.kafka.common.errors.InvalidRequestException: Unexpected resource 
> type BROKER_LOGGER.
> The code to process this request is in ZkAdminManager.alterLogLevelConfigs(). 
>  This needs to be moved out of there, and the functionality has to be 
> processed locally on the broker instead of being forwarded to the KRaft 
> controller.
> It is also an open question as to how we can dynamically alter log levels for 
> a remote KRaft controller.  Connecting directly to it is one possible 
> solution, but that may not be desirable since generally connecting directly 
> to the controller is not necessary.  The ticket for this particular aspect of 
> the issue is https://issues.apache.org/jira/browse/KAFKA-13502



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13547) Kafka - 1.0.0 | Remove log4j.jar

2021-12-17 Thread Dongjin Lee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17461472#comment-17461472
 ] 

Dongjin Lee commented on KAFKA-13547:
-

[~masood31] I am currently working on a preview version based on AK 2.8.1 and 3.0.0, and I will complete it this weekend. It will replace all log4j 1.x dependencies with 2.x, with backward compatibility for the logger configuration.

+1. I can't guarantee when it will be merged into the official release.

> Kafka - 1.0.0 | Remove log4j.jar
> 
>
> Key: KAFKA-13547
> URL: https://issues.apache.org/jira/browse/KAFKA-13547
> Project: Kafka
>  Issue Type: Bug
>Reporter: masood
>Priority: Blocker
>
> We wanted to remove the log4j.jar but ended up with a dependency on the 
> kafka.producer.ProducerConfig.
> Caused by: java.lang.NoClassDefFoundError: org/apache/log4j/Logger
>     at kafka.utils.Logging.logger(Logging.scala:24)
>     at kafka.utils.Logging.logger$(Logging.scala:24)
>     at 
> kafka.utils.VerifiableProperties.logger$lzycompute(VerifiableProperties.scala:27)
>     at kafka.utils.VerifiableProperties.logger(VerifiableProperties.scala:27)
>     at kafka.utils.Logging.info(Logging.scala:71)
>     at kafka.utils.Logging.info$(Logging.scala:70)
>     at kafka.utils.VerifiableProperties.info(VerifiableProperties.scala:27)
>     at kafka.utils.VerifiableProperties.verify(VerifiableProperties.scala:218)
>     at kafka.producer.ProducerConfig.<init>(ProducerConfig.scala:61)
> Is there any configuration available which can resolve this error?
> Please note we are not using log4j.properties or any other log4j logging 
> mechanism for Kafka connection in the application.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] showuon commented on a change in pull request #11566: KAFKA-13495: add reason to JoinGroupRequest

2021-12-17 Thread GitBox


showuon commented on a change in pull request #11566:
URL: https://github.com/apache/kafka/pull/11566#discussion_r771395371



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -467,6 +468,7 @@ boolean joinGroupIfNeeded(final Timer timer) {
 final RuntimeException exception = future.exception();
 
 resetJoinGroupFuture();
+rejoinReason = "rebalance failed due to " + 
exception.getClass() + " error: " + exception.getMessage();

Review comment:
   +1 for `getSimpleName` for the class. In addition to David's suggestion, I think we should also remove the 2nd `due to`, because there is already one `due to` in the sentence, e.g. `rebalance failed: '$message' ($class)`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11566: KAFKA-13495: add reason to JoinGroupRequest

2021-12-17 Thread GitBox


showuon commented on a change in pull request #11566:
URL: https://github.com/apache/kafka/pull/11566#discussion_r771384033



##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -1249,7 +1260,7 @@ class GroupCoordinator(val brokerId: Int,
 // for new members. If the new member is still there, we expect it to 
retry.
 completeAndScheduleNextExpiration(group, member, NewMemberJoinTimeoutMs)
 
-maybePrepareRebalance(group, s"Adding new member $memberId with group 
instance id $groupInstanceId")
+maybePrepareRebalance(group, s"Adding new member $memberId with group 
instance id $groupInstanceId. Member joined due to $reason")

Review comment:
   +1 for `; client reason: $reason`

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -467,6 +468,7 @@ boolean joinGroupIfNeeded(final Timer timer) {
 final RuntimeException exception = future.exception();
 
 resetJoinGroupFuture();
+rejoinReason = "rebalance failed due to " + 
exception.getClass() + " error: " + exception.getMessage();

Review comment:
   +1 for `getSimpleName` for the class. In addition to David's suggestion, I think we should also remove the `due to`, because there is already one `due to` in the sentence, e.g. `rebalance failed: '$message' ($class)`

##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
##
@@ -486,6 +487,43 @@ public void testRetainMemberIdAfterSyncGroupDisconnect() {
 ensureActiveGroup(rejoinedGeneration, memberId);
 }
 
+@Test
+public void testRejoinReason() {
+setupCoordinator();
+
+String memberId = "memberId";
+int generation = 5;
+
+// test initial reason
+mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
+expectJoinGroup("", "initialized abstract coordinator", generation, 
memberId);
+
+// successful sync group response should reset reason
+expectSyncGroup(generation, memberId);
+ensureActiveGroup(generation, memberId);
+assertEquals("", coordinator.rejoinReason());
+
+// Force a rebalance
+expectJoinGroup(memberId, "Manual test trigger", generation, memberId);
+expectSyncGroup(generation, memberId);
+coordinator.requestRejoin("Manual test trigger");
+ensureActiveGroup(generation, memberId);
+assertEquals("", coordinator.rejoinReason());
+
+// max group size reached
+
mockClient.prepareResponse(joinGroupFollowerResponse(defaultGeneration, 
memberId, JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.GROUP_MAX_SIZE_REACHED));
+coordinator.requestRejoin("Manual test trigger 2");
+try {
+coordinator.joinGroupIfNeeded(mockTime.timer(100L));
+} catch (GroupMaxSizeReachedException e) {

Review comment:
   Actually, you can achieve what you want by `assertThrows` as below:
   
   ```java
   Throwable e = assertThrows(GroupMaxSizeReachedException.class,
   () -> coordinator.joinGroupIfNeeded(mockTime.timer(100L)));
   
   // next join group request should contain exception message
   expectJoinGroup(memberId, e.getMessage(), generation, memberId);
   expectSyncGroup(generation, memberId);
   ensureActiveGroup(generation, memberId);
   assertEquals("", coordinator.rejoinReason());
   ```
   
   Basically, `assertThrows` does the same thing as what you did here (try/catch). It's recommended to use `assertThrows` to make it clear that the expected exception is also part of what this test verifies. Otherwise, let's say, if someday someone breaks the logic and makes `coordinator.joinGroupIfNeeded(mockTime.timer(100L));` complete without an exception being thrown, your current test can't catch this error, right?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11571: KAFKA-13496: add reason to LeaveGroupRequest

2021-12-17 Thread GitBox


dajac commented on a change in pull request #11571:
URL: https://github.com/apache/kafka/pull/11571#discussion_r771376907



##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -617,9 +617,10 @@ class GroupCoordinator(val brokerId: Int,
leavingMembers: List[MemberIdentity],
responseCallback: LeaveGroupResult => Unit): Unit = {
 
-def removeCurrentMemberFromGroup(group: GroupMetadata, memberId: String): 
Unit = {
+def removeCurrentMemberFromGroup(group: GroupMetadata, memberId: String, 
reason: Option[String]): Unit = {
   val member = group.get(memberId)
-  removeMemberAndUpdateGroup(group, member, s"Removing member $memberId on 
LeaveGroup")
+  val leaveReason = reason.getOrElse("unknown reason")
+  removeMemberAndUpdateGroup(group, member, s"Removing member $memberId on 
LeaveGroup due to: $leaveReason")

Review comment:
   I think that we should use the same pattern for both join reasons and 
leave reason. See 
https://github.com/apache/kafka/pull/11566#discussion_r771371068.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11566: KAFKA-13495: add reason to JoinGroupRequest

2021-12-17 Thread GitBox


dajac commented on a change in pull request #11566:
URL: https://github.com/apache/kafka/pull/11566#discussion_r771371068



##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -1249,7 +1260,7 @@ class GroupCoordinator(val brokerId: Int,
 // for new members. If the new member is still there, we expect it to 
retry.
 completeAndScheduleNextExpiration(group, member, NewMemberJoinTimeoutMs)
 
-maybePrepareRebalance(group, s"Adding new member $memberId with group 
instance id $groupInstanceId")
+maybePrepareRebalance(group, s"Adding new member $memberId with group 
instance id $groupInstanceId. Member joined due to $reason")

Review comment:
   The output log is quite hard to follow at the moment. Example:
   
   ```
   [2021-12-17 11:29:16,061] INFO [GroupCoordinator 0]: Preparing to rebalance 
group test in state PreparingRebalance with old generation 1 
(__consumer_offsets-48) (reason: Adding new member 
console-consumer-1d5a9905-c271-4700-a817-62fc9b9f28fc with group instance id 
None. Member joined due to rebalance failed due to class 
org.apache.kafka.common.errors.MemberIdRequiredException error: The group 
member needs to have a valid member id before actually entering a consumer 
group.) (kafka.coordinator.group.GroupCoordinator)
   ```
   
   How about doing the following? For each reason, we could add `; client reason: $reason`. With this, we will always have `(reason: <reason>; client reason: <client reason>)` in each rebalance log. It might be clearer. What do you think?

##
File path: clients/src/main/resources/common/message/JoinGroupResponse.json
##
@@ -31,7 +31,9 @@
   // Version 6 is the first flexible version.
   //
   // Starting from version 7, the broker sends back the Protocol Type to the 
client (KIP-559).
-  "validVersions": "0-7",
+  //
+  // Version 8 adds the Reason field (KIP-800).

Review comment:
   nit: Should we rather say Version 8 is the same as version 7. here?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -467,6 +468,7 @@ boolean joinGroupIfNeeded(final Timer timer) {
 final RuntimeException exception = future.exception();
 
 resetJoinGroupFuture();
+rejoinReason = "rebalance failed due to " + 
exception.getClass() + " error: " + exception.getMessage();

Review comment:
   Example on the broker side:
   ```
   [2021-12-17 11:29:16,061] INFO [GroupCoordinator 0]: Preparing to rebalance 
group test in state PreparingRebalance with old generation 1 
(__consumer_offsets-48) (reason: Adding new member 
console-consumer-1d5a9905-c271-4700-a817-62fc9b9f28fc with group instance id 
None. Member joined due to rebalance failed due to class 
org.apache.kafka.common.errors.MemberIdRequiredException error: The group 
member needs to have a valid member id before actually entering a consumer 
group.) (kafka.coordinator.group.GroupCoordinator)
   ```
   
   * Should we only get the `getSimpleName` of the class?
   * There are many `:` in the log. I wonder if we could remove the one we've 
put here. Perhaps, we could use the following pattern: `rebalance failed due to 
'$message' ($class)`. What do you think?
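
   For concreteness, the suggested formatting might be implemented roughly like this (a sketch against the line under review, not committed code):

   ```java
   // Use the exception's simple name and quote the message once, producing e.g.
   // "rebalance failed due to 'The group member needs a valid member id ...' (MemberIdRequiredException)"
   rejoinReason = String.format("rebalance failed due to '%s' (%s)",
       exception.getMessage(), exception.getClass().getSimpleName());
   ```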

##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -181,6 +182,7 @@ class GroupCoordinator(val brokerId: Int,
   responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID))
 case Some(group) =>
   group.inLock {
+val joinReason = reason.getOrElse("unknown reason")

Review comment:
   If we do this, it might be better to not use an `Option` after all. We could simply provide the default reason to `handleJoinGroup` if none is provided. Also, how about using `not provided` instead of `unknown reason`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11571: KAFKA-13496: add reason to LeaveGroupRequest

2021-12-17 Thread GitBox


dajac commented on a change in pull request #11571:
URL: https://github.com/apache/kafka/pull/11571#discussion_r771248687



##
File path: clients/src/main/resources/common/message/LeaveGroupResponse.json
##
@@ -24,7 +24,9 @@
   // Starting in version 3, we will make leave group request into batch mode 
and add group.instance.id.
   //
   // Version 4 is the first flexible version.
-  "validVersions": "0-4",
+  //
+  // Version 5 adds the Reason field (KIP-800).

Review comment:
   nit: Should we rather say `Version 5 is the same as version 4.` here? 

##
File path: 
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##
@@ -1837,7 +1837,12 @@ private DescribeGroupsResponse 
createDescribeGroupResponse() {
 }
 
 private LeaveGroupRequest createLeaveGroupRequest(short version) {
-return new LeaveGroupRequest.Builder("group1", singletonList(new 
MemberIdentity().setMemberId("consumer1")))
+MemberIdentity member = new MemberIdentity()
+.setMemberId("consumer1");

Review comment:
   nit: I think that we can keep this one on the previous line.

##
File path: 
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##
@@ -1837,7 +1837,12 @@ private DescribeGroupsResponse 
createDescribeGroupResponse() {
 }
 
 private LeaveGroupRequest createLeaveGroupRequest(short version) {
-return new LeaveGroupRequest.Builder("group1", singletonList(new 
MemberIdentity().setMemberId("consumer1")))
+MemberIdentity member = new MemberIdentity()
+.setMemberId("consumer1");
+if (version >= 5) {
+member.setMemberId("reason: test");

Review comment:
   This is not correct.

##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -617,9 +617,10 @@ class GroupCoordinator(val brokerId: Int,
leavingMembers: List[MemberIdentity],
responseCallback: LeaveGroupResult => Unit): Unit = {
 
-def removeCurrentMemberFromGroup(group: GroupMetadata, memberId: String): 
Unit = {
+def removeCurrentMemberFromGroup(group: GroupMetadata, memberId: String, 
reason: Option[String]): Unit = {
   val member = group.get(memberId)
-  removeMemberAndUpdateGroup(group, member, s"Removing member $memberId on 
LeaveGroup")
+  val leaveReason = reason.getOrElse("unknown reason")
+  removeMemberAndUpdateGroup(group, member, s"Removing member $memberId on 
LeaveGroup due to: $leaveReason")
   removeHeartbeatForLeavingMember(group, member.memberId)
   info(s"Member $member has left group $groupId through explicit 
`LeaveGroup` request")

Review comment:
   I am also tempted to add the reason here. What do you think?

##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##
@@ -3907,6 +3909,42 @@ public void testRemoveMembersFromGroup() throws 
Exception {
 }
 }
 
+@Test
+public void testRemoveMembersFromGroupReason() throws Exception {
+final Cluster cluster = mockCluster(3, 0);
+final Time time = new MockTime();
+
+try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, 
cluster)) {
+
+env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, 
env.cluster().controller()));
+env.kafkaClient().prepareResponse(body -> {
+if (!(body instanceof LeaveGroupRequest)) {
+return false;
+}
+LeaveGroupRequestData leaveGroupRequest = ((LeaveGroupRequest) 
body).data();
+
+return leaveGroupRequest.members().stream().allMatch(member -> 
member.reason().equals("testing remove members reason"));
+}, new LeaveGroupResponse(new 
LeaveGroupResponseData().setErrorCode(Errors.NONE.code()).setMembers(
+Arrays.asList(
+new 
MemberResponse().setGroupInstanceId("instance-1"),
+new 
MemberResponse().setGroupInstanceId("instance-2")
+))

Review comment:
   nit: Indentation of those lines seems to be off here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #11600: KAFKA-12648: handle MissingSourceTopicException for named topologies

2021-12-17 Thread GitBox


cadonna commented on pull request #11600:
URL: https://github.com/apache/kafka/pull/11600#issuecomment-996549823


   @ableegoldman There are checkstyle errors in the builds.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on a change in pull request #11560: KAFKA-7589: Allow configuring network threads per listener

2021-12-17 Thread GitBox


tombentley commented on a change in pull request #11560:
URL: https://github.com/apache/kafka/pull/11560#discussion_r771207226



##
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##
@@ -920,7 +919,7 @@ class DynamicListenerConfig(server: KafkaBroker) extends 
BrokerReconfigurable wi
   def immutableListenerConfigs(kafkaConfig: KafkaConfig, prefix: String): 
Map[String, AnyRef] = {
 kafkaConfig.originalsWithPrefix(prefix, true).asScala.filter { case 
(key, _) =>
   // skip the reconfigurable configs
-  !DynamicSecurityConfigs.contains(key) && 
!SocketServer.ListenerReconfigurableConfigs.contains(key)
+  !DynamicSecurityConfigs.contains(key) && 
!SocketServer.ListenerReconfigurableConfigs.contains(key) && 
!DataPlaneAcceptor.ListenerReconfigurableConfigs.contains(key)

Review comment:
   Oh, of course it's single threaded! Thanks.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] richard-axual commented on a change in pull request #11535: KAFKA-13476: Increase resilience timestamp decoding Kafka Streams

2021-12-17 Thread GitBox


richard-axual commented on a change in pull request #11535:
URL: https://github.com/apache/kafka/pull/11535#discussion_r771190189



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -1097,15 +1097,20 @@ long decodeTimestamp(final String encryptedString) {
 if (encryptedString.isEmpty()) {
 return RecordQueue.UNKNOWN;
 }
-final ByteBuffer buffer = 
ByteBuffer.wrap(Base64.getDecoder().decode(encryptedString));
-final byte version = buffer.get();
-switch (version) {
-case LATEST_MAGIC_BYTE:
-return buffer.getLong();
-default:
-log.warn("Unsupported offset metadata version found. Supported 
version {}. Found version {}.",
- LATEST_MAGIC_BYTE, version);
-return RecordQueue.UNKNOWN;
+try {
+final ByteBuffer buffer = 
ByteBuffer.wrap(Base64.getDecoder().decode(encryptedString));
+final byte version = buffer.get();
+switch (version) {
+case LATEST_MAGIC_BYTE:
+return buffer.getLong();
+default:
+log.warn("Unsupported offset metadata version found. 
Supported version {}. Found version {}.",
+LATEST_MAGIC_BYTE, version);
+return RecordQueue.UNKNOWN;
+}
+} catch (final IllegalArgumentException argumentException) {
+log.warn("Unsupported offset metadata found {}", encryptedString);

Review comment:
   Very good point about logging the encryptedString, I've removed it from 
the log statement




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11451: KAFKA-13419: Only reset generation ID when ILLEGAL_GENERATION error

2021-12-17 Thread GitBox


showuon commented on a change in pull request #11451:
URL: https://github.com/apache/kafka/pull/11451#discussion_r771181554



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -705,13 +705,13 @@ protected void onJoinPrepare(int generation, String 
memberId) {
 // so that users can still access the previously owned partitions to 
commit offsets etc.
 Exception exception = null;
 final Set revokedPartitions;
-if (generation == Generation.NO_GENERATION.generationId &&
+if (generation == Generation.NO_GENERATION.generationId ||

Review comment:
   Yes, I agree that it doesn't hurt to have it. Thank you.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11451: KAFKA-13419: Only reset generation ID when ILLEGAL_GENERATION error

2021-12-17 Thread GitBox


showuon commented on a change in pull request #11451:
URL: https://github.com/apache/kafka/pull/11451#discussion_r771181062



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -755,18 +755,17 @@ protected void onJoinPrepare(int generation, String 
memberId) {
 
 @Override
 public void onLeavePrepare() {
-// Save the current Generation and use that to get the memberId, as 
the hb thread can change it at any time
+// Save the current Generation, as the hb thread can change it at any 
time
 final Generation currentGeneration = generation();
-final String memberId = currentGeneration.memberId;
 
-log.debug("Executing onLeavePrepare with generation {} and memberId 
{}", currentGeneration, memberId);
+log.debug("Executing onLeavePrepare with generation {}", 
currentGeneration);
 
 // we should reset assignment and trigger the callback before leaving 
group
 Set<TopicPartition> droppedPartitions = new HashSet<>(subscriptions.assignedPartitions());
 
 if (subscriptions.hasAutoAssignedPartitions() && 
!droppedPartitions.isEmpty()) {
 final Exception e;
-if (generation() == Generation.NO_GENERATION || 
rebalanceInProgress()) {
+if (currentGeneration.equals(Generation.NO_GENERATION) || 
rebalanceInProgress()) {

Review comment:
   David, you're right! I was focusing on fixing the `==` error here. Yes, 
we should be consistent with `onJoinPrepare` 
[here](https://github.com/apache/kafka/blob/1beb3bd5160c9cc950a541b02684f2fd53ea8da2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L708-L718),
 to invoke PartitionsLost when 
   ```
   generation == Generation.NO_GENERATION.generationId ||
   memberId.equals(Generation.NO_GENERATION.memberId)
   ```
   Otherwise, invoke PartitionsRevoked. 
   I'll update it later. Thank you.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org