[jira] [Updated] (KAFKA-14000) Kafka-connect standby server shows empty tasks list

2022-06-15 Thread Xinyu Zou (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xinyu Zou updated KAFKA-14000:
--
Description: 
I'm using Kafka Connect in distributed mode. There are two servers, one active
and one standby. The standby server sometimes shows an empty tasks list in the
status REST API response.

curl host:8443/connectors/name1/status
{code:java}
{
    "connector": {
        "state": "RUNNING",
        "worker_id": "1.2.3.4:10443"
    },
    "name": "name1",
    "tasks": [],
    "type": "source"
} {code}
I enabled TRACE logging and checked. As required, the connect-status topic is set
to cleanup.policy=compact, but messages in the topic are not compacted right away;
compaction only runs at a certain interval, so there are usually multiple messages
with the same key. For example, when Kafka Connect is launched there is no
connector running, and we then start a new connector. There will then be two
messages in the connect-status topic:

status-task-name1 : state=RUNNING, workerId='10.251.170.166:10443', 
generation=100

status-task-name1 : __

Please check the attached log file [^kafka-connect-trace.log]. We can see that the
task status was removed in the end, even though the empty status was not the newest
message in the connect-status topic.

 

When reading statuses from the connect-status topic, the messages are not sorted by
generation.

[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java]

So I think this could be improved: we could either sort the messages after poll() or
compare the generation value before choosing which status message to apply.
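As a rough illustration of the second option, a minimal sketch (the class and field
names below are invented for illustration and are not taken from
KafkaStatusBackingStore): keep the newest generation seen per status key and ignore
records carrying an older generation, regardless of the order in which they are read
back from the topic.
{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of generation-aware status handling; tombstone handling
// and serialization are omitted for brevity.
class TaskStatusCache {

    static class TaskStatus {
        final String state;
        final String workerId;
        final int generation;

        TaskStatus(final String state, final String workerId, final int generation) {
            this.state = state;
            this.workerId = workerId;
            this.generation = generation;
        }
    }

    private final Map<String, TaskStatus> statusByKey = new ConcurrentHashMap<>();

    // Apply a record read from connect-status only if it is not older (by generation)
    // than the status we already hold for the same key.
    void onStatusRecord(final String key, final TaskStatus incoming) {
        statusByKey.merge(key, incoming,
            (current, candidate) -> candidate.generation >= current.generation ? candidate : current);
    }
}
{code}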

  was:
I'm using Kafka Connect in distributed mode. There are two servers, one active
and one standby. The standby server sometimes shows an empty tasks list in the
status REST API response.

curl host:8443/connectors/name1/status
{code:java}
{
    "connector": {
        "state": "RUNNING",
        "worker_id": "1.2.3.4:10443"
    },
    "name": "name1",
    "tasks": [],
    "type": "source"
} {code}
I enabled TRACE logging and checked. As required, the connect-status topic is set
to cleanup.policy=compact, but messages in the topic are not compacted right away;
compaction only runs at a certain interval, so there are usually multiple messages
with the same key. For example, when Kafka Connect is launched there is no
connector running, and we then start a new connector. There will then be two
messages in the connect-status topic:

status-task-name1 : state=RUNNING, workerId='10.251.170.166:10443', 
generation=100

status-task-name1 : __

 

When reading statuses from the connect-status topic, the messages are not sorted by
generation.

[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java]

So I think this could be improved: we could either sort the messages after poll() or
compare the generation value before choosing which status message to apply.


> Kafka-connect standby server shows empty tasks list
> ---
>
> Key: KAFKA-14000
> URL: https://issues.apache.org/jira/browse/KAFKA-14000
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.6.0
>Reporter: Xinyu Zou
>Priority: Major
> Attachments: kafka-connect-trace.log
>
>
> I'm using Kafka Connect in distributed mode. There are two servers, one 
> active and one standby. The standby server sometimes shows an empty tasks 
> list in the status REST API response.
> curl host:8443/connectors/name1/status
> {code:java}
> {
>     "connector": {
>         "state": "RUNNING",
>         "worker_id": "1.2.3.4:10443"
>     },
>     "name": "name1",
>     "tasks": [],
>     "type": "source"
> } {code}
> I enabled TRACE logging and checked. As required, the connect-status topic is 
> set to cleanup.policy=compact, but messages in the topic are not compacted 
> right away; compaction only runs at a certain interval, so there are usually 
> multiple messages with the same key. For example, when Kafka Connect is 
> launched there is no connector running, and we then start a new connector. 
> There will then be two messages in the connect-status topic:
> status-task-name1 : state=RUNNING, workerId='10.251.170.166:10443', 
> generation=100
> status-task-name1 : __
> Please check the attached log file [^kafka-connect-trace.log]. We can see 
> that the task status was removed in the end, even though the empty status was 
> not the newest message in the connect-status topic.
>  
> When reading statuses from the connect-status topic, the messages are not 
> sorted by generation.
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java]
> So I think this could be improved: we could either sort the messages after 
> poll() or compare the generation value before choosing which status message 
> to apply.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

[jira] [Updated] (KAFKA-14000) Kafka-connect standby server shows empty tasks list

2022-06-15 Thread Xinyu Zou (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xinyu Zou updated KAFKA-14000:
--
Attachment: kafka-connect-trace.log

> Kafka-connect standby server shows empty tasks list
> ---
>
> Key: KAFKA-14000
> URL: https://issues.apache.org/jira/browse/KAFKA-14000
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.6.0
>Reporter: Xinyu Zou
>Priority: Major
> Attachments: kafka-connect-trace.log
>
>
> I'm using Kafka Connect in distributed mode. There are two servers, one 
> active and one standby. The standby server sometimes shows an empty tasks 
> list in the status REST API response.
> curl host:8443/connectors/name1/status
> {code:java}
> {
>     "connector": {
>         "state": "RUNNING",
>         "worker_id": "1.2.3.4:10443"
>     },
>     "name": "name1",
>     "tasks": [],
>     "type": "source"
> } {code}
> I enabled TRACE logging and checked. As required, the connect-status topic is 
> set to cleanup.policy=compact, but messages in the topic are not compacted 
> right away; compaction only runs at a certain interval, so there are usually 
> multiple messages with the same key. For example, when Kafka Connect is 
> launched there is no connector running, and we then start a new connector. 
> There will then be two messages in the connect-status topic:
> status-task-name1 : state=RUNNING, workerId='10.251.170.166:10443', 
> generation=100
> status-task-name1 : __
>  
> When reading statuses from the connect-status topic, the messages are not 
> sorted by generation.
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java]
> So I think this could be improved: we could either sort the messages after 
> poll() or compare the generation value before choosing which status message 
> to apply.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (KAFKA-14000) Kafka-connect standby server shows empty tasks list

2022-06-15 Thread Xinyu Zou (Jira)
Xinyu Zou created KAFKA-14000:
-

 Summary: Kafka-connect standby server shows empty tasks list
 Key: KAFKA-14000
 URL: https://issues.apache.org/jira/browse/KAFKA-14000
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.6.0
Reporter: Xinyu Zou


I'm using Kafka Connect in distributed mode. There are two servers, one active
and one standby. The standby server sometimes shows an empty tasks list in the
status REST API response.

curl host:8443/connectors/name1/status
{code:java}
{
    "connector": {
        "state": "RUNNING",
        "worker_id": "1.2.3.4:10443"
    },
    "name": "name1",
    "tasks": [],
    "type": "source"
} {code}
I enabled TRACE logging and checked. As required, the connect-status topic is set
to cleanup.policy=compact, but messages in the topic are not compacted right away;
compaction only runs at a certain interval, so there are usually multiple messages
with the same key. For example, when Kafka Connect is launched there is no
connector running, and we then start a new connector. There will then be two
messages in the connect-status topic:

status-task-name1 : state=RUNNING, workerId='10.251.170.166:10443', 
generation=100

status-task-name1 : __

 

When reading statuses from the connect-status topic, the messages are not sorted by
generation.

[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java]

So I think this could be improved: we could either sort the messages after poll() or
compare the generation value before choosing which status message to apply.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-12478) Consumer group may lose data for newly expanded partitions when add partitions for topic if the group is set to consume from the latest

2022-06-15 Thread hudeqi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17554884#comment-17554884
 ] 

hudeqi commented on KAFKA-12478:


Hello, Guozhang. I have started a vote on KIP-842 for this issue. Does the
status of this issue also need to be updated accordingly? In addition, please
review and vote on the KIP, thank you. cc [~showuon]

> Consumer group may lose data for newly expanded partitions when add 
> partitions for topic if the group is set to consume from the latest
> ---
>
> Key: KAFKA-12478
> URL: https://issues.apache.org/jira/browse/KAFKA-12478
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 3.1.1
>Reporter: hudeqi
>Priority: Blocker
>  Labels: patch
> Attachments: safe-console-consumer.png, safe-consume.png, 
> safe-produce.png, trunk-console-consumer.png, trunk-consume.png, 
> trunk-produce.png
>
>   Original Estimate: 1,158h
>  Remaining Estimate: 1,158h
>
>   This problem surfaced in our production environment: a topic is used to 
> produce monitoring data. *After expanding the partitions, the consuming 
> business reported that data was lost.*
>   After a preliminary investigation, the lost data is all concentrated in the 
> newly expanded partitions. The reason is that when the topic is expanded, the 
> producer perceives the expansion first and writes some data into the newly 
> expanded partitions. The consumer group perceives the expansion later; after 
> the rebalance completes, the newly expanded partitions are consumed from the 
> latest offset if the group is set to consume from the latest. For a period of 
> time, the data in the newly expanded partitions is therefore skipped and lost 
> by the consumer.
>   Setting the whole group to consume from the earliest offset is not a good 
> workaround for a high-throughput topic, because on startup the group would 
> consume a large amount of historical data from the brokers, which affects 
> broker performance to a certain extent. Therefore, *it is necessary to consume 
> just these new partitions from the earliest offset separately.*
>  
> I ran a test; the results are in the attached screenshots. First, the 
> producer's and consumer's "metadata.max.age.ms" are set to 500ms and 3ms, 
> respectively.
> _trunk-console-consumer.png_ shows the consumer started with the community 
> version and set to "latest".
> _trunk-produce.png_ shows the data produced: "partition_count" is the number 
> of partitions of the topic, "message" is the numeric content of the 
> corresponding message, and "send_to_partition_index" is the index of the 
> partition the message was sent to. At 11:32:10 the producer perceives the 
> expansion of the total partitions from 2 to 3 and writes the numbers 38, 41, 
> and 44 into the newly expanded partition 2.
> _trunk-consume.png_ shows everything consumed by the community version. 38 and 
> 41, which were sent to partition 2, were not consumed at first, and even after 
> partition 2 was perceived they were still not consumed; consumption started 
> from the latest message, 44, so 38 and 41 were discarded.
>  
> _safe-console-consumer.png_ shows the consumer started with the fixed version 
> and set to "safe_latest".
> _safe-produce.png_ shows the data produced. At 12:12:09 the producer perceives 
> the expansion of the total partitions from 4 to 5 and writes the numbers 109 
> and 114 into the newly expanded partition 4.
> _safe-consume.png_ shows everything consumed by the fixed version. 109, which 
> was sent to partition 4, was not consumed at first, but after partition 4 was 
> perceived, 109 was consumed as the first record of partition 4. So under this 
> condition the fixed version does not lose data.
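To make the proposed behavior concrete, here is a minimal application-side sketch of
the idea (illustrative only; it is not the KIP-842 implementation, and the class name
is invented): when an assignment contains partitions the consumer has never seen
before, seek them to the beginning instead of relying on auto.offset.reset=latest.
{code:java}
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Hypothetical rebalance listener: newly expanded partitions are consumed from the
// earliest offset, while previously known partitions keep their committed offsets.
public class SeekNewPartitionsToEarliest implements ConsumerRebalanceListener {

    private final Consumer<?, ?> consumer;
    private final Set<TopicPartition> knownPartitions = new HashSet<>();
    private boolean firstAssignment = true;

    public SeekNewPartitionsToEarliest(final Consumer<?, ?> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
        // Nothing to do: committed offsets are retained for partitions already consumed.
    }

    @Override
    public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
        final Set<TopicPartition> newlyAdded = new HashSet<>();
        for (final TopicPartition tp : partitions) {
            if (knownPartitions.add(tp) && !firstAssignment) {
                newlyAdded.add(tp);
            }
        }
        firstAssignment = false;
        if (!newlyAdded.isEmpty()) {
            // Start the expanded partitions from the earliest offset so no records are skipped.
            consumer.seekToBeginning(newlyAdded);
        }
    }
}
{code}
Such a listener would be registered via consumer.subscribe(topics, new
SeekNewPartitionsToEarliest(consumer)); the fixed version described above instead
exposes this behavior through the "safe_latest" reset setting.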



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] C0urante commented on pull request #11781: KAFKA-10000: Per-connector offsets topics (KIP-618)

2022-06-15 Thread GitBox


C0urante commented on PR #11781:
URL: https://github.com/apache/kafka/pull/11781#issuecomment-1157162231

   Hey guys--thanks for the reviews, really appreciate the rapid responses 
here. I found a bug that's been a bit trickier to solve than expected and have 
had little time to work on it this week. I plan to push the next draft by 
Friday at the very latest.
   
   If it matters, the bug is that the offset stores for regular 
(non-exactly-once) source tasks, and source connectors, are never started. I'm 
planning on fixing that first, then adding an integration test case to 
https://github.com/apache/kafka/pull/11782 to simulate a soft downgrade where 
someone disables exactly-once support on their worker after creating a 
connector and letting it run for a bit, and finally, manually auditing the 
changes for KIP-618 to catch any other potential bugs related to improper 
initialization or cleanup of resources.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bozhao12 commented on pull request #12286: KAFKA-13984: Fix TopicBasedRemoteLogMetadataManager#initializeResources should exit immediately when partition size of __remote_log_metadata

2022-06-15 Thread GitBox


bozhao12 commented on PR #12286:
URL: https://github.com/apache/kafka/pull/12286#issuecomment-1157064520

   @divijvaidya Thanks for your review. I added a unit test based on your
suggestion. Because a restart operation is involved, I put this unit test in
`TopicBasedRemoteLogMetadataManagerRestartTest`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-13999) Add ProducerCount metrics (KIP-847)

2022-06-15 Thread Artem Livshits (Jira)
Artem Livshits created KAFKA-13999:
--

 Summary: Add ProducerCount metrics (KIP-847)
 Key: KAFKA-13999
 URL: https://issues.apache.org/jira/browse/KAFKA-13999
 Project: Kafka
  Issue Type: Improvement
Reporter: Artem Livshits


See 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-847%3A+Add+ProducerCount+metrics



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] mdedetrich commented on pull request #12284: KAFKA-13980: Upgrade from Scala 2.12.15 to 2.12.16

2022-06-15 Thread GitBox


mdedetrich commented on PR #12284:
URL: https://github.com/apache/kafka/pull/12284#issuecomment-1156984238

   > Is there something you're looking for in 2.12.16?
   
   I already had a detailed look into this. The bug only occurs at compile time
(see https://github.com/scala/bug/issues/12605#issuecomment-1151427077), which
means that unless you hit that specific compile error there is no adverse effect.
You can see the precise details in the linked ticket at
https://github.com/scala/bug/issues/12605, but in summary Kafka is completely
unaffected by it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #12284: KAFKA-13980: Upgrade from Scala 2.12.15 to 2.12.16

2022-06-15 Thread GitBox


ijuma commented on PR #12284:
URL: https://github.com/apache/kafka/pull/12284#issuecomment-1156981130

   I notice there's a regression:
   
   > Scala 2.12.16 contains a regression (https://github.com/scala/bug/issues/12605)
that was discovered after the artifacts were published. Only mixed compilation of
Scala and Java source files together is affected, and only when the Scala code
contains references to certain nested classes in the Java sources. The problem
manifests as a compile-time type error. Follow the link for details and
workarounds. We'll fix the problem in Scala 2.12.17, which we expect to release
in a few months.
   
   Is there something you're looking for in 2.12.16?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #12241: MINOR: Fix docs in upgrade.html

2022-06-15 Thread GitBox


ijuma commented on code in PR #12241:
URL: https://github.com/apache/kafka/pull/12241#discussion_r898459208


##
docs/upgrade.html:
##
@@ -1265,7 +1265,7 @@ Notable changes in 1
     on live log directories even if there are offline log directories. A log directory may become offline due to IOException
     caused by hardware failure. Users need to monitor the per-broker metric offlineLogDirectoryCount to check
     whether there is offline log directory.
-    Added KafkaStorageException which is a retriable exception. KafkaStorageException will be converted to NotLeaderForPartitionException in the response
+    Added KafkaStorageException which is a retriable exception. KafkaStorageException will be converted to NotLeaderOrFollowerException in the response

Review Comment:
   Thanks, closing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma closed pull request #12241: MINOR: Fix docs in upgrade.html

2022-06-15 Thread GitBox


ijuma closed pull request #12241: MINOR: Fix docs in upgrade.html
URL: https://github.com/apache/kafka/pull/12241


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jnh5y opened a new pull request, #12299: MINOR: Guard against decrementing `totalCommittedSinceLastSummary` du…

2022-06-15 Thread GitBox


jnh5y opened a new pull request, #12299:
URL: https://github.com/apache/kafka/pull/12299

   …ring rebalancing.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jnh5y commented on pull request #12298: KAFKA-13998 JoinGroupRequestData 'reason' can be too large

2022-06-15 Thread GitBox


jnh5y commented on PR #12298:
URL: https://github.com/apache/kafka/pull/12298#issuecomment-1156967918

   I tried to find a way to create a unit test for this change, but I wasn't 
able to do so quickly.  If someone has a suggestion for how to do that, I'm 
happy to follow through with it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jnh5y opened a new pull request, #12298: KAFKA-13998 JoinGroupRequestData 'reason' can be too large

2022-06-15 Thread GitBox


jnh5y opened a new pull request, #12298:
URL: https://github.com/apache/kafka/pull/12298

   This fix follows the pattern established in `AbstractCoordinator.java` of
setting the request reason via the `requestRejoin` method.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bozhao12 commented on a diff in pull request #12241: MINOR: Fix docs in upgrade.html

2022-06-15 Thread GitBox


bozhao12 commented on code in PR #12241:
URL: https://github.com/apache/kafka/pull/12241#discussion_r898448690


##
docs/upgrade.html:
##
@@ -1265,7 +1265,7 @@ Notable changes in 1
     on live log directories even if there are offline log directories. A log directory may become offline due to IOException
     caused by hardware failure. Users need to monitor the per-broker metric offlineLogDirectoryCount to check
     whether there is offline log directory.
-    Added KafkaStorageException which is a retriable exception. KafkaStorageException will be converted to NotLeaderForPartitionException in the response
+    Added KafkaStorageException which is a retriable exception. KafkaStorageException will be converted to NotLeaderOrFollowerException in the response

Review Comment:
   Indeed, this is reasonable, thanks for your review.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-13998) JoinGroupRequestData 'reason' can be too large

2022-06-15 Thread Jim Hughes (Jira)
Jim Hughes created KAFKA-13998:
--

 Summary: JoinGroupRequestData 'reason' can be too large
 Key: KAFKA-13998
 URL: https://issues.apache.org/jira/browse/KAFKA-13998
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.2.0
Reporter: Jim Hughes
Assignee: Jim Hughes


We saw an exception like this: 

{code:java}
org.apache.kafka.streams.errors.StreamsException: java.lang.RuntimeException: 'reason' field is too long to be serialized
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:627)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:551)
Caused by: java.lang.RuntimeException: 'reason' field is too long to be serialized
    at org.apache.kafka.common.message.JoinGroupRequestData.addSize(JoinGroupRequestData.java:465)
    at org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218)
    at org.apache.kafka.common.protocol.SendBuilder.buildRequestSend(SendBuilder.java:187)
    at org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:101)
    at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:524)
    at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:500)
    at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:460)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.trySend(ConsumerNetworkClient.java:499)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:255)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:437)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:371)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:542)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1271)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1235)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1215)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:969)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:917)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:736)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:589)
    ... 1 more
{code}

This appears to be caused by the code passing an entire stack trace in the 
`rejoinReason`.  See 
https://github.com/apache/kafka/blob/3.2.0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L481
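For illustration only (this is not the actual patch, and the 255-character cap is an
assumption made for the sketch rather than a value taken from the protocol
definition), a defensive truncation of the rejoin reason could look roughly like this:
{code:java}
// Hypothetical helper: bound the rejoin reason so the JoinGroup request can always
// be serialized, even if a caller passes in a full stack trace.
final class RejoinReasons {

    private static final int MAX_REASON_LENGTH = 255; // assumed cap, for illustration

    static String truncate(final String reason) {
        if (reason == null || reason.length() <= MAX_REASON_LENGTH) {
            return reason;
        }
        return reason.substring(0, MAX_REASON_LENGTH);
    }

    private RejoinReasons() { }
}
{code}
A caller building the join request would then pass RejoinReasons.truncate(rejoinReason)
instead of the raw string.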



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] guozhangwang commented on pull request #12121: KAFKA-13846: Adding overloaded addMetricIfAbsent method

2022-06-15 Thread GitBox


guozhangwang commented on PR #12121:
URL: https://github.com/apache/kafka/pull/12121#issuecomment-1156954295

   > @guozhangwang , which file does this correspond to? that update the web 
docs on 3.3 release new API changes
   
   Here: https://github.com/apache/kafka/blob/trunk/docs/upgrade.html. You can
look at earlier PRs to see how they updated the upgrade guide / API changes for
upcoming releases.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #12204: [9/N WIP][Emit final] Emit final for session window aggregations

2022-06-15 Thread GitBox


mjsax commented on code in PR #12204:
URL: https://github.com/apache/kafka/pull/12204#discussion_r898411852


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java:
##
@@ -61,6 +65,18 @@ public <R> QueryResult<R> query(final Query<R> query,
         );
     }
 
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Instant earliestSessionEndTime,
+                                                                  final Instant latestSessionEndTime) {
+        final long earliestEndTime = ApiUtils.validateMillisecondInstant(earliestSessionEndTime,
+            prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime"));
+        final long latestEndTime = ApiUtils.validateMillisecondInstant(latestSessionEndTime,
+            prepareMillisCheckFailMsgPrefix(latestSessionEndTime, "latestSessionEndTime"));
+
+        final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetchAll(earliestEndTime, latestEndTime);

Review Comment:
   If I read the code correctly, what `fetchAll()` does is correct: from my
understanding, `fetchAll()` is implemented to find "overlapping sessions" given a
lower and upper bound -- the lower bound must be smaller than the session end and
the upper bound must be larger than the session start to find an overlap. Because
the upper bound compares to the session start, and we use the "base" part, we need
to search the full "data/base part" of the store.
   
   I guess the issue is that you actually cannot use `fetchAll()` at all for our
purpose here? Passing in `lastEndTime` does not work (does it?) as it would be
compared to session start times, but we want a comparison to session end times. --
Thus, I think the right solution is to also add the new `findSessions()` to the
internal `SegmentedStore` and implement a proper iterator there?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #12204: [9/N WIP][Emit final] Emit final for session window aggregations

2022-06-15 Thread GitBox


mjsax commented on code in PR #12204:
URL: https://github.com/apache/kafka/pull/12204#discussion_r898406030


##
streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java:
##
@@ -39,6 +39,13 @@
  */
 public interface SessionStore<K, AGG> extends StateStore, ReadOnlySessionStore<K, AGG> {
 
+    // TODO: javadoc; both ends are inclusive
+    default KeyValueIterator<Windowed<K>, AGG> findSessions(final Instant earliestSessionEndTime,

Review Comment:
   I think there is no way around it? In the end, we allow users to plug in a
custom session store -- thus, if they use the new emit-final feature, they will
need to implement this new method. Existing code with custom session stores should
not break, because existing code neither implements nor calls this new method.
   
   If we don't make it public API, we would prevent users from passing in custom
session stores in combination with the new emit-final feature, which seems too
restrictive?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #12204: [9/N WIP][Emit final] Emit final for session window aggregations

2022-06-15 Thread GitBox


guozhangwang commented on code in PR #12204:
URL: https://github.com/apache/kafka/pull/12204#discussion_r880784817


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java:
##
@@ -61,6 +65,18 @@ public <R> QueryResult<R> query(final Query<R> query,
         );
     }
 
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Instant earliestSessionEndTime,
+                                                                  final Instant latestSessionEndTime) {
+        final long earliestEndTime = ApiUtils.validateMillisecondInstant(earliestSessionEndTime,
+            prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime"));
+        final long latestEndTime = ApiUtils.validateMillisecondInstant(latestSessionEndTime,
+            prepareMillisCheckFailMsgPrefix(latestSessionEndTime, "latestSessionEndTime"));
+
+        final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetchAll(earliestEndTime, latestEndTime);

Review Comment:
   This is the second open question: with the current prefixed (base, i.e. 
time-first) session key schema, this fetchAll would be effectively searching 
for `[earliestEnd, INF]` because of this logic: 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java#L46
   
   This is because we translate the range query without key inside 
`AbstractRocksDBTimeOrderedSegmentedBytesStore` by using the `lower/upperRange` 
instead of `lower/upperRangeFixedSize`): 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java#L241-L242
   
   I cannot remember why we need to do this. @lihaosky @mjsax do you remember 
why?
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #12204: [9/N WIP][Emit final] Emit final for session window aggregations

2022-06-15 Thread GitBox


mjsax commented on code in PR #12204:
URL: https://github.com/apache/kafka/pull/12204#discussion_r898340071


##
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java:
##
@@ -202,25 +205,43 @@ public void remove(final Windowed<Bytes> sessionKey) {
 
     @Override
     public byte[] fetchSession(final Bytes key,
-                               final long earliestSessionEndTime,
-                               final long latestSessionStartTime) {
+                               final long sessionStartTime,
+                               final long sessionEndTime) {
         removeExpiredSegments();
 
         Objects.requireNonNull(key, "key cannot be null");
 
         // Only need to search if the record hasn't expired yet
-        if (latestSessionStartTime > observedStreamTime - retentionPeriod) {
-            final ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>> keyMap = endTimeMap.get(latestSessionStartTime);
+        if (sessionEndTime > observedStreamTime - retentionPeriod) {
+            final ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>> keyMap = endTimeMap.get(sessionEndTime);
             if (keyMap != null) {
                 final ConcurrentNavigableMap<Long, byte[]> startTimeMap = keyMap.get(key);
                 if (startTimeMap != null) {
-                    return startTimeMap.get(earliestSessionEndTime);
+                    return startTimeMap.get(sessionStartTime);
                 }
             }
         }
         return null;
     }
 
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Instant earliestSessionEndTime,
+                                                                  final Instant latestSessionEndTime) {
+        removeExpiredSegments();
+
+        final long earliestEndTime = ApiUtils.validateMillisecondInstant(earliestSessionEndTime,
+            prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime"));
+        final long latestEndTime = ApiUtils.validateMillisecondInstant(latestSessionEndTime,
+            prepareMillisCheckFailMsgPrefix(latestSessionEndTime, "latestSessionEndTime"));
+
+        // since subMap is exclusive on toKey, we need to plus one
+        return registerNewIterator(null,
+                                   null,
+            Long.MAX_VALUE,
+            endTimeMap.subMap(earliestEndTime, latestEndTime + 1).entrySet().iterator(),
+            true);

Review Comment:
   Ok. I read the code of `InMemorySessionStore` in detail and now understand 
what's going on. This LGTM.



##
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java:
##
@@ -202,25 +205,43 @@ public void remove(final Windowed<Bytes> sessionKey) {
 
     @Override
     public byte[] fetchSession(final Bytes key,
-                               final long earliestSessionEndTime,
-                               final long latestSessionStartTime) {
+                               final long sessionStartTime,
+                               final long sessionEndTime) {
         removeExpiredSegments();
 
         Objects.requireNonNull(key, "key cannot be null");
 
         // Only need to search if the record hasn't expired yet
-        if (latestSessionStartTime > observedStreamTime - retentionPeriod) {
-            final ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>> keyMap = endTimeMap.get(latestSessionStartTime);
+        if (sessionEndTime > observedStreamTime - retentionPeriod) {
+            final ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>> keyMap = endTimeMap.get(sessionEndTime);
             if (keyMap != null) {
                 final ConcurrentNavigableMap<Long, byte[]> startTimeMap = keyMap.get(key);
                 if (startTimeMap != null) {
-                    return startTimeMap.get(earliestSessionEndTime);
+                    return startTimeMap.get(sessionStartTime);
                 }
             }
         }
         return null;
     }
 
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Instant earliestSessionEndTime,
+                                                                  final Instant latestSessionEndTime) {
+        removeExpiredSegments();
+
+        final long earliestEndTime = ApiUtils.validateMillisecondInstant(earliestSessionEndTime,
+            prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime"));
+        final long latestEndTime = ApiUtils.validateMillisecondInstant(latestSessionEndTime,
+            prepareMillisCheckFailMsgPrefix(latestSessionEndTime, "latestSessionEndTime"));
+
+        // since subMap is exclusive on toKey, we need to plus one
+        return registerNewIterator(null,
+                                   null,
+            Long.MAX_VALUE,

Review Comment:
   nit: indention



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

[GitHub] [kafka] mjsax commented on a diff in pull request #12204: [9/N WIP][Emit final] Emit final for session window aggregations

2022-06-15 Thread GitBox


mjsax commented on code in PR #12204:
URL: https://github.com/apache/kafka/pull/12204#discussion_r898339592


##
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java:
##
@@ -202,25 +205,43 @@ public void remove(final Windowed<Bytes> sessionKey) {
 
     @Override
     public byte[] fetchSession(final Bytes key,
-                               final long earliestSessionEndTime,
-                               final long latestSessionStartTime) {
+                               final long sessionStartTime,
+                               final long sessionEndTime) {
         removeExpiredSegments();
 
         Objects.requireNonNull(key, "key cannot be null");
 
        // Only need to search if the record hasn't expired yet
-        if (latestSessionStartTime > observedStreamTime - retentionPeriod) {
-            final ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>> keyMap = endTimeMap.get(latestSessionStartTime);
+        if (sessionEndTime > observedStreamTime - retentionPeriod) {
+            final ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>> keyMap = endTimeMap.get(sessionEndTime);
             if (keyMap != null) {
                 final ConcurrentNavigableMap<Long, byte[]> startTimeMap = keyMap.get(key);
                 if (startTimeMap != null) {
-                    return startTimeMap.get(earliestSessionEndTime);
+                    return startTimeMap.get(sessionStartTime);
                 }
             }
         }
         return null;
     }
 
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Instant earliestSessionEndTime,
+                                                                  final Instant latestSessionEndTime) {
+        removeExpiredSegments();
+
+        final long earliestEndTime = ApiUtils.validateMillisecondInstant(earliestSessionEndTime,
+            prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime"));
+        final long latestEndTime = ApiUtils.validateMillisecondInstant(latestSessionEndTime,
+            prepareMillisCheckFailMsgPrefix(latestSessionEndTime, "latestSessionEndTime"));
+
+        // since subMap is exclusive on toKey, we need to plus one
+        return registerNewIterator(null,
+                                   null,
+            Long.MAX_VALUE,
+            endTimeMap.subMap(earliestEndTime, latestEndTime + 1).entrySet().iterator(),

Review Comment:
   Nit: can we call `subMap(earliestEndTime, true, latestEndTime, true)` which 
is the same thing but more "intuitive" as we always search for _inclusive_ 
bounds throughout the code (otherwise, this is the only place which has an 
exclusive upper bound).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13939) Memory Leak When Logging Is Disabled In InMemoryTimeOrderedKeyValueBuffer

2022-06-15 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17554742#comment-17554742
 ] 

Matthias J. Sax commented on KAFKA-13939:
-

Thanks for the PR. I added you to the list of contributors and assigned the ticket
to you. You can now also self-assign tickets.

> Memory Leak When Logging Is Disabled In InMemoryTimeOrderedKeyValueBuffer
> -
>
> Key: KAFKA-13939
> URL: https://issues.apache.org/jira/browse/KAFKA-13939
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Jackson Newhouse
>Assignee: Jackson Newhouse
>Priority: Blocker
>
> If `loggingEnabled` is false, the `dirtyKeys` Set is not cleared within 
> `flush()`, see 
> [https://github.com/apache/kafka/blob/3.2/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java#L262.]
>  However, dirtyKeys is still written to in the loop within `evictWhile`. This 
> causes dirtyKeys to continuously grow for the life of the buffer. 
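A minimal sketch of the fix idea, assuming a simplified stand-in for the buffer (the
class and method names below are illustrative and do not match
InMemoryTimeOrderedKeyValueBuffer): clear the dirty-key set on every flush, not only
when change-logging is enabled.
{code:java}
import java.util.HashSet;
import java.util.Set;

// Hypothetical simplification of the buffer's dirty-key tracking.
class DirtyKeyTracker {

    private final boolean loggingEnabled;
    private final Set<String> dirtyKeys = new HashSet<>();

    DirtyKeyTracker(final boolean loggingEnabled) {
        this.loggingEnabled = loggingEnabled;
    }

    void markDirty(final String key) {
        dirtyKeys.add(key);
    }

    void flush() {
        if (loggingEnabled) {
            dirtyKeys.forEach(this::logChange);
        }
        // Clear unconditionally. Clearing only inside the loggingEnabled branch is
        // what lets the set grow without bound when logging is disabled.
        dirtyKeys.clear();
    }

    private void logChange(final String key) {
        // placeholder for writing the change to the changelog topic
    }
}
{code}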



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Assigned] (KAFKA-13939) Memory Leak When Logging Is Disabled In InMemoryTimeOrderedKeyValueBuffer

2022-06-15 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-13939:
---

Assignee: Jackson Newhouse

> Memory Leak When Logging Is Disabled In InMemoryTimeOrderedKeyValueBuffer
> -
>
> Key: KAFKA-13939
> URL: https://issues.apache.org/jira/browse/KAFKA-13939
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Jackson Newhouse
>Assignee: Jackson Newhouse
>Priority: Blocker
>
> If `loggingEnabled` is false, the `dirtyKeys` Set is not cleared within 
> `flush()`, see 
> [https://github.com/apache/kafka/blob/3.2/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java#L262.]
>  However, dirtyKeys is still written to in the loop within `evictWhile`. This 
> causes dirtyKeys to continuously grow for the life of the buffer. 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-13888) KIP-836: Addition of Information in DescribeQuorumResponse about Voter Lag

2022-06-15 Thread Niket Goel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17554738#comment-17554738
 ] 

Niket Goel commented on KAFKA-13888:


The PR that adds the API handler to the admin client
([https://github.com/apache/kafka/pull/12206]) has now been merged.

The API does not return any values for the newly added fields at this time.
[~Jack-Lee] has a draft PR to add the implementation for the fields. I am following
up with him to iterate on his PR.

> KIP-836: Addition of Information in DescribeQuorumResponse about Voter Lag
> --
>
> Key: KAFKA-13888
> URL: https://issues.apache.org/jira/browse/KAFKA-13888
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft
>Reporter: Niket Goel
>Assignee: lqjacklee
>Priority: Major
> Fix For: 3.3.0
>
>
> Tracking issue for the implementation of KIP:836



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Comment Edited] (KAFKA-13888) KIP-836: Addition of Information in DescribeQuorumResponse about Voter Lag

2022-06-15 Thread Niket Goel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17554738#comment-17554738
 ] 

Niket Goel edited comment on KAFKA-13888 at 6/15/22 5:59 PM:
-

The PR that adds the API handler to the admin client
([https://github.com/apache/kafka/pull/12206]) has now been merged.

The API does not return any values for the newly added fields at this time.
[~Jack-Lee] has a draft PR (https://github.com/apache/kafka/pull/12212) to add the
implementation for the fields. I am following up with him to iterate on his PR.


was (Author: niket goel):
The PR that adds the API handler to the admin client
([https://github.com/apache/kafka/pull/12206]) has now been merged.

The API does not return any values for the newly added fields at this time.
[~Jack-Lee] has a draft PR to add the implementation for the fields. I am following
up with him to iterate on his PR.

> KIP-836: Addition of Information in DescribeQuorumResponse about Voter Lag
> --
>
> Key: KAFKA-13888
> URL: https://issues.apache.org/jira/browse/KAFKA-13888
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft
>Reporter: Niket Goel
>Assignee: lqjacklee
>Priority: Major
> Fix For: 3.3.0
>
>
> Tracking issue for the implementation of KIP:836



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] mjsax commented on a diff in pull request #12293: KAFKA-13963: Clarified java doc for processors api

2022-06-15 Thread GitBox


mjsax commented on code in PR #12293:
URL: https://github.com/apache/kafka/pull/12293#discussion_r898266334


##
streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java:
##
@@ -30,6 +30,7 @@
  * In contrast, two sub-topologies are not connected but can be linked to each 
other via topics, i.e., if one
  * sub-topology {@link Topology#addSink(String, String, String...) writes} 
into a topic and another sub-topology
  * {@link Topology#addSource(String, String...) reads} from the same topic.
+ * Processors and Transformers created with the Processor API are treated as 
black boxes and are not represented in the topology graph.

Review Comment:
   Can you update the PR to re-phrase it so it's easier to understand?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] niket-goel commented on pull request #12212: Kafka 13888 new fields

2022-06-15 Thread GitBox


niket-goel commented on PR #12212:
URL: https://github.com/apache/kafka/pull/12212#issuecomment-1156768917

   Hey @lqjack . Thanks for raising this PR. The PR 
https://github.com/apache/kafka/pull/12206 has now been merged to trunk. Can 
you please update your PR with the latest code so we can iterate on it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #12121: KAFKA-13846: Adding overloaded addMetricIfAbsent method

2022-06-15 Thread GitBox


vamossagar12 commented on PR #12121:
URL: https://github.com/apache/kafka/pull/12121#issuecomment-1156764298

   @ijuma, I updated the PR name. Also, I created a follow-up PR to address
some of the comments: https://github.com/apache/kafka/pull/12297
   @guozhangwang, which file does this correspond to, for "update the web
docs on 3.3 release new API changes"?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2022-06-15 Thread GitBox


philipnee commented on PR #12149:
URL: https://github.com/apache/kafka/pull/12149#issuecomment-1156747595

   @ijuma - I think @hachikuji is reviewing it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on pull request #12285: KAFKA-7342 Part 1: Straightforward JUnit4 to JUnit5 migrations

2022-06-15 Thread GitBox


clolov commented on PR #12285:
URL: https://github.com/apache/kafka/pull/12285#issuecomment-1156744613

   Hello! Thank you to everyone who has left a comment and suggestions for 
improvement. In the next few days I will aim to rework this pull request. In 
summary:
   * I will revert the import reordering
   * I will not prefix assertions with Assertions
   * I will mention that these changes are for the streams module
   * I will split the PR into multiple ones so to stick to the <= 500 lines rule


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #12233: MINOR: Clean up tmp files created by tests

2022-06-15 Thread GitBox


divijvaidya commented on code in PR #12233:
URL: https://github.com/apache/kafka/pull/12233#discussion_r898241099


##
metadata/src/test/java/org/apache/kafka/controller/BootstrapMetadataTest.java:
##
@@ -31,9 +35,21 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class BootstrapMetadataTest {
+private Path tmpDir;

Review Comment:
   I think we should continue to use Path because:
   1. `BootstrapMetadata.write()` used in every test in this file accepts a 
`Path`. 
   2. Prior to this change, we were using Path in this test file, so the 
current PR doesn't change anything on that front.
   3. The new Java NIO.2 API introduced many new (and better) helper methods on
`Files.*` which take `Path` instead of `File`. I am already starting to replace the
older `java.io` methods with NIO.2 methods in the code base [1]. Given the benefits
of the new helper methods, I think we should stick with `Path` instead of `File`.
   
   However, if you feel strongly about this comment, please let me know and I will
make the change accordingly.
   
   [1] https://issues.apache.org/jira/browse/KAFKA-13928 
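For reference, a small self-contained example of the Path-based NIO.2 helpers
mentioned above (standard java.nio.file API; the directory prefix and file name are
made up for illustration):
```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class TempDirExample {
    public static void main(final String[] args) throws IOException {
        // Create a temp directory and a file inside it using Path-based helpers.
        final Path tmpDir = Files.createTempDirectory("bootstrap-metadata-test-");
        final Path file = tmpDir.resolve("bootstrap.checkpoint");
        Files.write(file, "placeholder".getBytes(StandardCharsets.UTF_8));

        // Clean up after the test: delete the file first, then the now-empty directory.
        Files.deleteIfExists(file);
        Files.deleteIfExists(tmpDir);
    }
}
```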



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mdedetrich commented on a diff in pull request #12284: KAFKA-13980: Upgrade from Scala 2.12.15 to 2.12.16

2022-06-15 Thread GitBox


mdedetrich commented on code in PR #12284:
URL: https://github.com/apache/kafka/pull/12284#discussion_r898217940


##
docs/upgrade.html:
##
@@ -59,6 +59,9 @@ Upgrading to 3.2.0 from any vers
     (or to take advantage of exactly once semantics),
     the newer Java clients must be used.
 
+    Upgrade from Scala 2.12.15 to 2.12.16. See https://github.com/scala/scala/releases/tag/v2.12.16 for release

Review Comment:
   I removed the release notes and force-pushed the branch. The PR description
contains the release notes instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2022-06-15 Thread GitBox


ijuma commented on PR #12149:
URL: https://github.com/apache/kafka/pull/12149#issuecomment-1156718718

   @philipnee where are we with this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #12148: MINOR: Remove unnecessary log4j-appender dependency and tweak explicit log4j dependency

2022-06-15 Thread GitBox


ijuma commented on PR #12148:
URL: https://github.com/apache/kafka/pull/12148#issuecomment-1156718290

   @omkreddy maybe you can help review this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mdedetrich commented on a diff in pull request #12284: KAFKA-13980: Upgrade from Scala 2.12.15 to 2.12.16

2022-06-15 Thread GitBox


mdedetrich commented on code in PR #12284:
URL: https://github.com/apache/kafka/pull/12284#discussion_r898217940


##
docs/upgrade.html:
##
@@ -59,6 +59,9 @@ Upgrading to 3.2.0 from any vers
     (or to take advantage of exactly once semantics),
     the newer Java clients must be used.
 
+    Upgrade from Scala 2.12.15 to 2.12.16. See https://github.com/scala/scala/releases/tag/v2.12.16 for release

Review Comment:
   I removed these release notes and force-pushed the branch. The PR description
contains the release notes instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma closed pull request #12232: MINOR:rm deprecated method

2022-06-15 Thread GitBox


ijuma closed pull request #12232: MINOR:rm deprecated method
URL: https://github.com/apache/kafka/pull/12232


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #12232: MINOR:rm deprecated method

2022-06-15 Thread GitBox


ijuma commented on PR #12232:
URL: https://github.com/apache/kafka/pull/12232#issuecomment-1156711344

   Yes, we only remove deprecated methods during major releases.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #12233: MINOR: Clean up tmp files created by tests

2022-06-15 Thread GitBox


ijuma commented on PR #12233:
URL: https://github.com/apache/kafka/pull/12233#issuecomment-1156710957

   Thanks for the PR. It looks reasonable, just one nit.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #12233: MINOR: Clean up tmp files created by tests

2022-06-15 Thread GitBox


ijuma commented on code in PR #12233:
URL: https://github.com/apache/kafka/pull/12233#discussion_r898209292


##
metadata/src/test/java/org/apache/kafka/controller/BootstrapMetadataTest.java:
##
@@ -31,9 +35,21 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class BootstrapMetadataTest {
+private Path tmpDir;

Review Comment:
   Not sure we gain much by using `Path` since all our utility methods work 
with `File`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #12241: MINOR: Fix docs in upgrade.html

2022-06-15 Thread GitBox


ijuma commented on code in PR #12241:
URL: https://github.com/apache/kafka/pull/12241#discussion_r898207431


##
docs/upgrade.html:
##
@@ -1265,7 +1265,7 @@ Notable changes in 1
 on live log directories even if there are offline log directories. A 
log directory may become offline due to IOException
 caused by hardware failure. Users need to monitor the per-broker 
metric offlineLogDirectoryCount to check
 whether there is offline log directory. 
-Added KafkaStorageException which is a retriable exception. 
KafkaStorageException will be converted to NotLeaderForPartitionException in 
the response
+Added KafkaStorageException which is a retriable exception. 
KafkaStorageException will be converted to NotLeaderOrFollowerException in the 
response

Review Comment:
   This is for 1.0.0, at the time it was called 
`NotLeaderForPartitionException`. I don't think we want to update old release 
notes in this way.






[GitHub] [kafka] ijuma commented on pull request #12278: MINOR: add AuthorizerNotReadyException

2022-06-15 Thread GitBox


ijuma commented on PR #12278:
URL: https://github.com/apache/kafka/pull/12278#issuecomment-1156706455

   Do we have a KIP for this?





[GitHub] [kafka] ijuma commented on pull request #12290: MINOR: Stop leaking threads in BlockingConnectorTest

2022-06-15 Thread GitBox


ijuma commented on PR #12290:
URL: https://github.com/apache/kafka/pull/12290#issuecomment-1156705915

   @kkonstantine can you please review this?





[GitHub] [kafka] ijuma commented on a diff in pull request #12284: KAFKA-13980: Upgrade from Scala 2.12.15 to 2.12.16

2022-06-15 Thread GitBox


ijuma commented on code in PR #12284:
URL: https://github.com/apache/kafka/pull/12284#discussion_r898205328


##
docs/upgrade.html:
##
@@ -59,6 +59,9 @@ Upgrading to 
3.2.0 from any vers
 (or to take advantage of exactly once semantics),
 the newer Java clients must be used.
 
+Upgrade from Scala 2.12.15 to 2.12.16. See 
https://github.com/scala/scala/releases/tag/v2.12.16 for release

Review Comment:
   We don't usually add patch upgrades to the release notes.






[GitHub] [kafka] ijuma commented on pull request #12285: KAFKA-7342 Part 1: Straightforward JUnit4 to JUnit5 migrations

2022-06-15 Thread GitBox


ijuma commented on PR #12285:
URL: https://github.com/apache/kafka/pull/12285#issuecomment-1156705224

   > Worth putting in a separate PR, but have you tried enabling the Jupiter 
parallel test runner? When I ran it on a work project, it improved build times 
by an order of magnitude
   
   We use multiple forks at the gradle level, so we should not enable this.
   
   A couple more comments:
   1. Let's revert the formatting changes.
   2. Do not prefix with `Assertions`.
   3. Mention in the PR title that these changes are for the streams module(s).
   
   





[jira] [Reopened] (KAFKA-13888) KIP-836: Addition of Information in DescribeQuorumResponse about Voter Lag

2022-06-15 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson reopened KAFKA-13888:
-

> KIP-836: Addition of Information in DescribeQuorumResponse about Voter Lag
> --
>
> Key: KAFKA-13888
> URL: https://issues.apache.org/jira/browse/KAFKA-13888
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft
>Reporter: Niket Goel
>Assignee: lqjacklee
>Priority: Major
> Fix For: 3.3.0
>
>
> Tracking issue for the implementation of KIP-836





[GitHub] [kafka] hachikuji commented on pull request #12212: Kafka 13888 new fields

2022-06-15 Thread GitBox


hachikuji commented on PR #12212:
URL: https://github.com/apache/kafka/pull/12212#issuecomment-1156686396

   @lqjack Can you merge with trunk please?





[jira] [Resolved] (KAFKA-13888) KIP-836: Addition of Information in DescribeQuorumResponse about Voter Lag

2022-06-15 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13888.
-
Fix Version/s: 3.3.0
   Resolution: Fixed

> KIP-836: Addition of Information in DescribeQuorumResponse about Voter Lag
> --
>
> Key: KAFKA-13888
> URL: https://issues.apache.org/jira/browse/KAFKA-13888
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft
>Reporter: Niket Goel
>Assignee: lqjacklee
>Priority: Major
> Fix For: 3.3.0
>
>
> Tracking issue for the implementation of KIP-836





[GitHub] [kafka] hachikuji merged pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

2022-06-15 Thread GitBox


hachikuji merged PR #12206:
URL: https://github.com/apache/kafka/pull/12206





[GitHub] [kafka] dajac closed pull request #12292: MINOR: KRaft nodes not shutdown correctly when using one controller in colocated mode

2022-06-15 Thread GitBox


dajac closed pull request #12292: MINOR: KRaft nodes not shutdown correctly 
when using one controller in colocated mode
URL: https://github.com/apache/kafka/pull/12292





[GitHub] [kafka] dajac commented on pull request #12292: MINOR: KRaft nodes not shutdown correctly when using one controller in colocated mode

2022-06-15 Thread GitBox


dajac commented on PR #12292:
URL: https://github.com/apache/kafka/pull/12292#issuecomment-1156567953

   Already fixed by https://github.com/apache/kafka/pull/11238.





[GitHub] [kafka] dajac merged pull request #11238: MINOR: Fix force kill of KRaft colocated controllers in system tests

2022-06-15 Thread GitBox


dajac merged PR #11238:
URL: https://github.com/apache/kafka/pull/11238





[GitHub] [kafka] tyamashi-oss opened a new pull request, #12296: KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically

2022-06-15 Thread GitBox


tyamashi-oss opened a new pull request, #12296:
URL: https://github.com/apache/kafka/pull/12296

   - Implementation:
 - Add updateDesiredRatePerSec() on Throttler (a rough sketch follows after the test notes below)
 - Call updateDesiredRatePerSec() on the Throttler with the new 
log.cleaner.io.max.bytes.per.second value in reconfigure() of LogCleaner
 - I modeled the change on [reconfigure() of 
SocketServer](https://github.com/apache/kafka/blob/fa59be4e770627cd34cef85986b58ad7f606928d/core/src/main/scala/kafka/network/SocketServer.scala#L336-L357)
   
   - Alternative implementation considered:
 - Re-instantiate the Throttler with the new log.cleaner.io.max.bytes.per.second 
value in reconfigure() of LogCleaner 
   - However, since instantiating a Throttler requires many constructor 
parameters, I chose to follow the SocketServer approach and update only 
log.cleaner.io.max.bytes.per.second
   
   - Test:
 - Added a unit test for updating the desired rate per second of the Throttler
 - I confirmed by manual testing that log.cleaner.io.max.bytes.per.second 
can be changed using bin/kafka-configs.sh:
 >   [2022-06-15 22:44:03,089] INFO [kafka-log-cleaner-thread-0]:
   Log cleaner thread 0 cleaned log my-topic-0 (dirty section = [57585, 
86901])
   2,799.3 MB of log processed in 596.0 seconds (4.7 MB/sec).
   Indexed 2,799.2 MB in 298.1 seconds (9.4 Mb/sec, 50.0% of total time)
   Buffer utilization: 0.0%
   Cleaned 2,799.3 MB in 298.0 seconds (9.4 Mb/sec, 50.0% of total time)
   Start size: 2,799.3 MB (29,317 messages)
   End size: 0.1 MB (1 messages)
   100.0% size reduction (100.0% fewer messages)
(kafka.log.LogCleaner)
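   
   For reference, here is a minimal sketch of the idea behind updateDesiredRatePerSec() (shown in Java purely for illustration; the real Throttler is a Scala class in core, and the field name and method bodies below are assumptions, not the actual implementation):
   
   ```java
   // Hypothetical sketch: a rate limiter whose target rate can be updated at runtime.
   public class ThrottlerSketch {
       // volatile so the cleaner thread sees updates made from the config-change thread
       private volatile double desiredRatePerSec;
   
       public ThrottlerSketch(double desiredRatePerSec) {
           this.desiredRatePerSec = desiredRatePerSec;
       }
   
       // Called from LogCleaner.reconfigure() when log.cleaner.io.max.bytes.per.second changes
       public void updateDesiredRatePerSec(double newRatePerSec) {
           this.desiredRatePerSec = newRatePerSec;
       }
   
       // Called on the hot path; sleeps just long enough to keep the observed rate under the target
       public void maybeThrottle(double bytesProcessed, double elapsedSec) throws InterruptedException {
           double observedRate = bytesProcessed / Math.max(elapsedSec, 1e-9);
           if (observedRate > desiredRatePerSec) {
               long sleepMs = (long) (1000 * (bytesProcessed / desiredRatePerSec - elapsedSec));
               if (sleepMs > 0)
                   Thread.sleep(sleepMs);
           }
       }
   }
   ```
   
   With this shape, reconfigure() only needs to forward the new config value to the throttler, which mirrors the SocketServer-style approach described above.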
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Created] (KAFKA-13996) log.cleaner.io.max.bytes.per.second cannot be changed dynamically

2022-06-15 Thread Tomonari Yamashita (Jira)
Tomonari Yamashita created KAFKA-13996:
--

 Summary: log.cleaner.io.max.bytes.per.second cannot be changed 
dynamically 
 Key: KAFKA-13996
 URL: https://issues.apache.org/jira/browse/KAFKA-13996
 Project: Kafka
  Issue Type: Bug
  Components: config, core, log cleaner
Affects Versions: 3.2.0
Reporter: Tomonari Yamashita
Assignee: Tomonari Yamashita


- log.cleaner.io.max.bytes.per.second cannot be changed dynamically using 
bin/kafka-configs.sh
- Reproduction procedure:
-# Create a topic with cleanup.policy=compact
{code:java}
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create 
--replication-factor 1 --partitions 1 --topic my-topic --config 
cleanup.policy=compact --config 
segment.bytes=104857600 --config compression.type=producer
{code}
-# Change log.cleaner.io.max.bytes.per.second=10485760 using 
bin/kafka-configs.sh
{code:java}
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers 
--entity-default --alter --add-config 
log.cleaner.io.max.bytes.per.second=10485760
{code}
-# Send enough messages (> segment.bytes=104857600) to activate the Log Cleaner
-# In logs/log-cleaner.log, the configured 
log.cleaner.io.max.bytes.per.second=10485760 is not reflected and the Log Cleaner 
does not slow down (it still cleans at >= 10485760 bytes/sec).
{code:java}
[2022-06-15 14:52:14,988] INFO [kafka-log-cleaner-thread-0]:
Log cleaner thread 0 cleaned log my-topic-0 (dirty section = [39786, 
81666])
3,999.0 MB of log processed in 2.7 seconds (1,494.4 MB/sec).
Indexed 3,998.9 MB in 0.9 seconds (4,218.2 Mb/sec, 35.4% of total time)
Buffer utilization: 0.0%
Cleaned 3,999.0 MB in 1.7 seconds (2,314.2 Mb/sec, 64.6% of total time)
Start size: 3,999.0 MB (41,881 messages)
End size: 0.1 MB (1 messages)
100.0% size reduction (100.0% fewer messages)
 (kafka.log.LogCleaner)
{code}
- Problem cause:
-- log.cleaner.io.max.bytes.per.second is used by the Throttler in LogCleaner; 
however, it is only passed to the Throttler at initialization time.
--- 
https://github.com/apache/kafka/blob/4380eae7ceb840dd93fee8ec90cd89a72bad7a3f/core/src/main/scala/kafka/log/LogCleaner.scala#L107-L112
-- The Throttler's configured rate needs to be updated in reconfigure() of LogCleaner.
 --- 
https://github.com/apache/kafka/blob/4380eae7ceb840dd93fee8ec90cd89a72bad7a3f/core/src/main/scala/kafka/log/LogCleaner.scala#L192-L196
- A workaround is to restart every broker after adding 
log.cleaner.io.max.bytes.per.second to config/server.properties





[jira] [Updated] (KAFKA-13997) one partition logs are not getting purged

2022-06-15 Thread Naveen P (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Naveen P updated KAFKA-13997:
-
Description: 
We have an issue with one of our topics in the Kafka cluster, which is taking up huge 
space. While checking, we found that one of the partitions is not getting its old logs 
purged, and because of this the disk is filling up. 

 

Since we have replication factor 3 for this topic, the same behaviour is observed 
on 3 Kafka broker nodes. We need a workaround to clean up the old log messages and 
also to find the root cause of this issue.

  was:
We have issue with one of our topic in kafka cluster, which is taking huge 
space, while checking found one of the partion doesn't have old logs getting 
purged, due to this space is getting full. 

 

Since we have replication factor 3 for this topic, same behaviour is observed 
on 2 kafka broker nodes. We need workaround to cleanup old log messages and 
also find root cause for this issue.


> one partition logs are not getting purged 
> --
>
> Key: KAFKA-13997
> URL: https://issues.apache.org/jira/browse/KAFKA-13997
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 2.0.0
>Reporter: Naveen P
>Priority: Major
>
> We have an issue with one of our topics in the Kafka cluster, which is taking up huge 
> space. While checking, we found that one of the partitions is not getting its old logs 
> purged, and because of this the disk is filling up. 
>  
> Since we have replication factor 3 for this topic, the same behaviour is observed 
> on 3 Kafka broker nodes. We need a workaround to clean up the old log messages and 
> also to find the root cause of this issue.





[jira] [Created] (KAFKA-13997) one partition logs are not getting purged

2022-06-15 Thread Naveen P (Jira)
Naveen P created KAFKA-13997:


 Summary: one partition logs are not getting purged 
 Key: KAFKA-13997
 URL: https://issues.apache.org/jira/browse/KAFKA-13997
 Project: Kafka
  Issue Type: Bug
  Components: log cleaner
Affects Versions: 2.0.0
Reporter: Naveen P


We have an issue with one of our topics in the Kafka cluster, which is taking up huge 
space. While checking, we found that one of the partitions is not getting its old logs 
purged, and because of this the disk is filling up. 

 

Since we have replication factor 3 for this topic, the same behaviour is observed 
on 2 Kafka broker nodes. We need a workaround to clean up the old log messages and 
also to find the root cause of this issue.





[jira] [Commented] (KAFKA-13995) Does Kafka support Network File System (NFS)? Is it recommended in Production?

2022-06-15 Thread Devarshi Shah (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17554577#comment-17554577
 ] 

Devarshi Shah commented on KAFKA-13995:
---

A kind request to answer as soon as possible as it's blocking our deliveries in 
customer's production environments. 

> Does Kafka support Network File System (NFS)? Is it recommended in Production?
> --
>
> Key: KAFKA-13995
> URL: https://issues.apache.org/jira/browse/KAFKA-13995
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.0.0
> Environment: Kubernetes Cluster
>Reporter: Devarshi Shah
>Priority: Blocker
>
> I've gone through the Apache Kafka Documentation. It does not contain 
> information about the support of underlying storage type, whether Kafka 
> supports block storage, Network File System (NFS) and/or others. On the 
> internet, I could find that it supports NFS, however most of them summarize 
> not to use NFS in Production. May we get proper information whether Kafka 
> recommends NFS in Production, or it doesn't support NFS to begin with?





[jira] [Updated] (KAFKA-13995) Does Kafka support Network File System (NFS)? Is it recommended in Production?

2022-06-15 Thread Devarshi Shah (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Devarshi Shah updated KAFKA-13995:
--
Description: I've gone through the Apache Kafka Documentation. It does not 
contain information about the support of underlying storage type, whether Kafka 
supports block storage, Network File System (NFS) and/or others. On the 
internet, I could find that it supports NFS, however most of them summarize not 
to use NFS in Production. May we get proper information whether Kafka 
recommends NFS in Production, or it doesn't support NFS to begin with?  (was: 
I've gone through the Apache Kafka Documentation. It does not contain 
information about the support of underlying storage type, whether Kafka 
supports block storage, Network File System (NFS). On the internet, I could 
find that it supports NFS, however most of them summarize not to use NFS in 
Production. May we get proper information whether Kafka recommends NFS, or it 
doesn't support NFS to begin with?)

> Does Kafka support Network File System (NFS)? Is it recommended in Production?
> --
>
> Key: KAFKA-13995
> URL: https://issues.apache.org/jira/browse/KAFKA-13995
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.0.0
> Environment: Kubernetes Cluster
>Reporter: Devarshi Shah
>Priority: Blocker
>
> I've gone through the Apache Kafka Documentation. It does not contain 
> information about the support of underlying storage type, whether Kafka 
> supports block storage, Network File System (NFS) and/or others. On the 
> internet, I could find that it supports NFS, however most of them summarize 
> not to use NFS in Production. May we get proper information whether Kafka 
> recommends NFS in Production, or it doesn't support NFS to begin with?





[jira] [Created] (KAFKA-13995) Does Kafka support Network File System (NFS)? Is it recommended in Production?

2022-06-15 Thread Devarshi Shah (Jira)
Devarshi Shah created KAFKA-13995:
-

 Summary: Does Kafka support Network File System (NFS)? Is it 
recommended in Production?
 Key: KAFKA-13995
 URL: https://issues.apache.org/jira/browse/KAFKA-13995
 Project: Kafka
  Issue Type: Test
Affects Versions: 3.0.0
 Environment: Kubernetes Cluster
Reporter: Devarshi Shah


I've gone through the Apache Kafka Documentation. It does not contain 
information about the support of underlying storage type, whether Kafka 
supports block storage, Network File System (NFS). On the internet, I could 
find that it supports NFS, however most of them summarize not to use NFS in 
Production. May we get proper information whether Kafka recommends NFS, or it 
doesn't support NFS to begin with?





[GitHub] [kafka] divijvaidya commented on pull request #12224: KAFKA-13943: Make LocalLogManager implementation consistent with the RaftClient interface contract

2022-06-15 Thread GitBox


divijvaidya commented on PR #12224:
URL: https://github.com/apache/kafka/pull/12224#issuecomment-1156428420

   @jsancio please review when you get a chance. Currently multiple tests in 
`QuorumControllerTest` are flaky because we allow creating a 
snapshot with a LONG_MAX offset. This is making it difficult to review PRs due to 
flakiness.
   
   This code change fixes that. 





[jira] [Assigned] (KAFKA-13943) Fix flaky test QuorumControllerTest.testMissingInMemorySnapshot()

2022-06-15 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya reassigned KAFKA-13943:


Assignee: Divij Vaidya

> Fix flaky test QuorumControllerTest.testMissingInMemorySnapshot()
> -
>
> Key: KAFKA-13943
> URL: https://issues.apache.org/jira/browse/KAFKA-13943
> Project: Kafka
>  Issue Type: Test
>  Components: unit tests
>Reporter: Divij Vaidya
>Assignee: Divij Vaidya
>Priority: Major
>  Labels: flaky-test
>
> Test failed at 
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-12197/3/tests]
>  
> {noformat}
> [2022-05-27 09:34:42,382] INFO [Controller 0] Creating new QuorumController 
> with clusterId wj9LhgPJTV-KYEItgqvtQA, authorizer Optional.empty. 
> (org.apache.kafka.controller.QuorumController:1484)
> [2022-05-27 09:34:42,393] DEBUG [LocalLogManager 0] Node 0: running log 
> check. (org.apache.kafka.metalog.LocalLogManager:479)
> [2022-05-27 09:34:42,394] DEBUG [LocalLogManager 0] initialized local log 
> manager for node 0 (org.apache.kafka.metalog.LocalLogManager:622)
> [2022-05-27 09:34:42,396] INFO [LocalLogManager 0] Node 0: registered 
> MetaLogListener 1774961169 (org.apache.kafka.metalog.LocalLogManager:640)
> [2022-05-27 09:34:42,397] DEBUG [LocalLogManager 0] Node 0: running log 
> check. (org.apache.kafka.metalog.LocalLogManager:479)
> [2022-05-27 09:34:42,397] DEBUG [LocalLogManager 0] Node 0: Executing 
> handleLeaderChange LeaderAndEpoch(leaderId=OptionalInt[0], epoch=1) 
> (org.apache.kafka.metalog.LocalLogManager:520)
> [2022-05-27 09:34:42,398] DEBUG [Controller 0] Executing 
> handleLeaderChange[1]. (org.apache.kafka.controller.QuorumController:438)
> [2022-05-27 09:34:42,398] INFO [Controller 0] Becoming the active controller 
> at epoch 1, committed offset -1, committed epoch -1, and metadata.version 5 
> (org.apache.kafka.controller.QuorumController:950)
> [2022-05-27 09:34:42,398] DEBUG [Controller 0] Creating snapshot -1 
> (org.apache.kafka.timeline.SnapshotRegistry:197)
> [2022-05-27 09:34:42,399] DEBUG [Controller 0] Processed 
> handleLeaderChange[1] in 951 us 
> (org.apache.kafka.controller.QuorumController:385)
> [2022-05-27 09:34:42,399] INFO [Controller 0] Initializing metadata.version 
> to 5 (org.apache.kafka.controller.QuorumController:926)
> [2022-05-27 09:34:42,399] INFO [Controller 0] Setting metadata.version to 5 
> (org.apache.kafka.controller.FeatureControlManager:273)
> [2022-05-27 09:34:42,400] DEBUG [Controller 0] Creating snapshot 
> 9223372036854775807 (org.apache.kafka.timeline.SnapshotRegistry:197)
> [2022-05-27 09:34:42,400] DEBUG [Controller 0] Read-write operation 
> bootstrapMetadata(1863535402) will be completed when the log reaches offset 
> 9223372036854775807. (org.apache.kafka.controller.QuorumController:725)
> [2022-05-27 09:34:42,402] DEBUG append(batch=LocalRecordBatch(leaderEpoch=1, 
> appendTimestamp=10, 
> records=[ApiMessageAndVersion(RegisterBrokerRecord(brokerId=0, 
> incarnationId=kxAT73dKQsitIedpiPtwBw, brokerEpoch=-9223372036854775808, 
> endPoints=[BrokerEndpoint(name='PLAINTEXT', host='localhost', port=9092, 
> securityProtocol=0)], features=[], rack=null, fenced=true) at version 0)]), 
> prevOffset=1) (org.apache.kafka.metalog.LocalLogManager$SharedLogData:247)
> [2022-05-27 09:34:42,402] INFO [Controller 0] Registered new broker: 
> RegisterBrokerRecord(brokerId=0, incarnationId=kxAT73dKQsitIedpiPtwBw, 
> brokerEpoch=-9223372036854775808, endPoints=[BrokerEndpoint(name='PLAINTEXT', 
> host='localhost', port=9092, securityProtocol=0)], features=[], rack=null, 
> fenced=true) (org.apache.kafka.controller.ClusterControlManager:368)
> [2022-05-27 09:34:42,403] WARN [Controller 0] registerBroker: failed with 
> unknown server exception RuntimeException at epoch 1 in 2449 us.  Reverting 
> to last committed offset -1. 
> (org.apache.kafka.controller.QuorumController:410)java.lang.RuntimeException: 
> Can't create a new snapshot at epoch 1 because there is already a snapshot 
> with epoch 9223372036854775807at 
> org.apache.kafka.timeline.SnapshotRegistry.getOrCreateSnapshot(SnapshotRegistry.java:190)
> at 
> org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:723)
> at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:121)
> at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:200)
> at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:173)
> at java.base/java.lang.Thread.run(Thread.java:833){noformat}
> {noformat}
> Full stack trace
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.UnknownServerException: 
> 

[GitHub] [kafka] divijvaidya commented on a diff in pull request #12284: KAFKA-13980: Upgrade from Scala 2.12.15 to 2.12.16

2022-06-15 Thread GitBox


divijvaidya commented on code in PR #12284:
URL: https://github.com/apache/kafka/pull/12284#discussion_r897932645


##
docs/upgrade.html:
##
@@ -59,6 +59,9 @@ Upgrading to 
3.2.0 from any vers
 (or to take advantage of exactly once semantics),
 the newer Java clients must be used.
 
+Upgrade from Scala 2.12.15 to 2.12.16. See 
https://github.com/scala/scala/releases/tag/v2.12.16 for release

Review Comment:
   Is this note in the right place? Currently it sits under the "For a rolling 
upgrade" section, which contains instructions for Kafka cluster administrators 
handling version upgrades. That should have nothing to do with the Scala version, 
since Scala is bundled as a dependency, independent of whatever Scala version is 
installed on the OS.
   
   @ijuma will know more about this.






[GitHub] [kafka] divijvaidya commented on pull request #12284: KAFKA-13980: Upgrade from Scala 2.12.15 to 2.12.16

2022-06-15 Thread GitBox


divijvaidya commented on PR #12284:
URL: https://github.com/apache/kafka/pull/12284#issuecomment-1156421242

   cc: @ijuma (since you seem to have performed Scala upgrades in the past)
   
   The test failures do not seem related to this code change to me.





[GitHub] [kafka] divijvaidya commented on pull request #12286: KAFKA-13984: Fix TopicBasedRemoteLogMetadataManager#initializeResources should exit immediately when partition size of __remote_log_metad

2022-06-15 Thread GitBox


divijvaidya commented on PR #12286:
URL: https://github.com/apache/kafka/pull/12286#issuecomment-1156415549

   @bozhao12 can you please add a unit test in 
`TopicBasedRemoteLogMetadataManagerTest` that fails before this change and 
succeeds after this change. 





[GitHub] [kafka] cadonna commented on pull request #10881: KAFKA-12947 Replace EasyMock and PowerMock with Mockito for Streams…

2022-06-15 Thread GitBox


cadonna commented on PR #10881:
URL: https://github.com/apache/kafka/pull/10881#issuecomment-1156400720

   @wycc Do you plan to still work on this PR?





[GitHub] [kafka] cadonna commented on pull request #10881: KAFKA-12947 Replace EasyMock and PowerMock with Mockito for Streams…

2022-06-15 Thread GitBox


cadonna commented on PR #10881:
URL: https://github.com/apache/kafka/pull/10881#issuecomment-1156398967

   @clolov Thank you for your interest and help! Since this PR has not been touched in 
more than half a year, I would be fine with closing it so that you can open a new one.





[GitHub] [kafka] cadonna commented on pull request #12285: KAFKA-7342 Part 1: Straightforward JUnit4 to JUnit5 migrations

2022-06-15 Thread GitBox


cadonna commented on PR #12285:
URL: https://github.com/apache/kafka/pull/12285#issuecomment-1156391414

   @clolov Thank you for the PR!
   I agree with @divijvaidya about doing the reformatting in a separate PR. 
Could you also try to subdivide the PR into smaller PRs? Reviewing a 6500 line 
PR is never fun. Sizes around 500 are acceptable.





[GitHub] [kafka] jnh5y commented on pull request #12161: KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies

2022-06-15 Thread GitBox


jnh5y commented on PR #12161:
URL: https://github.com/apache/kafka/pull/12161#issuecomment-1156385644

   > @jnh5y Thank you for the updates!
   > 
   > LGTM!
   > 
   > Had just one nit.
   > 
   > Thank you for your patience!
   
   @cadonna Thank you for pushing me and helping me learn more about streams!





[GitHub] [kafka] jnh5y commented on a diff in pull request #12161: KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies

2022-06-15 Thread GitBox


jnh5y commented on code in PR #12161:
URL: https://github.com/apache/kafka/pull/12161#discussion_r897893883


##
streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java:
##
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.State;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Materialized;
+import 
org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
+import 
org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.hamcrest.CoreMatchers;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.KeyValue.pair;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getTopicSize;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilStreamsHasPolled;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@Category({IntegrationTest.class})
+public class PauseResumeIntegrationTest {
+private static final Duration STARTUP_TIMEOUT = Duration.ofSeconds(45);
+public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
+private static Properties producerConfig;
+private static Properties consumerConfig;
+
+private static final Materialized> IN_MEMORY_STORE =
+Materialized.as(Stores.inMemoryKeyValueStore("store"));
+
+private static final String INPUT_STREAM_1 = "input-stream-1";
+private static final String INPUT_STREAM_2 = "input-stream-2";
+private static final String OUTPUT_STREAM_1 = "output-stream-1";
+private static final String OUTPUT_STREAM_2 = "output-stream-2";
+private static final String TOPOLOGY1 = "topology1";
+private static final String TOPOLOGY2 = "topology2";
+
+private static final List> STANDARD_INPUT_DATA =
+asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 
400L), pair("C", -50L));
+private static final List> COUNT_OUTPUT_DATA =
+asList(pair("A", 1L), pair("B", 1L), pair("A", 2L), pair("C", 1L), 
pair("C", 2L));
+private 

[GitHub] [kafka] cadonna commented on a diff in pull request #12161: KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies

2022-06-15 Thread GitBox


cadonna commented on code in PR #12161:
URL: https://github.com/apache/kafka/pull/12161#discussion_r897887371


##
streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java:
##
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.State;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Materialized;
+import 
org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
+import 
org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.hamcrest.CoreMatchers;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.KeyValue.pair;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getTopicSize;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilStreamsHasPolled;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@Category({IntegrationTest.class})
+public class PauseResumeIntegrationTest {
+private static final Duration STARTUP_TIMEOUT = Duration.ofSeconds(45);
+public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
+private static Properties producerConfig;
+private static Properties consumerConfig;
+
+private static final Materialized> IN_MEMORY_STORE =
+Materialized.as(Stores.inMemoryKeyValueStore("store"));
+
+private static final String INPUT_STREAM_1 = "input-stream-1";
+private static final String INPUT_STREAM_2 = "input-stream-2";
+private static final String OUTPUT_STREAM_1 = "output-stream-1";
+private static final String OUTPUT_STREAM_2 = "output-stream-2";
+private static final String TOPOLOGY1 = "topology1";
+private static final String TOPOLOGY2 = "topology2";
+
+private static final List> STANDARD_INPUT_DATA =
+asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 
400L), pair("C", -50L));
+private static final List> COUNT_OUTPUT_DATA =
+asList(pair("A", 1L), pair("B", 1L), pair("A", 2L), pair("C", 1L), 
pair("C", 2L));
+private 

[GitHub] [kafka] jnh5y commented on a diff in pull request #12161: KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies

2022-06-15 Thread GitBox


jnh5y commented on code in PR #12161:
URL: https://github.com/apache/kafka/pull/12161#discussion_r897869162


##
streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java:
##
@@ -335,7 +333,39 @@ public void pauseResumehouldWorkAcrossInstances() throws 
Exception {
 kafkaStreams.resume();
 waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, 
STARTUP_TIMEOUT);
 
-awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA);
+awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
+}
+
+@Test
+public void pausedTopologyShouldNotRestoreStateStores() throws Exception {
+produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+
+kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
+kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_1);

Review Comment:
   My mistake; I've updated the test.






[GitHub] [kafka] jnh5y commented on a diff in pull request #12161: KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies

2022-06-15 Thread GitBox


jnh5y commented on code in PR #12161:
URL: https://github.com/apache/kafka/pull/12161#discussion_r897868472


##
streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java:
##
@@ -335,7 +333,39 @@ public void pauseResumehouldWorkAcrossInstances() throws 
Exception {
 kafkaStreams.resume();
 waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, 
STARTUP_TIMEOUT);
 
-awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA);
+awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
+}
+
+@Test
+public void pausedTopologyShouldNotRestoreStateStores() throws Exception {
+produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+
+kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
+kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_1);
+kafkaStreams.start();
+kafkaStreams2.start();
+
+waitForApplicationState(Arrays.asList(kafkaStreams, kafkaStreams2), 
State.RUNNING, STARTUP_TIMEOUT);
+
+awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
+
+kafkaStreams.close();
+kafkaStreams2.close();
+
+kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
+kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_1);
+kafkaStreams.cleanUp();
+kafkaStreams2.cleanUp();
+
+kafkaStreams.pause();
+kafkaStreams2.pause();
+kafkaStreams.start();
+kafkaStreams2.start();
+
+waitForApplicationState(Arrays.asList(kafkaStreams, kafkaStreams2), 
State.REBALANCING, STARTUP_TIMEOUT);
+
+assertTrue(kafkaStreams.allLocalStorePartitionLags().isEmpty());
+assertTrue(kafkaStreams2.allLocalStorePartitionLags().isEmpty());

Review Comment:
   Thank you!  I've added these changes.






[GitHub] [kafka] divijvaidya commented on pull request #12230: MINOR: Catch InvocationTargetException explicitly and propagate underlying cause

2022-06-15 Thread GitBox


divijvaidya commented on PR #12230:
URL: https://github.com/apache/kafka/pull/12230#issuecomment-1156355538

   @dengziming @showuon please review this small change. 





[GitHub] [kafka] divijvaidya commented on pull request #12229: MINOR: Include the inner exception stack trace when re-throwing an exception

2022-06-15 Thread GitBox


divijvaidya commented on PR #12229:
URL: https://github.com/apache/kafka/pull/12229#issuecomment-1156352282

   @mimaison perhaps you may want to look into this? This already has 2 
approvals from non-committers.





[jira] [Commented] (KAFKA-13386) Foreign Key Join filtering out valid records after a code change / schema evolved

2022-06-15 Thread Kin Siu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17554526#comment-17554526
 ] 

Kin Siu commented on KAFKA-13386:
-

[~guozhang], I am not sure if what I faced is the same as [~sduran]'s, but I hit an 
issue while load testing my application, which contains an FK join: there can be no 
output until the very last left-hand-side record has been processed.
 
Simplified test data from my load testing:
|| - || Key Fields || Value Fields ||
| Left hand side table | K1 | FK1, DF1, UpdateTs |
| Right hand side table | FK1 | V1, V2 |

It is a simple FK join of the left-hand-side table and the right-hand-side table on 
field "FK1"; for each left-hand-side update, I changed DF1 and 
UpdateTs.
When we increase the left-hand-side publishing rate, at some point when 
the rate is high enough, the application stops generating any output until the 
last left-hand-side record has been processed. For example, if I run the 
test for 10 minutes, we can end up receiving output only in the last few 
seconds. And instead of the same number of join outputs as left-hand-side 
updates + right-hand-side updates, we received far fewer.

I believe it is the same issue as described above: the "hash" carried back with the 
right-hand-side response is compared with the latest left-hand-side hash, and while 
in my test data the FK relation remains the same, the "hash" changed because the 
values of "DF1" and "UpdateTs" changed.
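
As a minimal, self-contained illustration of that comparison (the serializer and field names below are hypothetical, not the actual SubscriptionResolverJoinProcessorSupplier code), hashing the whole serialized left-hand-side value makes the stored hash stale as soon as any non-FK field changes:
{code:java}
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Arrays;

public class FkJoinHashSketch {
    // Hypothetical "serializer": concatenates all value fields, like a whole-record serde would.
    static byte[] serialize(String fk, String df1, long updateTs) {
        return (fk + "|" + df1 + "|" + updateTs).getBytes(StandardCharsets.UTF_8);
    }

    static byte[] hash(byte[] bytes) throws Exception {
        return MessageDigest.getInstance("SHA-256").digest(bytes);
    }

    public static void main(String[] args) throws Exception {
        // Hash stored with the subscription when the LHS record was first processed.
        byte[] storedHash = hash(serialize("FK1", "df-old", 100L));

        // By the time the RHS response arrives, the LHS value has been updated:
        // the FK is unchanged, but DF1/UpdateTs differ, so the hashes no longer
        // match and the (still valid) join result is discarded.
        byte[] currentHash = hash(serialize("FK1", "df-new", 200L));
        System.out.println("hashes equal: " + Arrays.equals(storedHash, currentHash)); // false
    }
}
{code}
Comparing only the extracted foreign-key reference, as proposed in the issue below, would keep such results.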


> Foreign Key Join filtering out valid records after a code change / schema 
> evolved
> -
>
> Key: KAFKA-13386
> URL: https://issues.apache.org/jira/browse/KAFKA-13386
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.2
>Reporter: Sergio Duran Vegas
>Priority: Major
>
> The join optimization assumes the serializer is deterministic and invariant 
> across upgrades. So in case of changes this optimization will drop 
> invalid/intermediate records. In other situations we have relied on the same 
> property, for example when computing whether an update is a duplicate result 
> or not.
>  
> The problem is that some serializers are sadly not deterministic.
>  
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java]
>  
> {code:java}
> //If this value doesn't match the current value from the original table, it 
> is stale and should be discarded.
>  if (java.util.Arrays.equals(messageHash, currentHash)) {{code}
>  
> A solution for this problem would be for the comparison to use the foreign-key 
> reference itself instead of the whole message hash.
>  
> The bug fix proposal is to allow the user to choose between one method of 
> comparison or the other (whole hash or FK reference). This would fix the 
> problem of dropping valid records in certain cases and still allow the user to 
> choose the current optimized way of checking valid records and dropping 
> intermediate results.
>  
>  





[GitHub] [kafka] divijvaidya commented on pull request #12229: MINOR: Include the inner exception stack trace when re-throwing an exception

2022-06-15 Thread GitBox


divijvaidya commented on PR #12229:
URL: https://github.com/apache/kafka/pull/12229#issuecomment-1156349743

   > Can this kind of problem be caught by spotbugs? manual checking is error 
prone.
   
   Agreed @dengziming but unfortunately spotbugs isn't catching such errors.








[GitHub] [kafka] divijvaidya commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`

2022-06-15 Thread GitBox


divijvaidya commented on code in PR #12045:
URL: https://github.com/apache/kafka/pull/12045#discussion_r897838652


##
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##
@@ -34,35 +35,56 @@
  */
 public abstract class SampledStat implements MeasurableStat {
 
-private double initialValue;
+private final double initialValue;
+/**
+ * Index of the latest stored sample.
+ */
 private int current = 0;
+/**
+ * Stores the recorded samples in a ring buffer.
+ */
 protected List samples;
 
 public SampledStat(double initialValue) {
 this.initialValue = initialValue;
 this.samples = new ArrayList<>(2);
 }
 
+/**
+ * {@inheritDoc}
+ *
+ * On every record, do the following:
+ * 1. Check if the current window has expired
+ * 2. If yes, then advance the current pointer to new window. The start 
time of the new window is set to nearest
+ *possible starting point for the new window. The nearest starting 
point occurs at config.timeWindowMs intervals
+ *from the end time of last known window.
+ * 3. Update the recorded value for the current window
+ * 4. Increase the number of event count
+ */
 @Override
-public void record(MetricConfig config, double value, long timeMs) {
-Sample sample = current(timeMs);
-if (sample.isComplete(timeMs, config))
-sample = advance(config, timeMs);
-update(sample, config, value, timeMs);
-sample.eventCount += 1;
+public void record(MetricConfig config, double value, long 
recordingTimeMs) {
+Sample sample = current(recordingTimeMs);
+if (sample.isComplete(recordingTimeMs, config)) {
+final long previousWindowStartTime = sample.lastWindowMs;
+final long previousWindowEndtime = previousWindowStartTime + 
config.timeWindowMs();
+final long startTimeOfNewWindow = recordingTimeMs - 
((recordingTimeMs - previousWindowEndtime) % config.timeWindowMs());

Review Comment:
   That is a great observation Tom! Ideally the code should be written so that 
recording a metric never blocks, because the operation is wall-clock-time 
sensitive. But as you observed, we have `synchronized` in multiple 
places, which may lead to a sample being recorded in a window that has already 
completed in the past.
   
   For cases where the `sensor` is used for calculating the ConnectionQuota, 
this problem wouldn't occur, because the call to `Time.milliseconds` is 
made inside a `synchronized` block, which ensures that only one 
thread, holding the latest timestamp, calls sensor.record at a time.
   
   But I don't know about code paths other than ConnectionQuota that use the 
sensor, so your observation is valid. 
   
   Since this problem is independent of this code change, and breaks existing 
logic if/when recordingTimeMs < endTimeOfPreviousWindow, I have created a JIRA 
to address this in a separate PR: 
https://issues.apache.org/jira/browse/KAFKA-13994
   
   [1] 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/SocketServer.scala#L1541-L1542
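   
   For intuition, here is a quick worked example of the window-advance arithmetic quoted in the diff above (the numbers are assumed, not taken from the PR):
   
   ```java
   public class WindowAdvanceExample {
       public static void main(String[] args) {
           // Assumed values: a 30s sample window that started at t = 0 ms, and a
           // recording that arrives well after that window ended.
           long timeWindowMs = 30_000;
           long previousWindowEndTime = 0 + timeWindowMs;   // 30_000
           long recordingTimeMs = 73_000;
   
           // Same arithmetic as the diff: snap the new window start to the nearest
           // timeWindowMs boundary at or before recordingTimeMs.
           long startTimeOfNewWindow =
                   recordingTimeMs - ((recordingTimeMs - previousWindowEndTime) % timeWindowMs);
   
           System.out.println(startTimeOfNewWindow); // 60000: candidate boundaries are 30000, 60000, 90000, ...
       }
   }
   ```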
 






[jira] [Created] (KAFKA-13994) Incorrect quota calculation due to a bug

2022-06-15 Thread Divij Vaidya (Jira)
Divij Vaidya created KAFKA-13994:


 Summary: Incorrect quota calculation due to a bug
 Key: KAFKA-13994
 URL: https://issues.apache.org/jira/browse/KAFKA-13994
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Divij Vaidya


*Problem*
This was noted by [~tombentley] at 
[https://github.com/apache/kafka/pull/12045#discussion_r895592286] 

The completion of a sample window in `SampledStat.java` is based on a comparison 
of `recordingTimeMs` with startTimeOfPreviousWindow [1]. `recordingTimeMs` is 
obtained from System.currentTimeMillis, which:
1. is not guaranteed to be monotonically increasing, due to clock drifts. 
2. is not necessarily the current time when it arrives at [1], because the 
thread may be blocked on the `synchronized` block in Sensor.recordInternal [2], 
and synchronized provides no guarantee about fairness for blocked threads.

Hence, it is possible that when the isComplete comparison is made at [1], 
recordingTimeMs < endTimeOfCurrentWindow even though the wall clock time at that 
moment > startTimeOfCurrentWindow + window length.

The implication of this would be:
1. The current sample window will not be considered completed even if it has 
completed as per wall clock time.
2. The value will be recorded in a sample window which has elapsed instead of a 
new window where it belongs.

Due to the above two implications, the metrics captured by the sensor may not 
be correct which could lead to incorrect quota calculations.

 [1] 
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java#L138]
 

 [2] 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java#L232
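
For illustration only (this is not Kafka code), a minimal sketch of how a 
timestamp captured before a blocking section can still look like it belongs to 
a window that has already elapsed by wall clock time:

```java
public class StaleTimestampSketch {
    public static void main(String[] args) throws InterruptedException {
        long windowLengthMs = 1_000L;
        long windowStart = System.currentTimeMillis();
        long windowEnd = windowStart + windowLengthMs;

        // Timestamp captured up front, e.g. before waiting on a contended lock.
        long recordingTimeMs = System.currentTimeMillis();

        // Simulate the thread being blocked (lock contention, scheduling, ...).
        Thread.sleep(1_500L);

        // By wall clock time the window has elapsed, but a comparison that
        // uses the stale recordingTimeMs still considers it open.
        boolean completeByRecordingTime = recordingTimeMs > windowEnd;           // false
        boolean completeByWallClock = System.currentTimeMillis() > windowEnd;    // true
        System.out.println(completeByRecordingTime + " vs " + completeByWallClock);
    }
}
```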



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] divijvaidya commented on pull request #12184: KAFKA-13911: Fix the rate window size calculation for edge cases

2022-06-15 Thread GitBox


divijvaidya commented on PR #12184:
URL: https://github.com/apache/kafka/pull/12184#issuecomment-1156329800

   @dajac @guozhangwang please review when you get a chance.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on a diff in pull request #12161: KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies

2022-06-15 Thread GitBox


cadonna commented on code in PR #12161:
URL: https://github.com/apache/kafka/pull/12161#discussion_r897835619


##
streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java:
##
@@ -335,7 +333,39 @@ public void pauseResumehouldWorkAcrossInstances() throws 
Exception {
 kafkaStreams.resume();
 waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, 
STARTUP_TIMEOUT);
 
-awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA);
+awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
+}
+
+@Test
+public void pausedTopologyShouldNotRestoreStateStores() throws Exception {
+produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+
+kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
+kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_1);
+kafkaStreams.start();
+kafkaStreams2.start();
+
+waitForApplicationState(Arrays.asList(kafkaStreams, kafkaStreams2), 
State.RUNNING, STARTUP_TIMEOUT);
+
+awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
+
+kafkaStreams.close();
+kafkaStreams2.close();
+
+kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
+kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_1);
+kafkaStreams.cleanUp();
+kafkaStreams2.cleanUp();
+
+kafkaStreams.pause();
+kafkaStreams2.pause();
+kafkaStreams.start();
+kafkaStreams2.start();
+
+waitForApplicationState(Arrays.asList(kafkaStreams, kafkaStreams2), 
State.REBALANCING, STARTUP_TIMEOUT);
+
+assertTrue(kafkaStreams.allLocalStorePartitionLags().isEmpty());
+assertTrue(kafkaStreams2.allLocalStorePartitionLags().isEmpty());

Review Comment:
   You could do something like:
   ```
   waitForApplicationState(Arrays.asList(kafkaStreams), 
State.REBALANCING, STARTUP_TIMEOUT);
   waitForCondition(
   () -> !kafkaStreams.allLocalStorePartitionLags().isEmpty(),
   "Lags for local store partitions were not found within the 
timeout!");
   waitUntilStreamsHasPolled(kafkaStreams, 2);
   final long stateStoreLag1 = 
kafkaStreams.allLocalStorePartitionLags().get("test-store").get(0).offsetLag();
   waitUntilStreamsHasPolled(kafkaStreams, 2);
   final long stateStoreLag2 = 
kafkaStreams.allLocalStorePartitionLags().get("test-store").get(0).offsetLag();
   assertTrue(stateStoreLag1 > 0);
   assertEquals(stateStoreLag1, stateStoreLag2);
   ```
   This code just considers one Streams client. You need to add 
`Materialized.as("test-store")` to the call to `count()` in your topology.
   As soon as you activate the standbys, you need to do the same for the 
second Streams client. 
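
For reference, a hedged sketch of what naming the count store could look like; 
the topic names and serdes here are assumptions, only the 
Materialized.as("test-store") call comes from the suggestion above:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class NamedStoreTopologySketch {
    public static void main(String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey()
               // Naming the store lets allLocalStorePartitionLags() report lag
               // under the key "test-store", as suggested in the review above.
               .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("test-store"))
               .toStream()
               .to("output-topic");
        System.out.println(builder.build().describe());
    }
}
```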



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] tombentley commented on a diff in pull request #11781: KAFKA-10000: Per-connector offsets topics (KIP-618)

2022-06-15 Thread GitBox


tombentley commented on code in PR #11781:
URL: https://github.com/apache/kafka/pull/11781#discussion_r897830233


##
connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java:
##
@@ -274,18 +276,23 @@ public static NewTopicBuilder defineTopic(String 
topicName) {
  * @param adminConfig the configuration for the {@link Admin}
  */
 public TopicAdmin(Map<String, Object> adminConfig) {
-this(adminConfig, Admin.create(adminConfig));
+this(adminConfig.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG), 
Admin.create(adminConfig));
 }
 
-// visible for testing
-TopicAdmin(Map<String, Object> adminConfig, Admin adminClient) {
-this(adminConfig, adminClient, true);
+/**
+ * Create a new topic admin using the provided {@link Admin}
+ *
+ * @param bootstrapServers the Kafka cluster targeted by the admin
+ * @param adminClient the {@link Admin} to use under the hood
+ */
+public TopicAdmin(Object bootstrapServers, Admin adminClient) {

Review Comment:
   This constructor kinda confuses the ownership of the `Admin` client. I think 
things are cleaner when the TopicAdmin instantiates the `admin` (and thus owns 
it). Note, it looks like there are no callers for `TopicAdmin.admin`. It 
seems that the call sites in `doBuild` could simply pass the map of configs 
(and the `bootstrapServers` looked up from that), rather than instantiating the 
`admin` and then passing it to the TopicAdmin.
   
   Obviously the test code has slightly different requirements, meaning we 
still need this constructor. I did also wonder whether we could get rid of 
`bootstrapServers` by defining `toString` on `KafkaAdminClient` and using that 
for the logging and exceptions here in `TopicAdmin`. Perhaps that's worth a 
follow-up PR at some point (though perhaps there are benefits to hiding the 
bootstrap servers from receivers of clients).
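
To make the ownership argument concrete, a rough sketch (a hypothetical 
wrapper class, not the actual Connect `TopicAdmin`) of the pattern where the 
component that creates the `Admin` is also the one that closes it:

```java
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

// Hypothetical wrapper: it creates the Admin from configs, so it owns it
// and is responsible for closing it.
public class OwningTopicAdminSketch implements AutoCloseable {
    private final String bootstrapServers;
    private final Admin admin;

    public OwningTopicAdminSketch(Map<String, Object> adminConfig) {
        this.bootstrapServers =
            String.valueOf(adminConfig.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG));
        this.admin = Admin.create(adminConfig); // owned by this instance
    }

    public String bootstrapServers() {
        return bootstrapServers;
    }

    @Override
    public void close() {
        admin.close(); // the owner closes what it created
    }
}
```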



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on a diff in pull request #12161: KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies

2022-06-15 Thread GitBox


cadonna commented on code in PR #12161:
URL: https://github.com/apache/kafka/pull/12161#discussion_r897818350


##
streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java:
##
@@ -335,7 +333,39 @@ public void pauseResumehouldWorkAcrossInstances() throws 
Exception {
 kafkaStreams.resume();
 waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, 
STARTUP_TIMEOUT);
 
-awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA);
+awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
+}
+
+@Test
+public void pausedTopologyShouldNotRestoreStateStores() throws Exception {
+produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+
+kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
+kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_1);
+kafkaStreams.start();
+kafkaStreams2.start();
+
+waitForApplicationState(Arrays.asList(kafkaStreams, kafkaStreams2), 
State.RUNNING, STARTUP_TIMEOUT);
+
+awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
+
+kafkaStreams.close();
+kafkaStreams2.close();
+
+kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
+kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_1);
+kafkaStreams.cleanUp();
+kafkaStreams2.cleanUp();
+
+kafkaStreams.pause();
+kafkaStreams2.pause();
+kafkaStreams.start();
+kafkaStreams2.start();
+
+waitForApplicationState(Arrays.asList(kafkaStreams, kafkaStreams2), 
State.REBALANCING, STARTUP_TIMEOUT);
+
+assertTrue(kafkaStreams.allLocalStorePartitionLags().isEmpty());
+assertTrue(kafkaStreams2.allLocalStorePartitionLags().isEmpty());

Review Comment:
   I played around a bit with the test, and indeed if you add a 
`Thread.sleep(2000)` before these asserts, the test fails because the returned 
map is not empty. That means the assignment was not finished before the 
asserts were called. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-13993) Large log.cleaner.buffer.size config breaks Kafka Broker

2022-06-15 Thread Tomohiro Hashidate (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tomohiro Hashidate updated KAFKA-13993:
---
Description: 
LogCleaner builds a Cleaner instance in the following way.

 

val cleaner = new Cleaner(id = threadId,
  offsetMap = new SkimpyOffsetMap(memory = math.min(config.dedupeBufferSize / config.numThreads, Int.MaxValue).toInt,
    hashAlgorithm = config.hashAlgorithm),
  ioBufferSize = config.ioBufferSize / config.numThreads / 2,
  maxIoBufferSize = config.maxMessageSize,
  dupBufferLoadFactor = config.dedupeBufferLoadFactor,
  throttler = throttler,
  time = time,
  checkDone = checkDone)

 

If `log.cleaner.buffer.size` / `log.cleaner.threads` is larger than 
Int.MaxValue, SkimpyOffsetMap uses Int.MaxValue.

SkimpyOffsetMap then tries to allocate a ByteBuffer with Int.MaxValue capacity.

But in the Hotspot VM implementation, the maximum array size is 
Int.MaxValue - 5.

According to ArraysSupport in OpenJDK, SOFT_MAX_ARRAY_LENGTH is Int.MaxValue - 
8 (which is safer).

https://github.com/openjdk/jdk17u/blob/master/src/java.base/share/classes/jdk/internal/util/ArraysSupport.java#L589

 

If the ByteBuffer capacity exceeds the maximum array length, the Kafka broker 
fails to start.

 

```

[2022-06-14 18:08:09,609] ERROR [KafkaServer id=1] Fatal error during 
KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
        at kafka.log.SkimpyOffsetMap.<init>(OffsetMap.scala:45)
        at kafka.log.LogCleaner$CleanerThread.<init>(LogCleaner.scala:300)
        at kafka.log.LogCleaner.$anonfun$startup$2(LogCleaner.scala:155)
        at kafka.log.LogCleaner.startup(LogCleaner.scala:154)
        at kafka.log.LogManager.startup(LogManager.scala:435)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:291)
        at 
kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
        at kafka.Kafka$.main(Kafka.scala:82)
        at kafka.Kafka.main(Kafka.scala)

```

 

I suggest using `Int.MaxValue - 8` instead of `Int.MaxValue`.

  was:
LogCleaner builds a Cleaner instance in the following way.

 

val cleaner = new Cleaner(id = threadId,
  offsetMap = new SkimpyOffsetMap(memory = math.min(config.dedupeBufferSize / config.numThreads, Int.MaxValue).toInt,
    hashAlgorithm = config.hashAlgorithm),
  ioBufferSize = config.ioBufferSize / config.numThreads / 2,
  maxIoBufferSize = config.maxMessageSize,
  dupBufferLoadFactor = config.dedupeBufferLoadFactor,
  throttler = throttler,
  time = time,
  checkDone = checkDone)

 

If `log.cleaner.buffer.size` / `log.cleaner.threads` is larger than 
Int.MaxValue, SkimpyOffsetMap uses Int.MaxValue.

SkimpyOffsetMap then tries to allocate a ByteBuffer with Int.MaxValue capacity.

But in the Hotspot VM implementation, the maximum array size is 
Int.MaxValue - 5.

According to ArraysSupport in OpenJDK, SOFT_MAX_ARRAY_LENGTH is Int.MaxValue - 
8 (which is safer).

 

If the ByteBuffer capacity exceeds the maximum array length, the Kafka broker 
fails to start.

 

```

[2022-06-14 18:08:09,609] ERROR [KafkaServer id=1] Fatal error during 
KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
        at 

[GitHub] [kafka] tombentley commented on a diff in pull request #11781: KAFKA-10000: Per-connector offsets topics (KIP-618)

2022-06-15 Thread GitBox


tombentley commented on code in PR #11781:
URL: https://github.com/apache/kafka/pull/11781#discussion_r897812219


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -1327,30 +1334,39 @@ public WorkerTask doBuild(Task task,
 connectorClientConfigOverridePolicy, kafkaClusterId);
 KafkaProducer<byte[], byte[]> producer = new 
KafkaProducer<>(producerProps);
 
-TopicAdmin topicAdmin;
+// Prepare to create a topic admin if the task requires one, but 
do not actually create an instance
+// until/unless one is needed
+final AtomicReference<TopicAdmin> topicAdmin = new 
AtomicReference<>();
+final Supplier<TopicAdmin> topicAdminCreator = () -> 
topicAdmin.updateAndGet(existingAdmin -> {
+if (existingAdmin != null) {
+return existingAdmin;
+}
+Map<String, Object> adminOverrides = 
adminConfigs(id.connector(), "connector-adminclient-" + id, config,
+sourceConfig, connectorClass, 
connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE);
+Admin adminClient = Admin.create(adminOverrides);
+return new 
TopicAdmin(adminOverrides.get(BOOTSTRAP_SERVERS_CONFIG), adminClient);
+});
+
 Map<String, TopicCreationGroup> topicCreationGroups;
 if (config.topicCreationEnable() && 
sourceConfig.usesTopicCreation()) {
 topicCreationGroups = 
TopicCreationGroup.configuredGroups(sourceConfig);
 // Create a topic admin that the task can use for topic 
creation
-Map<String, Object> adminOverrides = 
adminConfigs(id.connector(), "connector-adminclient-" + id, config,
-sourceConfig, connectorClass, 
connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE);
-topicAdmin = new TopicAdmin(adminOverrides);
+topicAdminCreator.get();

Review Comment:
   Thanks! When you have time it would be great if you could rebase that PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-13993) Large log.cleaner.buffer.size config breaks Kafka Broker

2022-06-15 Thread Tomohiro Hashidate (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tomohiro Hashidate updated KAFKA-13993:
---
Description: 
LogCleaner builds a Cleaner instance in the following way.

 

val cleaner = new Cleaner(id = threadId,
  offsetMap = new SkimpyOffsetMap(memory = math.min(config.dedupeBufferSize / config.numThreads, Int.MaxValue).toInt,
    hashAlgorithm = config.hashAlgorithm),
  ioBufferSize = config.ioBufferSize / config.numThreads / 2,
  maxIoBufferSize = config.maxMessageSize,
  dupBufferLoadFactor = config.dedupeBufferLoadFactor,
  throttler = throttler,
  time = time,
  checkDone = checkDone)

 

If `log.cleaner.buffer.size` / `log.cleaner.threads` is larger than 
Int.MaxValue, SkimpyOffsetMap uses Int.MaxValue.

SkimpyOffsetMap then tries to allocate a ByteBuffer with Int.MaxValue capacity.

But in the Hotspot VM implementation, the maximum array size is 
Int.MaxValue - 5.

According to ArraysSupport in OpenJDK, SOFT_MAX_ARRAY_LENGTH is Int.MaxValue - 
8 (which is safer).

 

If the ByteBuffer capacity exceeds the maximum array length, the Kafka broker 
fails to start.

 

```

[2022-06-14 18:08:09,609] ERROR [KafkaServer id=1] Fatal error during 
KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
        at kafka.log.SkimpyOffsetMap.<init>(OffsetMap.scala:45)
        at kafka.log.LogCleaner$CleanerThread.<init>(LogCleaner.scala:300)
        at kafka.log.LogCleaner.$anonfun$startup$2(LogCleaner.scala:155)
        at kafka.log.LogCleaner.startup(LogCleaner.scala:154)
        at kafka.log.LogManager.startup(LogManager.scala:435)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:291)
        at 
kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
        at kafka.Kafka$.main(Kafka.scala:82)
        at kafka.Kafka.main(Kafka.scala)

```

 

I suggest using `Int.MaxValue - 8` instead of `Int.MaxValue`.

  was:
LogCleaner builds a Cleaner instance in the following way.

 

```

val cleaner = new Cleaner(id = threadId,
  offsetMap = new SkimpyOffsetMap(memory = math.min(config.dedupeBufferSize / config.numThreads, Int.MaxValue).toInt,
    hashAlgorithm = config.hashAlgorithm),
  ioBufferSize = config.ioBufferSize / config.numThreads / 2,
  maxIoBufferSize = config.maxMessageSize,
  dupBufferLoadFactor = config.dedupeBufferLoadFactor,
  throttler = throttler,
  time = time,
  checkDone = checkDone)

```

If `log.cleaner.buffer.size` / `log.cleaner.threads` is larger than 
Int.MaxValue, SkimpyOffsetMap uses Int.MaxValue.

SkimpyOffsetMap then tries to allocate a ByteBuffer with Int.MaxValue capacity.

But in the Hotspot VM implementation, the maximum array size is Int.MaxValue 
- 5.

According to ArraysSupport in OpenJDK, SOFT_MAX_ARRAY_LENGTH is Int.MaxValue - 8 
(which is safer).

 

If the ByteBuffer capacity exceeds the maximum array length, the Kafka broker 
fails to start.

 

```

[2022-06-14 18:08:09,609] ERROR [KafkaServer id=1] Fatal error during 
KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
        at kafka.log.SkimpyOffsetMap.<init>(OffsetMap.scala:45)
        at kafka.log.LogCleaner$CleanerThread.<init>(LogCleaner.scala:300)
       

[jira] [Created] (KAFKA-13993) Large log.cleaner.buffer.size config breaks Kafka Broker

2022-06-15 Thread Tomohiro Hashidate (Jira)
Tomohiro Hashidate created KAFKA-13993:
--

 Summary: Large log.cleaner.buffer.size config breaks Kafka Broker
 Key: KAFKA-13993
 URL: https://issues.apache.org/jira/browse/KAFKA-13993
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 3.1.1, 3.2.0, 3.0.1, 2.8.1, 2.7.2
Reporter: Tomohiro Hashidate


LogCleaner builds a Cleaner instance in the following way.

 

```

val cleaner = new Cleaner(id = threadId,
  offsetMap = new SkimpyOffsetMap(memory = math.min(config.dedupeBufferSize / config.numThreads, Int.MaxValue).toInt,
    hashAlgorithm = config.hashAlgorithm),
  ioBufferSize = config.ioBufferSize / config.numThreads / 2,
  maxIoBufferSize = config.maxMessageSize,
  dupBufferLoadFactor = config.dedupeBufferLoadFactor,
  throttler = throttler,
  time = time,
  checkDone = checkDone)

```

If `log.cleaner.buffer.size` / `log.cleaner.threads` is larger than 
Int.MaxValue, SkimpyOffsetMap uses Int.MaxValue.

SkimpyOffsetMap then tries to allocate a ByteBuffer with Int.MaxValue capacity.

But in the Hotspot VM implementation, the maximum array size is Int.MaxValue 
- 5.

According to ArraysSupport in OpenJDK, SOFT_MAX_ARRAY_LENGTH is Int.MaxValue - 8 
(which is safer).

 

If the ByteBuffer capacity exceeds the maximum array length, the Kafka broker 
fails to start.

 

```

[2022-06-14 18:08:09,609] ERROR [KafkaServer id=1] Fatal error during 
KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
        at kafka.log.SkimpyOffsetMap.<init>(OffsetMap.scala:45)
        at kafka.log.LogCleaner$CleanerThread.<init>(LogCleaner.scala:300)
        at kafka.log.LogCleaner.$anonfun$startup$2(LogCleaner.scala:155)
        at kafka.log.LogCleaner.startup(LogCleaner.scala:154)
        at kafka.log.LogManager.startup(LogManager.scala:435)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:291)
        at 
kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
        at kafka.Kafka$.main(Kafka.scala:82)
        at kafka.Kafka.main(Kafka.scala)

```

 

I suggest using `Int.MaxValue - 8` instead of `Int.MaxValue`.
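
A minimal sketch of the suggested clamping (written in Java for illustration; 
the real change would be in the Scala LogCleaner, and the constant name here 
is made up):

```java
public class CleanerBufferSizeSketch {
    // Soft maximum array length used by the JDK's ArraysSupport (Integer.MAX_VALUE - 8).
    private static final int MAX_SAFE_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    // Cap the per-thread dedupe buffer so ByteBuffer.allocate is never asked
    // for more than the VM can actually allocate as a single array.
    static int dedupeBufferSizePerThread(long dedupeBufferSize, int numThreads) {
        return (int) Math.min(dedupeBufferSize / numThreads, MAX_SAFE_ARRAY_SIZE);
    }

    public static void main(String[] args) {
        // With a very large log.cleaner.buffer.size the result is clamped
        // instead of tripping an OutOfMemoryError at allocation time.
        System.out.println(dedupeBufferSizePerThread(Long.MAX_VALUE, 1)); // 2147483639
    }
}
```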



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] cadonna commented on a diff in pull request #12161: KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies

2022-06-15 Thread GitBox


cadonna commented on code in PR #12161:
URL: https://github.com/apache/kafka/pull/12161#discussion_r897789473


##
streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java:
##
@@ -335,7 +333,39 @@ public void pauseResumehouldWorkAcrossInstances() throws 
Exception {
 kafkaStreams.resume();
 waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, 
STARTUP_TIMEOUT);
 
-awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA);
+awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
+}
+
+@Test
+public void pausedTopologyShouldNotRestoreStateStores() throws Exception {
+produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+
+kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
+kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_1);
+kafkaStreams.start();
+kafkaStreams2.start();
+
+waitForApplicationState(Arrays.asList(kafkaStreams, kafkaStreams2), 
State.RUNNING, STARTUP_TIMEOUT);
+
+awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
+
+kafkaStreams.close();
+kafkaStreams2.close();
+
+kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
+kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_1);
+kafkaStreams.cleanUp();
+kafkaStreams2.cleanUp();
+
+kafkaStreams.pause();
+kafkaStreams2.pause();
+kafkaStreams.start();
+kafkaStreams2.start();
+
+waitForApplicationState(Arrays.asList(kafkaStreams, kafkaStreams2), 
State.REBALANCING, STARTUP_TIMEOUT);
+
+assertTrue(kafkaStreams.allLocalStorePartitionLags().isEmpty());
+assertTrue(kafkaStreams2.allLocalStorePartitionLags().isEmpty());

Review Comment:
   Why are you verifying for emptiness? I would expect that there are entries 
for the state stores with a lag greater than 0.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##
@@ -479,6 +485,47 @@ public void restore(final Map tasks) {
 }
 }
 
+private void updateStandbyPartitions(final Map tasks,

Review Comment:
   Do not forget to rename this method to something more meaningful. 
   Proposal: `pauseResumePartitions()`



##
streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java:
##
@@ -335,7 +333,39 @@ public void pauseResumehouldWorkAcrossInstances() throws 
Exception {
 kafkaStreams.resume();
 waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, 
STARTUP_TIMEOUT);
 
-awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA);
+awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
+}
+
+@Test
+public void pausedTopologyShouldNotRestoreStateStores() throws Exception {
+produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+
+kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
+kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_1);

Review Comment:
   If you do not use standby tasks, there is no reason to use two Kafka Streams 
clients. I would propose using a single standby just for this test. For that 
you need to set `num.standby.replicas` to 1. That has the effect that one 
client gets the active store assigned and the other gets the standby store 
assigned.
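
For reference, a hedged sketch of the suggested configuration; the config keys 
are the standard StreamsConfig constants, while the application id and 
bootstrap servers are placeholders:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StandbyConfigSketch {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "pause-resume-it");   // placeholder id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        // One standby replica: one client gets the active store, the other the standby.
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        System.out.println(props);
    }
}
```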



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mdedetrich commented on pull request #12284: KAFKA-13980: Upgrade from Scala 2.12.15 to 2.12.16

2022-06-15 Thread GitBox


mdedetrich commented on PR #12284:
URL: https://github.com/apache/kafka/pull/12284#issuecomment-1156251953

   @divijvaidya Done


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #12284: KAFKA-13980: Upgrade from Scala 2.12.15 to 2.12.16

2022-06-15 Thread GitBox


divijvaidya commented on PR #12284:
URL: https://github.com/apache/kafka/pull/12284#issuecomment-1156237309

   @mdedetrich could you please re-run the tests (by pushing another commit or 
by rebasing from trunk & force pushing)? It would be ideal to have a clean 
test run (with only the known flaky failures) before we approve this. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`

2022-06-15 Thread GitBox


divijvaidya commented on code in PR #12045:
URL: https://github.com/apache/kafka/pull/12045#discussion_r897722547


##
clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java:
##
@@ -608,14 +609,14 @@ public void testRateWindowing() throws Exception {
 time.sleep(cfg.timeWindowMs() / 2);
 
 // prior to any time passing
-double elapsedSecs = (cfg.timeWindowMs() * (cfg.samples() - 1) + 
cfg.timeWindowMs() / 2) / 1000.0;
+double elapsedSecs = (cfg.timeWindowMs() * (cfg.samples() - 1) + 
(((double) cfg.timeWindowMs()) / 2.0d)) / 1000.0d;

Review Comment:
   Thanks for catching this. I have fixed this in the latest revision.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`

2022-06-15 Thread GitBox


divijvaidya commented on code in PR #12045:
URL: https://github.com/apache/kafka/pull/12045#discussion_r897733191


##
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##
@@ -34,35 +35,56 @@
  */
 public abstract class SampledStat implements MeasurableStat {
 
-private double initialValue;
+private final double initialValue;
+/**
+ * Index of the latest stored sample.
+ */
 private int current = 0;
+/**
+ * Stores the recorded samples in a ring buffer.

Review Comment:
   That sounds fair. I have fixed the java doc in the latest revision as per 
your suggestion.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`

2022-06-15 Thread GitBox


divijvaidya commented on code in PR #12045:
URL: https://github.com/apache/kafka/pull/12045#discussion_r897733191


##
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##
@@ -34,35 +35,56 @@
  */
 public abstract class SampledStat implements MeasurableStat {
 
-private double initialValue;
+private final double initialValue;
+/**
+ * Index of the latest stored sample.
+ */
 private int current = 0;
+/**
+ * Stores the recorded samples in a ring buffer.

Review Comment:
   That sounds fair. I have fixed the java doc in the latest revision and 
replaced it with the following
   ```
   /**
* Stores the recorded samples.
* Note that the previously recorded samples may be overwritten/reset if 
they are considered obsolete by the
* {@link Sample#purgeObsoleteSamples} function.
*/
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`

2022-06-15 Thread GitBox


divijvaidya commented on code in PR #12045:
URL: https://github.com/apache/kafka/pull/12045#discussion_r897724648


##
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##
@@ -84,13 +106,7 @@ public Sample current(long timeMs) {
 public Sample oldest(long now) {
 if (samples.size() == 0)
 this.samples.add(newSample(now));
-Sample oldest = this.samples.get(0);
-for (int i = 1; i < this.samples.size(); i++) {
-Sample curr = this.samples.get(i);
-if (curr.lastWindowMs < oldest.lastWindowMs)
-oldest = curr;
-}
-return oldest;
+return samples.stream().min(Comparator.comparingLong(s -> 
s.lastWindowMs)).orElse(samples.get(0));

Review Comment:
   I find the new code more readable since we can immediately eyeball that a 
min is being calculated, whereas in the previous version we have to follow the 
assignments and the logic in the for loop to work out what is going on. 
   
   Nevertheless, I don't have a strong opinion on this one. If you still think 
we should revert it, I will do it. Let me know.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`

2022-06-15 Thread GitBox


divijvaidya commented on code in PR #12045:
URL: https://github.com/apache/kafka/pull/12045#discussion_r897724922


##
clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java:
##
@@ -68,24 +68,55 @@ public double measure(MetricConfig config, long now) {
 }
 
 public long windowSize(MetricConfig config, long now) {
-// purge old samples before we compute the window size
+// Purge obsolete samples. Obsolete samples are the ones which are not 
relevant to the current calculation
+// because their creation time falls before the time window used to 
calculate the rate.
 stat.purgeObsoleteSamples(config, now);
 
 /*
  * Here we check the total amount of time elapsed since the oldest 
non-obsolete window.
- * This give the total windowSize of the batch which is the time used 
for Rate computation.
- * However, there is an issue if we do not have sufficient data for 
e.g. if only 1 second has elapsed in a 30 second
- * window, the measured rate will be very high.
- * Hence we assume that the elapsed time is always N-1 complete 
windows plus whatever fraction of the final window is complete.
+ * This gives the duration of the time window that is used to 
calculate the Rate.

Review Comment:
   Thanks for catching this. I have fixed this in the latest revision.
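
A small numeric illustration of the padding described in the javadoc above, 
using assumed values (30 events, a 30 second window, 2 samples):

```java
public class RateWindowSizeSketch {
    public static void main(String[] args) {
        long timeWindowMs = 30_000L; // assumed window length
        int samples = 2;             // assumed number of samples (N)

        // Only one second of the very first window has elapsed.
        long observedElapsedMs = 1_000L;

        // Without padding, 30 events over 1s would report a rate of 30 events/s.
        double naiveRate = 30 / (observedElapsedMs / 1000.0);

        // Padding to (N - 1) full windows plus the elapsed fraction gives 31s,
        // so the same 30 events report roughly 0.97 events/s instead.
        long paddedWindowMs = (samples - 1) * timeWindowMs + observedElapsedMs;
        double paddedRate = 30 / (paddedWindowMs / 1000.0);

        System.out.println(naiveRate + " vs " + paddedRate);
    }
}
```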



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`

2022-06-15 Thread GitBox


divijvaidya commented on code in PR #12045:
URL: https://github.com/apache/kafka/pull/12045#discussion_r897721779


##
clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java:
##
@@ -149,13 +149,14 @@ private void verifyStats(Function 
metricValueFunc) {
 
 assertEquals(5.0, 
metricValueFunc.apply(metrics.metric(metrics.metricName("s2.total", "grp1"))), 
EPS,
 "s2 reflects the constant value");
-assertEquals(4.5, 
metricValueFunc.apply(metrics.metric(metrics.metricName("test.avg", "grp1"))), 
EPS,
+assertEquals(sum / (double) count, 
metricValueFunc.apply(metrics.metric(metrics.metricName("test.avg", "grp1"))), 
EPS,
 "Avg(0...9) = 4.5");
 assertEquals(count - 1,  
metricValueFunc.apply(metrics.metric(metrics.metricName("test.max", "grp1"))), 
EPS,
 "Max(0...9) = 9");
 assertEquals(0.0, 
metricValueFunc.apply(metrics.metric(metrics.metricName("test.min", "grp1"))), 
EPS,
 "Min(0...9) = 0");
-assertEquals(sum / elapsedSecs, 
metricValueFunc.apply(metrics.metric(metrics.metricName("test.rate", "grp1"))), 
EPS,
+// rate is calculated over the first ever window. Hence, we assume 
presence of prior windows with 0 recorded events.
+assertEquals((double) sum / elapsedSecs, 
metricValueFunc.apply(metrics.metric(metrics.metricName("test.rate", "grp1"))), 
EPS,

Review Comment:
   Thanks for catching this. I have fixed this in the latest revision.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fred-ro commented on pull request #12201: MINOR: Replace left single quote with single quote in Connect worker's log message

2022-06-15 Thread GitBox


fred-ro commented on PR #12201:
URL: https://github.com/apache/kafka/pull/12201#issuecomment-1156048902

   Is it possible to backport it to branch 2.8?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] tkaszuba commented on a diff in pull request #12293: KAFKA-13963: Clarified java doc for processors api

2022-06-15 Thread GitBox


tkaszuba commented on code in PR #12293:
URL: https://github.com/apache/kafka/pull/12293#discussion_r897565836


##
streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java:
##
@@ -30,6 +30,7 @@
  * In contrast, two sub-topologies are not connected but can be linked to each 
other via topics, i.e., if one
  * sub-topology {@link Topology#addSink(String, String, String...) writes} 
into a topic and another sub-topology
  * {@link Topology#addSource(String, String...) reads} from the same topic.
+ * Processors and Transformers created with the Processor API are treated as 
black boxes and are not represented in the topology graph.

Review Comment:
   That is correct, the issue is with `context.forward`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org