[GitHub] [kafka] chia7712 commented on pull request #9284: KAFKA-10479 Throw exception if users try to update configs of existen…

2020-09-25 Thread GitBox


chia7712 commented on pull request #9284:
URL: https://github.com/apache/kafka/pull/9284#issuecomment-699440096


   @hachikuji the ```docs/upgrade.html``` is updated :)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jacky1193610322 commented on pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-25 Thread GitBox


jacky1193610322 commented on pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#issuecomment-699438281


   I had a thought: there is a scenario in which a leader can see its followers but cannot see ZooKeeper. Previously, such a leader would be fenced when it attempted to shrink or expand the ISR, because the conditional update it performs (while holding the leaderIsrUpdateLock) would fail. But now the leader can't be fenced at that point, because it just sends the AlterISR message and keeps processing messages normally. [See this post](https://jack-vanlightly.com/blog/2018/9/2/rabbitmq-vs-kafka-part-6-fault-tolerance-and-high-availability-with-kafka)
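
   For context, the fencing described above comes from the conditional (versioned) ZooKeeper write the leader performs today: the update carries the zkVersion the leader last saw, so a stale leader's shrink or expand fails. A minimal sketch of that check-and-set pattern, using a hypothetical in-memory store rather than Kafka's actual ZooKeeper client code:

   ```java
   import java.util.List;

   // Hypothetical stand-in for a versioned znode; not Kafka's API, just the
   // check-and-set pattern ISR updates rely on for fencing.
   class VersionedIsrStore {
       private List<Integer> isr;
       private int version = 0;

       VersionedIsrStore(List<Integer> initialIsr) { this.isr = initialIsr; }

       synchronized int version() { return version; }
       synchronized List<Integer> isr() { return isr; }

       // Mirrors ZooKeeper's setData(path, data, expectedVersion): the write
       // succeeds only if nobody bumped the version since the caller read it.
       synchronized boolean conditionalUpdate(List<Integer> newIsr, int expectedVersion) {
           if (version != expectedVersion) return false; // stale caller is fenced
           this.isr = newIsr;
           version++;
           return true;
       }
   }

   public class IsrFencingDemo {
       public static void main(String[] args) {
           VersionedIsrStore store = new VersionedIsrStore(List.of(1, 2, 3));
           int seenByOldLeader = store.version();

           // The controller changes the ISR, bumping the version behind the
           // old leader's back.
           store.conditionalUpdate(List.of(2, 3), store.version());

           // The old leader's ISR shrink now fails: it has been fenced.
           boolean fenced = !store.conditionalUpdate(List.of(1, 2), seenByOldLeader);
           System.out.println("old leader fenced: " + fenced);
       }
   }
   ```

   The concern in the comment is that once the leader merely sends an AlterISR request instead of performing this versioned write itself, the failure of the write no longer fences it synchronously.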







[GitHub] [kafka] MicahRam commented on pull request #8181: KAFKA-9584 Headers ConcurrentModificationException

2020-09-25 Thread GitBox


MicahRam commented on pull request #8181:
URL: https://github.com/apache/kafka/pull/8181#issuecomment-699394946


   @vvcephei I rebased. The checks are failing, but as far as I can tell it wasn't me; they have been failing for a while. Let me know if you think otherwise.







[GitHub] [kafka] nym3r0s edited a comment on pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

2020-09-25 Thread GitBox


nym3r0s edited a comment on pull request #9280:
URL: https://github.com/apache/kafka/pull/9280#issuecomment-699364431


   > @ableegoldman I agree it would be useful to have a test case. If you have time, maybe you can help out in a follow-up?
   
   @hachikuji @ableegoldman - I'm so sorry about the delay. I'm still getting up to speed with the codebase, so any tips would be great. Thank you!







[GitHub] [kafka] nym3r0s commented on pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

2020-09-25 Thread GitBox


nym3r0s commented on pull request #9280:
URL: https://github.com/apache/kafka/pull/9280#issuecomment-699364431


   > @ableegoldman I agree it would be useful to have a test case. If you have time, maybe you can help out in a follow-up?
   
   @hachikuji @ableegoldman - I'm so sorry about the delay. I'm a bit confused about whether to add the test to the `Sender` or the `RecordAccumulator`. I'm still getting up to speed with the codebase, so any tips would be great. Thank you!







[GitHub] [kafka] guozhangwang merged pull request #9295: MINOR: standardize rebalance related logging for easy discovery & debugging

2020-09-25 Thread GitBox


guozhangwang merged pull request #9295:
URL: https://github.com/apache/kafka/pull/9295


   







[GitHub] [kafka] ahuang98 opened a new pull request #9340: Improving Fetch Session Caching for KAFKA-9401

2020-09-25 Thread GitBox


ahuang98 opened a new pull request #9340:
URL: https://github.com/apache/kafka/pull/9340


   WIP - this does not yet follow the original cache eviction logic of waiting `evictionMs` before allowing a smaller session to be evicted by a larger one. Working on reverting those changes; all WIP comments below pertain to this issue.
   
   ## Summary of changes
   @lbradstreet
   - JMH benchmark to log performance improvements
   - made `EvictableKey`'s `compareTo` method faster
   - restructured `tryEvict` (WIP: making sure this works with the original cache eviction logic)
   - restructured `touch`
   - `evictableBy` treemaps changed to store unprivileged and privileged sessions respectively (WIP, will change this)
   
   @ahuang98
   - `sessions` turned into a `LinkedHashMap` (from `mutable.HashMap`) to absorb the function of `lastUsed`
   - entries in `sessions` are now ordered by the time the session was last touched
   - `evictableBy` treemaps renamed to `unprivilegedSessions` and `privilegedSessions` (WIP, will need to revert this)
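   
   The `LinkedHashMap` change relies on the map's ordering guarantees: in access-order mode, the eldest entry is always the least recently touched, which is what the separate `lastUsed` index tracked before. A minimal sketch of that pattern (names are illustrative only, and real eviction in `FetchSessionCache` also weighs privileged sessions and `evictionMs`, which this ignores):

   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;

   // Minimal LRU cache built on LinkedHashMap's access-order mode, showing
   // how one map can replace a separate lastUsed index.
   class LruSessionCache<K, V> extends LinkedHashMap<K, V> {
       private final int maxEntries;

       LruSessionCache(int maxEntries) {
           // accessOrder = true: get() and put() move an entry to the tail,
           // so the head of the iteration order is always the LRU entry.
           super(16, 0.75f, true);
           this.maxEntries = maxEntries;
       }

       @Override
       protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
           return size() > maxEntries; // evict the least recently touched session
       }
   }

   public class LruDemo {
       public static void main(String[] args) {
           LruSessionCache<Integer, String> cache = new LruSessionCache<>(2);
           cache.put(1, "session-1");
           cache.put(2, "session-2");
           cache.get(1);              // touch session 1 so it becomes most recent
           cache.put(3, "session-3"); // evicts session 2, the LRU entry
           System.out.println(cache.keySet()); // prints [1, 3]
       }
   }
   ```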
   
   ## Testing
   - Unit tests
 - an assertion in `testSessionCache` is currently commented out due to a misunderstanding of the cache eviction behavior (WIP: will uncomment it and add additional tests)
   - JMH benchmark
 - running the benchmark with 8 warmup iterations and 6 measurement iterations on an EC2 instance shows that the optimized code with a cache size of 5000 entries performs comparably to, if not slightly better than, trunk with a cache size of 1000 (note: the cache eviction logic differs slightly in the optimized code)
   
   | cacheSize | percentPrivileged | benchPrivileged | newSession | numEvictableEntries | cacheUtilization | Mode | Cnt | sessions_lastUsed Score | sessions_lastUsed Error | trunk Score | trunk Error | Units |
   | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
   | 1000 | 10 | FALSE | FALSE | 0 | 99 | avgt | 6 | 2552.391 | 107.575 | 4221.632 | 359.275 | ns/op |
   | 1000 | 10 | FALSE | FALSE | 1 | 99 | avgt | 6 | 2582.457 | 125.423 | 4222.194 | 414.809 | ns/op |
   | 1000 | 10 | FALSE | FALSE | 10 | 99 | avgt | 6 | 2746.227 | 61.092 | 4251.687 | 370.021 | ns/op |
   | 1000 | 10 | FALSE | TRUE | 0 | 99 | avgt | 6 | 1198.266 | 222.24 | 2252.08 | 702.901 | ns/op |
   | 1000 | 10 | FALSE | TRUE | 1 | 99 | avgt | 6 | 1134.612 | 120.705 | 2169.681 | 550.987 | ns/op |
   | 1000 | 10 | FALSE | TRUE | 10 | 99 | avgt | 6 | 1179.589 | 218.907 | 2103.325 | 353.927 | ns/op |
   | 1000 | 10 | FALSE | FALSE | 0 | 100 | avgt | 6 | 2622.648 | 180.46 | 4298.698 | 461.962 | ns/op |
   | 1000 | 10 | FALSE | FALSE | 1 | 100 | avgt | 6 | 2610.549 | 167.135 | 4189.434 | 304.004 | ns/op |
   | 1000 | 10 | FALSE | FALSE | 10 | 100 | avgt | 6 | 2606.701 | 95.392 | 4290.71 | 393.693 | ns/op |
   | 1000 | 10 | FALSE | TRUE | 0 | 100 | avgt | 6 | 1997.809 | 143.441 | 3520.044 | 357.352 | ns/op |
   | 1000 | 10 | FALSE | TRUE | 1 | 100 | avgt | 6 | 2009.162 | 174.531 | 3697.102 | 1342.944 | ns/op |
   | 1000 | 10 | FALSE | TRUE | 10 | 100 | avgt | 6 | 2057.212 | 114.299 | 3473.398 | 691.091 | ns/op |
   | 2000 | 10 | FALSE | FALSE | 0 | 99 | avgt | 6 | 3719.148 | 104.879 | 5398.167 | 136.906 | ns/op |
   | 2000 | 10 | FALSE | FALSE | 1 | 99 | avgt | 6 | 3487.926 | 220.468 | 5371.3 | 121.94 | ns/op |
   | 2000 | 10 | FALSE | FALSE | 10 | 99 | avgt | 6 | 3582.789 | 49.977 | 5233.165 | 80.441 | ns/op |
   | 2000 | 10 | FALSE | TRUE | 0 | 99 | avgt | 6 | 1674.242 | 24.652 | 2897.423 | 38.511 | ns/op |
   | 2000 | 10 | FALSE | TRUE | 1 | 99 | avgt | 6 | 1666.169 | 113.422 | 2704.126 | 19.803 | ns/op |
   | 2000 | 10 | FALSE | TRUE | 10 | 99 | avgt | 6 | 1696.892 | 155.846 | 2793.346 | 38.972 | ns/op |
   | 2000 | 10 | FALSE | FALSE | 0 | 100 | avgt | 6 | 3466.104 | 47.457 | 5497.131 | 57.95 | ns/op |
   | 2000 | 10 | FALSE | FALSE | 1 | 100 | avgt | 6 | 3447.317 | 57.066 | 5519.515 | 104.217 | ns/op |
   | 2000 | 10 | FALSE | FALSE | 10 | 100 | avgt | 6 | 3619.742 | 49.741 | 5247.048 | 77.843 | ns/op |
   | 2000 | 10 | FALSE | TRUE | 0 | 100 | avgt | 6 | 2741.251 | 66.38 | 4432.045 | 40.222 | ns/op |
   | 2000 | 10 | FALSE | TRUE | 1 | 100 | avgt | 6 | 2783.172 | 43.712 | 4537.006 | 664.541 | ns/op |
   | 2000 | 10 | FALSE | TRUE | 10 | 100 | avgt | 6 | 2787.124 | 59.319 | 4589.346 | 604.493 | ns/op |
   | 5000 | 10 | FALSE | FALSE | 0 | 99 | avgt | 6 | 4044.472 | 49.122 | 6333.033 | 222.079 | ns/op |
   | 5000 | 10 | FALSE | FALSE | 1 | 99 | avgt | 6 | 3908.32 | 83.414 | 7172.89 | 3360.043 | ns/op |
   | 5000 | 10 | FALSE | FALSE | 10 | 99 | avgt | 6 | 4101.308 | 98.899 | 6056.719 | 305.08 | ns/op |
   | 5000 | 10 | FALSE | TRUE | 0 | 99 | avgt | 6 | 1768.217 | 34.502 | 3160.962 | 167.346 | ns/op |
   | 5000 | 10 | FALSE | TRUE | 1 | 99 | avgt | 6 | 1860.036 | 63.331 | 2924.341 | 99.126 | ns/op |
   | 5000 | 10 | FALSE | TRUE | 10 | 99 | avgt | 6 | 1857.753 | 21.139 | 3

[GitHub] [kafka] ableegoldman opened a new pull request #9339: MINOR: add docs for 2.7 TRACE-level e2e latency metrics

2020-09-25 Thread GitBox


ableegoldman opened a new pull request #9339:
URL: https://github.com/apache/kafka/pull/9339


   Add the extended e2e latency metrics to the upgrade guide for 2.7.







[GitHub] [kafka] ableegoldman commented on pull request #9339: MINOR: add docs for 2.7 TRACE-level e2e latency metrics

2020-09-25 Thread GitBox


ableegoldman commented on pull request #9339:
URL: https://github.com/apache/kafka/pull/9339#issuecomment-699225846


   @vvcephei @mjsax @guozhangwang 







[GitHub] [kafka] ableegoldman commented on a change in pull request #9295: MINOR: standardize rebalance related logging for easy discovery & debugging

2020-09-25 Thread GitBox


ableegoldman commented on a change in pull request #9295:
URL: https://github.com/apache/kafka/pull/9295#discussion_r495292893



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -769,24 +771,27 @@ public void handle(SyncGroupResponse syncResponse,
 if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
 future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
-log.debug("SyncGroup failed because the group began another rebalance");
+log.info("SyncGroup failed: The group began another rebalance. Need to re-join the group. " +
+ "Sent generation was {}", sentGeneration);
 future.raise(error);
 } else if (error == Errors.FENCED_INSTANCE_ID) {
 // for sync-group request, even if the generation has changed we would not expect the instance id
 // gets fenced, and hence we always treat this as a fatal error
-log.error("SyncGroup with {} failed because the group instance id {} has been fenced by another instance",
-sentGeneration, rebalanceConfig.groupInstanceId);
+log.error("SyncGroup failed: The group instance id {} has been fenced by another instance. " +
+"Sent generation was {}", rebalanceConfig.groupInstanceId, sentGeneration);
 future.raise(error);
 } else if (error == Errors.UNKNOWN_MEMBER_ID
 || error == Errors.ILLEGAL_GENERATION) {
-log.info("SyncGroup with {} failed: {}, would request re-join", sentGeneration, error.message());
+log.info("SyncGroup failed: {} Need to re-join the group. Sent generation was {}",
+error.message(), sentGeneration);
 if (generationUnchanged())
 resetGenerationOnResponseError(ApiKeys.SYNC_GROUP, error);
 
 future.raise(error);
 } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
 || error == Errors.NOT_COORDINATOR) {
-log.debug("SyncGroup failed: {}, marking coordinator unknown", error.message());
+log.debug("SyncGroup failed: {} Marking coordinator unknown. Sent generation was {}",

Review comment:
   Sounds good









[GitHub] [kafka] nizhikov commented on pull request #9196: [DO NOT MERGE] KAFKA-10402: Upgrade system tests to python3

2020-09-25 Thread GitBox


nizhikov commented on pull request #9196:
URL: https://github.com/apache/kafka/pull/9196#issuecomment-699186863


   Hello, @guozhangwang 
   
   I ran the failed test locally and it passes.
   The test also passed in my previous runs.
   You can take a look at the report I shared in the ticket - 
https://issues.apache.org/jira/secure/attachment/13010366/report.txt
   I guess the test is flaky.
   
   Can you please advise me on the next step?
   For now, this PR is based on the ducktape master branch.
   
   Do we need a ducktape release for it?
   I haven't received much feedback from the ducktape maintainers - 
https://github.com/confluentinc/ducktape/issues/245
   
   ```
   [1:15:45]~/work/kafka/tests:[KAFKA-10402]$ ./docker/run_tests.sh
   
   > Configure project :
   Building project 'core' with Scala version 2.13.3
   Building project 'streams-scala' with Scala version 2.13.3
   
   Deprecated Gradle features were used in this build, making it incompatible 
with Gradle 7.0.
   Use '--warning-mode all' to show the individual deprecation warnings.
   See 
https://docs.gradle.org/6.6/userguide/command_line_interface.html#sec:command_line_warnings
   
   BUILD SUCCESSFUL in 2s
   119 actionable tasks: 3 executed, 116 up-to-date
   docker exec ducker01 bash -c "cd /opt/kafka-dev && ducktape --cluster-file 
/opt/kafka-dev/tests/docker/build/cluster.json  
./tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py::StreamsBrokerDownResilience.test_streams_resilient_to_broker_down"
   [INFO:2020-09-25 15:15:53,329]: starting test run with session id 
2020-09-25--001...
   [INFO:2020-09-25 15:15:53,329]: running 1 tests...
   [INFO:2020-09-25 15:15:53,330]: Triggering test 1 of 1...
   [INFO:2020-09-25 15:15:53,360]: RunnerClient: Loading test {'directory': 
'/opt/kafka-dev/tests/kafkatest/tests/streams', 'file_name': 
'streams_broker_down_resilience_test.py', 'cls_name': 
'StreamsBrokerDownResilience', 'method_name': 
'test_streams_resilient_to_broker_down', 'injected_args': None}
   [INFO:2020-09-25 15:15:53,386]: RunnerClient: 
kafkatest.tests.streams.streams_broker_down_resilience_test.StreamsBrokerDownResilience.test_streams_resilient_to_broker_down:
 Setting up...
   [INFO:2020-09-25 15:15:59,198]: RunnerClient: 
kafkatest.tests.streams.streams_broker_down_resilience_test.StreamsBrokerDownResilience.test_streams_resilient_to_broker_down:
 Running...
   [INFO:2020-09-25 15:18:05,820]: RunnerClient: 
kafkatest.tests.streams.streams_broker_down_resilience_test.StreamsBrokerDownResilience.test_streams_resilient_to_broker_down:
 PASS
   [INFO:2020-09-25 15:18:05,821]: RunnerClient: 
kafkatest.tests.streams.streams_broker_down_resilience_test.StreamsBrokerDownResilience.test_streams_resilient_to_broker_down:
 Tearing down...
   [INFO:2020-09-25 15:18:18,192]: RunnerClient: 
kafkatest.tests.streams.streams_broker_down_resilience_test.StreamsBrokerDownResilience.test_streams_resilient_to_broker_down:
 Summary: 
   [INFO:2020-09-25 15:18:18,194]: RunnerClient: 
kafkatest.tests.streams.streams_broker_down_resilience_test.StreamsBrokerDownResilience.test_streams_resilient_to_broker_down:
 Data: None
   

   SESSION REPORT (ALL TESTS)
   ducktape version: 0.8.0
   session_id:   2020-09-25--001
   run time: 2 minutes 24.908 seconds
   tests run:1
   passed:   1
   failed:   0
   ignored:  0
   

   test_id:
kafkatest.tests.streams.streams_broker_down_resilience_test.StreamsBrokerDownResilience.test_streams_resilient_to_broker_down
   status: PASS
   run time:   2 minutes 24.806 seconds
   

   ```
   
   







[GitHub] [kafka] mjsax merged pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-09-25 Thread GitBox


mjsax merged pull request #9156:
URL: https://github.com/apache/kafka/pull/9156


   







[GitHub] [kafka] mjsax commented on pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-09-25 Thread GitBox


mjsax commented on pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#issuecomment-699151989


   Thanks for the patch @big-andy-coates!







[jira] [Deleted] (KAFKA-7590) GETTING HUGE MESSAGE STRUCTURE THROUGH JMS CONNECTOR

2020-09-25 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao deleted KAFKA-7590:
---


> GETTING HUGE MESSAGE STRUCTURE THROUGH JMS CONNECTOR
> 
>
> Key: KAFKA-7590
> URL: https://issues.apache.org/jira/browse/KAFKA-7590
> Project: Kafka
>  Issue Type: Test
>Reporter: Chenchu Lakshman kumar
>Priority: Major
>
> Message



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10470) zstd decompression with small batches is slow and causes excessive GC

2020-09-25 Thread James Yuzawa (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17202351#comment-17202351
 ] 

James Yuzawa edited comment on KAFKA-10470 at 9/25/20, 6:36 PM:


I also noticed the large amount of allocations and GC activity in my profiling. However, there is an additional issue related to the number of calls Kafka makes to ZstdOutputStream.write(int). Each of these single-byte writes gets sent to the JNI for compression. I think an input buffer could improve this by only crossing over into the JNI code once a critical mass of input has accumulated. Option 1: wrap the ZstdOutputStream with a BufferedOutputStream, as is done for GZIP currently. Option 2: alter the library to use buffering. I have this ticket open with the zstd-jni project: [https://github.com/luben/zstd-jni/issues/141]


was (Author: yuzawa-san):
I also noticed the lack of buffer reuse in my profiling. However, there is an 
additional issue related to the the number of calls Kafka does to 
ZstdOutputStream.write(int). Each of these single byte writes gets sent to the 
JNI for compression. I think an input buffer could improve this, by only 
crossing over into the JNI code when a critical mass of input has been 
accumulated. Option 1: We could wrap the ZstdOutputStream with a 
BufferedOutputStream like how it is done for GZIP currently. Option 2: the 
library could be updated. I have this ticket open with the zstd-jni project 
[https://github.com/luben/zstd-jni/issues/141]
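
The buffering idea above (Option 1) is the same pattern Kafka applies to GZIP: wrap the compressing stream in a `BufferedOutputStream` so single-byte `write(int)` calls are batched before they reach the underlying stream; for zstd-jni that would mean batching before each JNI crossing. A sketch of the pattern using the JDK's `GZIPOutputStream`, since `ZstdOutputStream` is not in the standard library:

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class BufferedCompressionDemo {
    public static void main(String[] args) throws IOException {
        byte[] payload = "many small writes".getBytes("UTF-8");

        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        // The buffer absorbs single-byte writes, so the compressor sees
        // chunks of up to 8 KiB instead of one call per byte. Wrapping a
        // ZstdOutputStream the same way would cut the JNI crossings.
        try (OutputStream out = new BufferedOutputStream(new GZIPOutputStream(sink), 8192)) {
            for (byte b : payload) {
                out.write(b); // worst case for an unbuffered native-backed stream
            }
        }

        // Round-trip to show the compressed bytes are intact.
        GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(sink.toByteArray()));
        byte[] decoded = in.readAllBytes();
        System.out.println(new String(decoded, "UTF-8")); // prints "many small writes"
    }
}
```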

> zstd decompression with small batches is slow and causes excessive GC
> -
>
> Key: KAFKA-10470
> URL: https://issues.apache.org/jira/browse/KAFKA-10470
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.5.1
>Reporter: Robert Wagner
>Priority: Major
>
> Similar to KAFKA-5150 but for zstd instead of LZ4, it appears that a large 
> decompression buffer (128kb) created by zstd-jni per batch is causing a 
> significant performance bottleneck.
> The next upcoming version of zstd-jni (1.4.5-7) will have a new constructor 
> for ZstdInputStream that allows the client to pass its own buffer.  A similar 
> fix as [PR #2967|https://github.com/apache/kafka/pull/2967] could be used to 
> have the  ZstdConstructor use a BufferSupplier to re-use the decompression 
> buffer.





[jira] [Commented] (KAFKA-10470) zstd decompression with small batches is slow and causes excessive GC

2020-09-25 Thread James Yuzawa (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17202351#comment-17202351
 ] 

James Yuzawa commented on KAFKA-10470:
--

I also noticed the lack of buffer reuse in my profiling. However, there is an additional issue related to the number of calls Kafka makes to ZstdOutputStream.write(int). Each of these single-byte writes gets sent to the JNI for compression. I think an input buffer could improve this by only crossing over into the JNI code once a critical mass of input has accumulated. Option 1: wrap the ZstdOutputStream with a BufferedOutputStream, as is done for GZIP currently. Option 2: the library could be updated. I have this ticket open with the zstd-jni project: [https://github.com/luben/zstd-jni/issues/141]

> zstd decompression with small batches is slow and causes excessive GC
> -
>
> Key: KAFKA-10470
> URL: https://issues.apache.org/jira/browse/KAFKA-10470
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.5.1
>Reporter: Robert Wagner
>Priority: Major
>
> Similar to KAFKA-5150 but for zstd instead of LZ4, it appears that a large 
> decompression buffer (128kb) created by zstd-jni per batch is causing a 
> significant performance bottleneck.
> The next upcoming version of zstd-jni (1.4.5-7) will have a new constructor 
> for ZstdInputStream that allows the client to pass its own buffer.  A similar 
> fix as [PR #2967|https://github.com/apache/kafka/pull/2967] could be used to 
> have the  ZstdConstructor use a BufferSupplier to re-use the decompression 
> buffer.





[jira] [Updated] (KAFKA-7590) GETTING HUGE MESSAGE STRUCTURE THROUGH JMS CONNECTOR

2020-09-25 Thread Addison Huddy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Addison Huddy updated KAFKA-7590:
-
Description: Message  (was: Message
 

{"schema":{"type":"struct","fields":[

{"type":"string","optional":false,"doc":"This field stores the value of 
`Message.getJMSMessageID() 
`_.","field":"messageID"}

,

{"type":"string","optional":false,"doc":"This field stores the type of message 
that was received. This corresponds to the subinterfaces of `Message 
<[http://docs.oracle.com/javaee/6/api/javax/jms/Message.html]>`_. `BytesMessage 
<[http://docs.oracle.com/javaee/6/api/javax/jms/BytesMessage.html]>`_ = 
`bytes`, `MapMessage 
<[http://docs.oracle.com/javaee/6/api/javax/jms/MapMessage.html]>`_ = `map`, 
`ObjectMessage 
<[http://docs.oracle.com/javaee/6/api/javax/jms/ObjectMessage.html]>`_ = 
`object`, `StreamMessage 
<[http://docs.oracle.com/javaee/6/api/javax/jms/StreamMessage.html]>`_ = 
`stream` and `TextMessage 
<[http://docs.oracle.com/javaee/6/api/javax/jms/TextMessage.html]>`_ = `text`. 
The corresponding field will be populated with the values from the respective 
Message subinterface.","field":"messageType"}

,

{"type":"int64","optional":false,"doc":"Data from the `getJMSTimestamp() 
<[http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSTimestamp(])>`_
 method.","field":"timestamp"}

,

{"type":"int32","optional":false,"doc":"This field stores the value of 
`Message.getJMSDeliveryMode() 
<[http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSDeliveryMode(])>`_.","field":"deliveryMode"}

,

{"type":"string","optional":true,"doc":"This field stores the value of 
`Message.getJMSCorrelationID() 
<[http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSCorrelationID(])>`_.","field":"correlationID"}

,{"type":"struct","fields":[

{"type":"string","optional":false,"doc":"The type of JMS Destination, and 
either ``queue`` or ``topic``.","field":"destinationType"}

,

{"type":"string","optional":false,"doc":"The name of the destination. This will 
be the value of `Queue.getQueueName() 
<[http://docs.oracle.com/javaee/6/api/javax/jms/Queue.html#getQueueName(])>`_ 
or `Topic.getTopicName() 
<[http://docs.oracle.com/javaee/6/api/javax/jms/Topic.html#getTopicName(])>`_.","field":"name"}

],"optional":true,"name":"io.confluent.connect.jms.Destination","doc":"This 
schema is used to represent a JMS Destination, and is either `queue 
<[http://docs.oracle.com/javaee/6/api/javax/jms/Queue.html]>`_ or `topic 
<[http://docs.oracle.com/javaee/6/api/javax/jms/Topic.html]>`_.","field":"replyTo"},{"type":"struct","fields":[

{"type":"string","optional":false,"doc":"The type of JMS Destination, and 
either ``queue`` or ``topic``.","field":"destinationType"}

,

{"type":"string","optional":false,"doc":"The name of the destination. This will 
be the value of `Queue.getQueueName() 
<[http://docs.oracle.com/javaee/6/api/javax/jms/Queue.html#getQueueName(])>`_ 
or `Topic.getTopicName() 
<[http://docs.oracle.com/javaee/6/api/javax/jms/Topic.html#getTopicName(])>`_.","field":"name"}

],"optional":true,"name":"io.confluent.connect.jms.Destination","doc":"This 
schema is used to represent a JMS Destination, and is either `queue 
<[http://docs.oracle.com/javaee/6/api/javax/jms/Queue.html]>`_ or `topic 
<[http://docs.oracle.com/javaee/6/api/javax/jms/Topic.html]>`_.","field":"destination"},

{"type":"boolean","optional":false,"doc":"This field stores the value of 
`Message.getJMSRedelivered() 
<[http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSRedelivered(])>`_.","field":"redelivered"}

,

{"type":"string","optional":true,"doc":"This field stores the value of 
`Message.getJMSType() 
<[http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSType(])>`_.","field":"type"}

,

{"type":"int64","optional":false,"doc":"This field stores the value of 
`Message.getJMSExpiration() 
<[http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSExpiration(])>`_.","field":"expiration"}

,

{"type":"int32","optional":false,"doc":"This field stores the value of 
`Message.getJMSPriority() 
<[http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSPriority(])>`_.","field":"priority"}

,{"type":"map","keys":

{"type":"string","optional":false}

,"values":{"type":"struct","fields":[

{"type":"string","optional":false,"doc":"The java type of the property on the 
Message. One of ``boolean``, ``byte``, ``short``, ``integer``, ``long``, 
``float``, ``double``, or ``string``.","field":"propertyType"}

,\{"type":"boolean","optional":true,"doc":"The value stored as a boolean. Null 
unless ``propertyType`` is set to 
``boolean``.","field":"boolean"},\{"type":"int8","optional":true,"doc":"The 
value stored as a byte. Null unless ``propertyType``

[GitHub] [kafka] hachikuji commented on pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

2020-09-25 Thread GitBox


hachikuji commented on pull request #9280:
URL: https://github.com/apache/kafka/pull/9280#issuecomment-699081923


   @ableegoldman I agree it would be useful to have a test case. If you have 
time, maybe you can help out in a follow-up?







[GitHub] [kafka] ableegoldman commented on pull request #9312: KAFKA-10505: Fix parsing of generation log string.

2020-09-25 Thread GitBox


ableegoldman commented on pull request #9312:
URL: https://github.com/apache/kafka/pull/9312#issuecomment-699059908


   @guozhangwang I think this needs to be cherrypicked back to 2.6







[GitHub] [kafka] guozhangwang commented on pull request #9196: [DO NOT MERGE] KAFKA-10402: Upgrade system tests to python3

2020-09-25 Thread GitBox


guozhangwang commented on pull request #9196:
URL: https://github.com/apache/kafka/pull/9196#issuecomment-699034902


   Only one test failed this time, and honestly I'm not sure whether it is related to your PR. Here you go:
   
   ```
   
   Module: kafkatest.tests.streams.streams_broker_down_resilience_test Class:  
StreamsBrokerDownResilience Method: test_streams_resilient_to_broker_down | 
This test validates that Streams is resilient to a broker being down longer 
than specified timeouts in configs | 3 minutes 56.586 seconds
   -- | -- | --
   
   
   ```
   
   ```
   [INFO  - 2020-09-25 06:34:38,507 - kafka - bootstrap_servers - lineno:1007]: 
Bootstrap client port is: 9092
   [INFO  - 2020-09-25 06:36:38,847 - runner_client - log - lineno:240]: 
RunnerClient: 
kafkatest.tests.streams.streams_broker_down_resilience_test.StreamsBrokerDownResilience.test_streams_resilient_to_broker_down:
 FAIL: Never saw output 'Discovered group coordinator' on ubuntu@worker3
   Traceback (most recent call last):
 File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.9-py2.7.egg/ducktape/tests/runner_client.py",
 line 134, in run
   data = self.run_test()
 File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.9-py2.7.egg/ducktape/tests/runner_client.py",
 line 192, in run_test
   return self.test_context.function(self.test)
 File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py",
 line 68, in test_streams_resilient_to_broker_down
   err_msg=("Never saw output '%s' on " % self.connected_message) + 
str(processor.node.account))
 File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.9-py2.7.egg/ducktape/cluster/remoteaccount.py",
 line 708, in wait_until
   allow_fail=True) == 0, **kwargs)
 File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.9-py2.7.egg/ducktape/utils/util.py",
 line 41, in wait_until
   raise TimeoutError(err_msg() if callable(err_msg) else err_msg)
   TimeoutError: Never saw output 'Discovered group coordinator' on 
ubuntu@worker3
   ```







[GitHub] [kafka] dajac commented on pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…

2020-09-25 Thread GitBox


dajac commented on pull request #9318:
URL: https://github.com/apache/kafka/pull/9318#issuecomment-699026162


   @chia7712 Sorry for the delay... I will take another look on Monday.







[GitHub] [kafka] chia7712 commented on pull request #9284: KAFKA-10479 Throw exception if users try to update configs of existen…

2020-09-25 Thread GitBox


chia7712 commented on pull request #9284:
URL: https://github.com/apache/kafka/pull/9284#issuecomment-699025599


   > To clarify, were you going to mention this in docs/upgrade.html?
   
   will copy that







[GitHub] [kafka] chia7712 commented on pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…

2020-09-25 Thread GitBox


chia7712 commented on pull request #9318:
URL: https://github.com/apache/kafka/pull/9318#issuecomment-699025146


   @dajac @hachikuji Could you take a look?







[GitHub] [kafka] hachikuji merged pull request #8897: MINOR; Use the automated protocol for the Consumer Protocol's subscriptions and assignments

2020-09-25 Thread GitBox


hachikuji merged pull request #8897:
URL: https://github.com/apache/kafka/pull/8897


   







[GitHub] [kafka] hachikuji commented on pull request #8897: MINOR; Use the automated protocol for the Consumer Protocol's subscriptions and assignments

2020-09-25 Thread GitBox


hachikuji commented on pull request #8897:
URL: https://github.com/apache/kafka/pull/8897#issuecomment-699022589


   Two flaky tests: 
   - https://issues.apache.org/jira/browse/KAFKA-10405
   - https://issues.apache.org/jira/browse/KAFKA-8266







[GitHub] [kafka] hachikuji edited a comment on pull request #8897: MINOR; Use the automated protocol for the Consumer Protocol's subscriptions and assignments

2020-09-25 Thread GitBox


hachikuji edited a comment on pull request #8897:
URL: https://github.com/apache/kafka/pull/8897#issuecomment-699022589


   Two flaky test failures: 
   - https://issues.apache.org/jira/browse/KAFKA-10405
   - https://issues.apache.org/jira/browse/KAFKA-8266







[jira] [Resolved] (KAFKA-10519) Unit tests for VotedState

2020-09-25 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-10519.
-
Resolution: Fixed

> Unit tests for VotedState
> -
>
> Key: KAFKA-10519
> URL: https://issues.apache.org/jira/browse/KAFKA-10519
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> We accidentally checked in an empty test class `VotedStateTest`. We should 
> add missing unit tests.





[GitHub] [kafka] nizhikov commented on pull request #9196: [DO NOT MERGE] KAFKA-10402: Upgrade system tests to python3

2020-09-25 Thread GitBox


nizhikov commented on pull request #9196:
URL: https://github.com/apache/kafka/pull/9196#issuecomment-699018740


   Hello, @guozhangwang 
   
   Can you please share the results of the test run?







[GitHub] [kafka] hachikuji merged pull request #9337: KAFKA-10519; Add missing unit test for `VotedState`

2020-09-25 Thread GitBox


hachikuji merged pull request #9337:
URL: https://github.com/apache/kafka/pull/9337


   







[GitHub] [kafka] hachikuji commented on pull request #9284: KAFKA-10479 Throw exception if users try to update configs of existen…

2020-09-25 Thread GitBox


hachikuji commented on pull request #9284:
URL: https://github.com/apache/kafka/pull/9284#issuecomment-699017504


   @chia7712 To clarify, were you going to mention this in `docs/upgrade.html`?







[jira] [Created] (KAFKA-10528) Make output of AK various tools consistent in case of errors

2020-09-25 Thread David Jacot (Jira)
David Jacot created KAFKA-10528:
---

 Summary: Make output of AK various tools consistent in case of 
errors
 Key: KAFKA-10528
 URL: https://issues.apache.org/jira/browse/KAFKA-10528
 Project: Kafka
  Issue Type: Improvement
Reporter: David Jacot


AK has various command line tools. I have noticed that they are not all 
consistent when it comes to how they treat and print out errors. Typically, the 
message of the exception is printed out followed by its stacktrace.

I have noticed a couple of things:
* Sometimes the stacktrace is printed out using the logger in some tools or 
with println in others. Also, I wonder if providing the stacktrace to the users 
is a good idea at all as it gives the feeling that the tool did not work even 
though the error may be legitimate.
* Some tools unwrap ExecutionException to provide the message of the real cause 
to the user. Others don't.
* The handling of errors resulting from bad or wrong flags is also inconsistent 
in certain tools. We should review this and be consistent.

There may be other things to look at. I think that we should review all the 
tools and make them consistent.
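As a sketch of the consistency being proposed — unwrap the wrapped cause, print one message to stdout, and keep the stacktrace out of the default output — here is a small Python analogue (the helper names are hypothetical; `ExecutionError` stands in for Java's `ExecutionException`):

```python
import sys
import traceback

class ExecutionError(Exception):
    """Stand-in for Java's ExecutionException: a wrapper whose __cause__
    carries the real error."""

def format_tool_error(exc):
    """Return the one-line message for the real cause of `exc`, unwrapping
    one level of wrapping if present."""
    cause = exc.__cause__ if exc.__cause__ is not None else exc
    return "Error while executing command: %s" % cause

def print_tool_error(exc, show_stacktrace=False):
    """Print the error consistently and return the tool's exit code.

    The cause's message goes to stdout; the stacktrace is only emitted
    (to stderr) when explicitly requested, e.g. in a debug mode.
    """
    print(format_tool_error(exc))
    if show_stacktrace:
        traceback.print_exception(type(exc), exc, exc.__traceback__, file=sys.stderr)
    return 1
```

Keeping the stacktrace behind an explicit flag addresses the point above that a stacktrace on every legitimate error makes the tool look broken.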





[GitHub] [kafka] dajac commented on a change in pull request #9302: KAFKA-10149: Allow auto preferred leader election when partitions are reassigning

2020-09-25 Thread GitBox


dajac commented on a change in pull request #9302:
URL: https://github.com/apache/kafka/pull/9302#discussion_r495068017



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1067,10 +1067,7 @@ class KafkaController(val config: KafkaConfig,
   // check ratio and if greater than desired ratio, trigger a rebalance 
for the topic partitions
   // that need to be on this broker
   if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble 
/ 100)) {
-// do this check only if the broker is live and there are no 
partitions being reassigned currently
-// and preferred replica election is not in progress
 val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp =>
-  controllerContext.partitionsBeingReassigned.isEmpty &&

Review comment:
   I totally agree. The sooner the better. I was wondering if there could 
be any conflicts with the reassignment logic. I have looked at it and I haven't 
seen anything. 👍 









[jira] [Updated] (KAFKA-10364) Use Token Bucket for all quotas

2020-09-25 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-10364:

Description: KIP-599 has introduced the Token Bucket algorithm to throttle 
controller mutations. We should consider using it for all quotas in AK in the 
future.

> Use Token Bucket for all quotas
> ---
>
> Key: KAFKA-10364
> URL: https://issues.apache.org/jira/browse/KAFKA-10364
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
>
> KIP-599 has introduced the Token Bucket algorithm to throttle controller 
> mutations. We should consider using it for all quotas in AK in the future.
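For context, the core of the Token Bucket idea introduced by KIP-599 fits in a few lines: a burst allowance that refills at a steady rate, with an operation of cost N admitted only when N tokens are available. A simplified model (with an injectable clock for testing; not Kafka's actual implementation):

```python
import time

class TokenBucket:
    """Minimal token-bucket sketch: `burst` tokens of capacity, refilled at
    `rate` tokens per second. `now` is injectable so tests can use a fake clock."""

    def __init__(self, rate, burst, now=time.monotonic):
        self.rate = float(rate)        # tokens added per second
        self.capacity = float(burst)   # maximum stored tokens
        self.tokens = float(burst)     # start full
        self.now = now
        self.last = now()

    def try_acquire(self, cost):
        # Refill based on elapsed time, capped at capacity, then spend if possible.
        t = self.now()
        self.tokens = min(self.capacity, self.tokens + (t - self.last) * self.rate)
        self.last = t
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False
```

Refilling lazily on each acquire (rather than with a background timer) keeps the structure lock-friendly and is the usual way token buckets are implemented.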





[jira] [Updated] (KAFKA-10364) Use Token Bucket for all quotas

2020-09-25 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-10364:

Parent: (was: KAFKA-9915)
Issue Type: Improvement  (was: Sub-task)

> Use Token Bucket for all quotas
> ---
>
> Key: KAFKA-10364
> URL: https://issues.apache.org/jira/browse/KAFKA-10364
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
>






[GitHub] [kafka] scanterog commented on pull request #9313: [mm2] Fix consumer/producer properties override

2020-09-25 Thread GitBox


scanterog commented on pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#issuecomment-698983392


   @mimaison just added it. Let me know if there's anything else to fix. Thanks!







[jira] [Resolved] (KAFKA-10516) Implement Topic Command changes

2020-09-25 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-10516.
-
Resolution: Fixed

> Implement Topic Command changes
> ---
>
> Key: KAFKA-10516
> URL: https://issues.apache.org/jira/browse/KAFKA-10516
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
> Fix For: 2.7.0
>
>






[jira] [Commented] (KAFKA-10516) Implement Topic Command changes

2020-09-25 Thread David Jacot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17202224#comment-17202224
 ] 

David Jacot commented on KAFKA-10516:
-

Done by this PR: https://github.com/apache/kafka/pull/9334

> Implement Topic Command changes
> ---
>
> Key: KAFKA-10516
> URL: https://issues.apache.org/jira/browse/KAFKA-10516
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
>






[jira] [Updated] (KAFKA-10516) Implement Topic Command changes

2020-09-25 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-10516:

Fix Version/s: 2.7.0

> Implement Topic Command changes
> ---
>
> Key: KAFKA-10516
> URL: https://issues.apache.org/jira/browse/KAFKA-10516
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
> Fix For: 2.7.0
>
>






[GitHub] [kafka] rajinisivaram merged pull request #9334: KAFKA-10516; Disable automatic retry of `THROTTLING_QUOTA_EXCEEDED` errors in the `kafka-topics` command (KIP-599)

2020-09-25 Thread GitBox


rajinisivaram merged pull request #9334:
URL: https://github.com/apache/kafka/pull/9334


   







[GitHub] [kafka] dajac commented on a change in pull request #9334: KAFKA-10516; Disable automatic retry of `THROTTLING_QUOTA_EXCEEDED` errors in the `kafka-topics` command (KIP-599)

2020-09-25 Thread GitBox


dajac commented on a change in pull request #9334:
URL: https://github.com/apache/kafka/pull/9334#discussion_r494810769



##
File path: 
core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
##
@@ -844,4 +851,72 @@ class TopicCommandWithAdminClientTest extends 
KafkaServerTestHarness with Loggin
 assertEquals(2, rows.size)
 rows(0).startsWith(s"Topic:$testTopicName\tPartitionCount:1")
   }
+
+  @Test
+  def testCreateTopicDoesNotRetryThrottlingQuotaExceededException(): Unit = {
+val adminClient = Mockito.mock(classOf[Admin])
+val topicService = AdminClientTopicService(adminClient)
+
+val result = AdminClientTestUtils.createTopicsResult(testTopicName, 
Errors.THROTTLING_QUOTA_EXCEEDED.exception())
+Mockito.when(adminClient.createTopics(ArgumentMatchers.any(), 
ArgumentMatchers.any())).thenReturn(result)

Review comment:
   That makes sense. I had to introduce an alias for `ArgumentMatchers.eq` 
to avoid conflicting with `eq`. I went with `eqThat` to stay in line with `argThat`.

##
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##
@@ -69,16 +71,26 @@ object TopicCommand extends Logging {
   else if (opts.hasDeleteOption)
 topicService.deleteTopic(opts)
 } catch {
+  case e: ExecutionException =>
+if (e.getCause != null)
+  printException(e.getCause)
+else
+  printException(e)
+exitCode = 1
   case e: Throwable =>
-println("Error while executing topic command : " + e.getMessage)
-error(Utils.stackTrace(e))
+printException(e)
 exitCode = 1
 } finally {
   topicService.close()
   Exit.exit(exitCode)
 }
   }
 
+  private def printException(e: Throwable): Unit = {
+println("Error while executing topic command : " + e.getMessage)
+error(Utils.stackTrace(e))

Review comment:
   Do you mean using `error(message, e)` to replace both `println` and 
`error`? I think we use `println` here in order to print the message 
to stdout without any logger-related formatting and regardless of how the logger is 
configured. Changing to `error(message, e)` would break this and 
potentially break existing applications by introducing logger-related 
formatting for that message. I think we should keep `println` here.
   
   However, I wonder if using `error` is appropriate here, as it basically 
prints a stacktrace for every error. The UX does not look good. Would it make 
sense to use `debug` instead? The message of the exception is printed anyway, 
and I don't think the stacktrace provides much to regular users. WDYT?
   
   

##
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##
@@ -69,16 +71,26 @@ object TopicCommand extends Logging {
   else if (opts.hasDeleteOption)
 topicService.deleteTopic(opts)
 } catch {
+  case e: ExecutionException =>
+if (e.getCause != null)
+  printException(e.getCause)
+else
+  printException(e)
+exitCode = 1
   case e: Throwable =>
-println("Error while executing topic command : " + e.getMessage)
-error(Utils.stackTrace(e))
+printException(e)
 exitCode = 1
 } finally {
   topicService.close()
   Exit.exit(exitCode)
 }
   }
 
+  private def printException(e: Throwable): Unit = {
+println("Error while executing topic command : " + e.getMessage)
+error(Utils.stackTrace(e))

Review comment:
   That's a very good question. I guess that we did so in order to print out 
just the stacktrace without the message. Otherwise, we would have the 
error message printed out by `println`, followed by the same message 
printed out by the logger, followed by the stacktrace. Having the message twice is 
not necessary. I had a quick look at other commands and we do so everywhere.
   
   I will open a JIRA to make tools more consistent. Good idea.









[GitHub] [kafka] dajac commented on pull request #9334: KAFKA-10516; Disable automatic retry of `THROTTLING_QUOTA_EXCEEDED` errors in the `kafka-topics` command (KIP-599)

2020-09-25 Thread GitBox


dajac commented on pull request #9334:
URL: https://github.com/apache/kafka/pull/9334#issuecomment-698784923


   @rajinisivaram Thanks for your comments. I have updated the PR and answered 
your questions.







[GitHub] [kafka] tombentley commented on pull request #9266: KAFKA-10469: Resolve logger levels hierarchically

2020-09-25 Thread GitBox


tombentley commented on pull request #9266:
URL: https://github.com/apache/kafka/pull/9266#issuecomment-698835897


   @ijuma any chance you could trigger a CI build? Thanks!







[GitHub] [kafka] fredsh2k closed pull request #9335: Update quickstart.html

2020-09-25 Thread GitBox


fredsh2k closed pull request #9335:
URL: https://github.com/apache/kafka/pull/9335


   







[GitHub] [kafka] dajac commented on pull request #8897: MINOR; Use the automated protocol for the Consumer Protocol's subscriptions and assignments

2020-09-25 Thread GitBox


dajac commented on pull request #8897:
URL: https://github.com/apache/kafka/pull/8897#issuecomment-698811607


   @hachikuji Thanks for your suggestions. I have incorporated them.







[GitHub] [kafka] fredsh2k commented on pull request #9335: Update quickstart.html

2020-09-25 Thread GitBox


fredsh2k commented on pull request #9335:
URL: https://github.com/apache/kafka/pull/9335#issuecomment-698870391


   kafka_2.12-2.4.0
   
   ok, thanks for the help :) 







[GitHub] [kafka] cadonna commented on pull request #9328: Minor: Add deleteDir for streams quickstart test

2020-09-25 Thread GitBox


cadonna commented on pull request #9328:
URL: https://github.com/apache/kafka/pull/9328#issuecomment-698151176


   @mumrah, error message
   
   ```
   [2020-09-24T01:46:33.204Z] Cleaning up test-streams-archetype
   No such property: deleteDir for class: groovy.lang.Binding
   ```
   in build 
   
   
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-9328/2/pipeline/8
   
   seems related.







[GitHub] [kafka] ijuma commented on a change in pull request #4090: [KAFKA-6084] Propagate JSON parsing errors in ReassignPartitionsCommand

2020-09-25 Thread GitBox


ijuma commented on a change in pull request #4090:
URL: https://github.com/apache/kafka/pull/4090#discussion_r494182608



##
File path: core/src/main/scala/kafka/utils/Json.scala
##
@@ -69,8 +70,11 @@ object Json {
* @return An `Either` which in case of `Left` means an exception and 
`Right` is the actual return value.
*/
   def tryParseFull(input: String): Either[JsonProcessingException, JsonValue] =
-try Right(mapper.readTree(input)).map(JsonValue(_))
-catch { case e: JsonProcessingException => Left(e) }
+if (input != null && input.isEmpty)

Review comment:
   Any update regarding this?









[GitHub] [kafka] asdaraujo commented on pull request #9281: KAFKA-10478: Allow duplicated ports in advertised.listeners

2020-09-25 Thread GitBox


asdaraujo commented on pull request #9281:
URL: https://github.com/apache/kafka/pull/9281#issuecomment-698644188


   > Please also update the documentation, I think it's worth to note that in 
case of `listeners` it's not allowed but for `advertised.listeners` it is for 
the LB use-case.
   > Otherwise it looks good.
   
   Thanks, @viktorsomogyi .  I've updated the documentation as well.







[GitHub] [kafka] hachikuji commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-25 Thread GitBox


hachikuji commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r494480847



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1764,6 +1769,145 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: 
AlterIsrResponseData => Unit): Unit = {
+val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+alterIsrRequest.topics.forEach { topicReq =>
+  topicReq.partitions.forEach { partitionReq =>
+val tp = new TopicPartition(topicReq.name, partitionReq.partitionIndex)
+val newIsr = partitionReq.newIsr().asScala.toList.map(_.toInt)
+isrsToAlter.put(tp, new LeaderAndIsr(alterIsrRequest.brokerId, 
partitionReq.leaderEpoch, newIsr, partitionReq.currentIsrVersion))
+  }
+}
+
+def responseCallback(results: Either[Map[TopicPartition, Either[Errors, 
LeaderAndIsr]], Errors]): Unit = {
+  val resp = new AlterIsrResponseData()
+  results match {
+case Right(error) =>
+  resp.setErrorCode(error.code)
+case Left(partitionResults) =>
+  resp.setTopics(new util.ArrayList())
+  partitionResults
+.groupBy { case (tp, _) => tp.topic }   // Group by topic
+.foreach { case (topic, partitions) =>
+  // Add each topic part to the response
+  val topicResp = new AlterIsrResponseData.TopicData()
+.setName(topic)
+.setPartitions(new util.ArrayList())
+  resp.topics.add(topicResp)
+  partitions.foreach { case (tp, errorOrIsr) =>
+// Add each partition part to the response (new ISR or error)
+errorOrIsr match {
+  case Left(error) => topicResp.partitions.add(
+new AlterIsrResponseData.PartitionData()
+  .setPartitionIndex(tp.partition)
+  .setErrorCode(error.code))
+  case Right(leaderAndIsr) => topicResp.partitions.add(
+new AlterIsrResponseData.PartitionData()
+  .setPartitionIndex(tp.partition)
+  .setLeaderId(leaderAndIsr.leader)
+  .setLeaderEpoch(leaderAndIsr.leaderEpoch)
+  .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
+  .setCurrentIsrVersion(leaderAndIsr.zkVersion))
+}
+}
+  }
+  }
+  callback.apply(resp)
+}
+
+eventManager.put(AlterIsrReceived(alterIsrRequest.brokerId, 
alterIsrRequest.brokerEpoch, isrsToAlter, responseCallback))
+  }
+
+  private def processAlterIsr(brokerId: Int, brokerEpoch: Long, isrsToAlter: 
Map[TopicPartition, LeaderAndIsr],
+  callback: AlterIsrCallback): Unit = {
+
+// Handle a few short-circuits
+if (!isActive) {
+  callback.apply(Right(Errors.NOT_CONTROLLER))
+  return
+}
+
+val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
+if (brokerEpochOpt.isEmpty) {
+  info(s"Ignoring AlterIsr due to unknown broker $brokerId")
+  callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+  return
+}
+
+if (!brokerEpochOpt.contains(brokerEpoch)) {
+  info(s"Ignoring AlterIsr due to stale broker epoch $brokerEpoch for 
broker $brokerId")
+  callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+  return
+}
+
+val response = try {
+  val partitionResponses = mutable.HashMap[TopicPartition, Either[Errors, 
LeaderAndIsr]]()
+
+  // Determine which partitions we will accept the new ISR for
+  val adjustedIsrs: Map[TopicPartition, LeaderAndIsr] = 
isrsToAlter.flatMap {
+case (tp: TopicPartition, newLeaderAndIsr: LeaderAndIsr) =>
+  val partitionError: Errors = 
controllerContext.partitionLeadershipInfo(tp) match {
+case Some(leaderIsrAndControllerEpoch) =>
+  val currentLeaderAndIsr = 
leaderIsrAndControllerEpoch.leaderAndIsr
+  if (newLeaderAndIsr.leaderEpoch < 
currentLeaderAndIsr.leaderEpoch) {

Review comment:
   Consider the following scenario:
   
   1) broker sends AlterIsr
   2) the update succeeds but the response is lost
   3) broker retries AlterIsr
   
   Currently the leader will be stuck after 3) because it has no way to get the 
latest LeaderAndIsr state if the first attempt fails. To handle this, I think 
we need to add an idempotence check here. After we have validated the leader 
epoch, if the intended state matches the current state, then we can just return 
the current state. 
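   The retry scenario can be made concrete with a small sketch of the proposed idempotence check (Python pseudocode with hypothetical names; broker-epoch and zkVersion checks omitted): after the leader-epoch validation, a request whose intended ISR already matches the stored ISR is acknowledged with the current state instead of being treated as a conflicting update.

```python
def handle_alter_isr(current, requested):
    """Sketch of a retry-safe AlterIsr check. `current` and `requested` are
    (leader_epoch, isr) pairs. Returns (error, resulting_state)."""
    cur_epoch, cur_isr = current
    req_epoch, req_isr = requested
    if req_epoch < cur_epoch:
        # A genuinely stale leader is still fenced.
        return ("FENCED_LEADER_EPOCH", current)
    if sorted(req_isr) == sorted(cur_isr):
        # Idempotent retry: the first attempt already applied this change,
        # so acknowledge with the current state instead of failing.
        return ("NONE", current)
    # Normal path: accept the proposed ISR.
    new_state = (cur_epoch, list(req_isr))
    return ("NONE", new_state)
```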

##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -1246,6 +1351,51 @@ class Partition(val topicPartition: TopicPartition,
 }
   }
 
+  private def sendAlterIsrRequest(proposedIsrState: IsrState): Boolean = {
+val isrToSendOpt: Option[Set[Int]

[GitHub] [kafka] chia7712 commented on pull request #9284: KAFKA-10479 Throw exception if users try to update configs of existen…

2020-09-25 Thread GitBox


chia7712 commented on pull request #9284:
URL: https://github.com/apache/kafka/pull/9284#issuecomment-698083970


   ```
   Build / JDK 11 / 
kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
   ```
   It is flaky on the trunk branch as well, so it is unrelated to this patch.







[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-25 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r494170995



##
File path: clients/src/main/resources/common/message/UpdateFeaturesResponse.json
##
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "response",
+  "name": "UpdateFeaturesResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [

Review comment:
   Done. I have added a top-level error code now.

##
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##
@@ -1214,6 +1215,71 @@ default AlterClientQuotasResult 
alterClientQuotas(Collection<ClientQuotaAlteration> entries, 
AlterClientQuotasOptions options);
 
+/**
+ * Describes finalized as well as supported features. By default, the 
request is issued to any
+ * broker. It can be optionally directed only to the controller via 
DescribeFeaturesOptions
+ * parameter. This is particularly useful if the user requires strongly 
consistent reads of
+ * finalized features.
+ * 
+ * The following exceptions can be anticipated when calling {@code get()} 
on the future from the
+ * returned {@link DescribeFeaturesResult}:
+ * 
+ *   {@link org.apache.kafka.common.errors.TimeoutException}
+ *   If the request timed out before the describe operation could 
finish.
+ * 
+ * 
+ * @param options   the options to use
+ *
+ * @return  the {@link DescribeFeaturesResult} containing the 
result
+ */
+DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);
+
+/**
+ * Applies specified updates to finalized features. This operation is not 
transactional so it
+ * may succeed for some features while fail for others.
+ * 
+ * The API takes in a map of finalized feature name to {@link 
FeatureUpdate} that needs to be
+ * applied. Each entry in the map specifies the finalized feature to be 
added or updated or
+ * deleted, along with the new max feature version level value. This 
request is issued only to
+ * the controller since the API is only served by the controller. The 
return value contains an
+ * error code for each supplied {@link FeatureUpdate}, and the code 
indicates if the update
+ * succeeded or failed in the controller.
+ * 
+ * Downgrade of feature version level is not a regular 
operation/intent. It is only allowed
+ * in the controller if the {@link FeatureUpdate} has the allowDowngrade 
flag set - setting this
+ * flag conveys user intent to attempt downgrade of a feature max version 
level. Note that
+ * despite the allowDowngrade flag being set, certain downgrades may be 
rejected by the
+ * controller if it is deemed impossible.
+ * Deletion of a finalized feature version is not a regular 
operation/intent. It could be
+ * done by setting the allowDowngrade flag to true in the {@link 
FeatureUpdate}, and, setting
+ * the max version level to be less than 1.
+ * 
+ *
+ * The following exceptions can be anticipated when calling {@code get()} 
on the futures
+ * obtained from the returned {@link UpdateFeaturesResult}:
+ * 
+ *   {@link 
org.apache.kafka.common.errors.ClusterAuthorizationException}
+ *   If the authenticated user didn't have alter access to the 
cluster.
+ *   {@link org.apache.kafka.common.errors.InvalidRequestException}

Review comment:
   Done.

##
File path: clients/src/main/resources/common/message/UpdateFeaturesRequest.json
##
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS

[GitHub] [kafka] apovzner commented on a change in pull request #9317: KAFKA-10509: Added throttle connection accept rate metric (KIP-612)

2020-09-25 Thread GitBox


apovzner commented on a change in pull request #9317:
URL: https://github.com/apache/kafka/pull/9317#discussion_r494034591



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1447,13 +1454,33 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
   }
 }
 
+def removeSensors(): Unit = {

Review comment:
   agreed, `close` will be consistent. 
   
   About not closing ListenerConnectionQuota on ConnectionQuotas.close(): this 
is on the broker shutdown path, where KafkaServer calls metrics.close() on 
shutdown as well (and the metrics are owned by KafkaServer), so I don't think 
we are leaking anything. But I think it's better to remove the listener sensors 
on ConnectionQuotas.close() anyway, so I will make that change. 

##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1292,6 +1292,12 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
 counts.synchronized {
   val startThrottleTimeMs = time.milliseconds
   val throttleTimeMs = 
math.max(recordConnectionAndGetThrottleTimeMs(listenerName, 
startThrottleTimeMs), 0)
+  if (throttleTimeMs > 0) {
+// record throttle time due to hitting connection rate limit
+// connection could be throttled longer if the limit of the number of 
active connections is reached as well
+maxConnectionsPerListener.get(listenerName)
+  
.foreach(_.connectionRateThrottleSensor.record(throttleTimeMs.toDouble, 
startThrottleTimeMs))

Review comment:
   That code seemed readable to me, but perhaps looking up 
`ListenerConnectionQuota` is better. I made a change to look it up once, please 
take a look.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

2020-09-25 Thread GitBox


guozhangwang commented on a change in pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#discussion_r494747319



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
##
@@ -60,6 +60,34 @@ public void prepareTopology() throws InterruptedException {
 rightStream = builder.stream(INPUT_TOPIC_RIGHT);
 }
 
+@Test
+public void shouldNotAccessJoinStoresWhenGivingName() throws 
InterruptedException {

Review comment:
   A good coverage improvement! Thanks.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
##
@@ -104,19 +95,11 @@ public StreamThreadStateStoreProvider(final StreamThread 
streamThread,
 }
 }
 
-private TaskId createKeyTaskId(final String storeName, final Integer 
partition) {
-if (partition == null) {
-return null;
-}
-final List sourceTopics = 
internalTopologyBuilder.stateStoreNameToSourceTopics().get(storeName);
-final Set sourceTopicsSet = new HashSet<>(sourceTopics);
-final Map topicGroups = 
internalTopologyBuilder.topicGroups();
-for (final Map.Entry 
topicGroup : topicGroups.entrySet()) {
-if 
(topicGroup.getValue().sourceTopics.containsAll(sourceTopicsSet)) {
-return new TaskId(topicGroup.getKey(), partition);
-}
-}
-throw new InvalidStateStoreException("Cannot get state store " + 
storeName + " because the requested partition " +
-partition + " is not available on this instance");
+private Optional findStreamTask(final Collection tasks, final 
String storeName, final int partition) {

Review comment:
   This is a great find, thanks!

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
##
@@ -56,25 +55,6 @@ public QueryableStoreProvider(final 
List storePr
 if (!globalStore.isEmpty()) {
 return queryableStoreType.create(globalStoreProvider, storeName);
 }
-final List allStores = new ArrayList<>();

Review comment:
   LGTM.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
##
@@ -46,11 +46,22 @@ public void setStoreQueryParameters(final 
StoreQueryParameters storeQueryParamet
 public  List stores(final String storeName,
   final QueryableStoreType queryableStoreType) {
 final List allStores = new ArrayList<>();
-for (final StreamThreadStateStoreProvider provider : storeProviders) {
-final List stores = provider.stores(storeQueryParameters);
-allStores.addAll(stores);
+for (final StreamThreadStateStoreProvider storeProvider : 
storeProviders) {
+final List stores = storeProvider.stores(storeQueryParameters);
+if (!stores.isEmpty()) {
+allStores.addAll(stores);
+if (storeQueryParameters.partition() != null) {
+break;
+}
+}
 }
 if (allStores.isEmpty()) {
+if (storeQueryParameters.partition() != null) {
+throw new InvalidStateStoreException(
+String.format("The specified partition %d for store %s 
does not exist.",

Review comment:
   Could you elaborate a bit more about this? If `allStores` is empty, it is 
always possible that the specified store-partition or just store-"null" does 
not exist in this client. Why are they different failure cases?
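The early-exit being discussed can be illustrated with a small sketch. This is hypothetical stand-in code, not the actual `WrappingStoreProvider`: each "provider" is modelled here as a plain map from partition to a store handle, and `StoreLookup` is an invented name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical sketch of the early-exit: when a specific partition is
// requested, at most one provider can own it, so the scan stops at the
// first non-empty result.
public class StoreLookup {
    public static List<String> stores(List<Map<Integer, String>> providers, Integer partition) {
        List<String> all = new ArrayList<>();
        for (Map<Integer, String> provider : providers) {
            if (partition == null) {
                all.addAll(provider.values()); // no partition filter: collect from every provider
            } else if (provider.containsKey(partition)) {
                all.add(provider.get(partition));
                break; // a single partition lives in exactly one provider
            }
        }
        if (all.isEmpty()) {
            throw new NoSuchElementException(partition != null
                ? "partition " + partition + " not found"
                : "store not found");
        }
        return all;
    }
}
```

The partition-specific branch is what makes the two failure messages distinguishable: an empty result with a non-null partition means that particular partition was not found anywhere.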









[GitHub] [kafka] ijuma commented on pull request #9206: MINOR: rewrite zipWithIndex by normal foreach to refrain unnecessary …

2020-09-25 Thread GitBox


ijuma commented on pull request #9206:
URL: https://github.com/apache/kafka/pull/9206#issuecomment-698237578


   Unrelated flaky failures:
   
   ```
   Build / JDK 15 / kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateBelowLimit
   Build / JDK 8 / org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
   Build / JDK 8 / org.apache.kafka.streams.integration.StandbyTaskEOSIntegrationTest.shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing[exactly_once]
   ```







[GitHub] [kafka] yeralin commented on pull request #6592: KAFKA-8326: Introduce List Serde

2020-09-25 Thread GitBox


yeralin commented on pull request #6592:
URL: https://github.com/apache/kafka/pull/6592#issuecomment-698414958


   Any updates :)? 







[GitHub] [kafka] feyman2016 commented on a change in pull request #9270: [WIP] KAFKA-10284: Group membership update due to static member rejoin should be persisted

2020-09-25 Thread GitBox


feyman2016 commented on a change in pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#discussion_r494424452



##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -1040,21 +1040,36 @@ class GroupCoordinator(val brokerId: Int,
 
 group.currentState match {
   case Stable =>
-info(s"Static member joins during Stable stage will not trigger 
rebalance.")
-group.maybeInvokeJoinCallback(member, JoinGroupResult(
-  members = List.empty,
-  memberId = newMemberId,
-  generationId = group.generationId,
-  protocolType = group.protocolType,
-  protocolName = group.protocolName,
-  // We want to avoid current leader performing trivial assignment 
while the group
-  // is in stable stage, because the new assignment in leader's next 
sync call
-  // won't be broadcast by a stable group. This could be guaranteed by
-  // always returning the old leader id so that the current leader 
won't assume itself
-  // as a leader based on the returned message, since the new 
member.id won't match
-  // returned leader id, therefore no assignment will be performed.
-  leaderId = currentLeader,
-  error = Errors.NONE))
+// check if group's selectedProtocol of next generation will change, 
if not, simply store group to persist the
+// updated static member, if yes, rebalance should be triggered to let 
the group's assignment and selectProtocol consistent
+val selectedProtocolOfNextGeneration = group.selectProtocol
+if (group.protocolName.contains(selectedProtocolOfNextGeneration)) {
+  info(s"Static member which joins during Stable stage and doesn't 
affect selectProtocol will not trigger rebalance.")
+  val groupAssignment: Map[String, Array[Byte]] = 
group.allMemberMetadata.map(member => member.memberId -> 
member.assignment).toMap
+  groupManager.storeGroup(group, groupAssignment, error => {
+group.inLock {
+  if (error != Errors.NONE) {
+warn(s"Failed to persist metadata for group ${group.groupId}: 
${error.message}")
+  }
+}
+  })
+  group.maybeInvokeJoinCallback(member, JoinGroupResult(
+members = List.empty,
+memberId = newMemberId,
+generationId = group.generationId,
+protocolType = group.protocolType,
+protocolName = group.protocolName,
+// We want to avoid current leader performing trivial assignment 
while the group
+// is in stable stage, because the new assignment in leader's next 
sync call
+// won't be broadcast by a stable group. This could be guaranteed 
by
+// always returning the old leader id so that the current leader 
won't assume itself
+// as a leader based on the returned message, since the new 
member.id won't match
+// returned leader id, therefore no assignment will be performed.
+leaderId = currentLeader,
+error = Errors.NONE))
+} else {
+  maybePrepareRebalance(group, s"Group's selectedProtocol will change 
because static member ${member.memberId} with instance id $groupInstanceId 
joined with change of protocol")

Review comment:
   For example, suppose a group previously had one leader and one follower 
(both static), both with protocols `List(("range", metadata), ("roundrobin", 
metadata))`, and the currently selected protocol is `range`. If the follower 
later rejoins with protocols `List(("roundrobin", metadata))`, the selected 
protocol should become `roundrobin`, so the selected protocol and the actual 
assignment are no longer consistent; I trigger a rebalance here to make them 
consistent again. Conversely, if we don't rebalance, we cannot successfully 
persist the group, because the line `val metadata = 
memberMetadata.metadata(protocol)` in 
`kafka.coordinator.group.GroupMetadataManager#groupMetadataValue` will fail.
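The protocol-selection behaviour in that example can be sketched as follows. This is hypothetical illustration code, not the actual `GroupMetadata.selectProtocol` implementation: the candidates are the protocols supported by every member, each member votes for its most-preferred candidate, and the protocol with the most votes wins.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of group protocol selection by majority vote.
public class ProtocolSelection {
    public static String selectProtocol(List<List<String>> memberPreferences) {
        // Candidates: protocols supported by every member
        Set<String> candidates = new LinkedHashSet<>(memberPreferences.get(0));
        for (List<String> prefs : memberPreferences) {
            candidates.retainAll(new HashSet<>(prefs));
        }
        // Each member votes for its first supported candidate (preference order matters)
        Map<String, Integer> votes = new HashMap<>();
        for (List<String> prefs : memberPreferences) {
            for (String p : prefs) {
                if (candidates.contains(p)) {
                    votes.merge(p, 1, Integer::sum);
                    break;
                }
            }
        }
        return votes.entrySet().stream()
            .max(Map.Entry.comparingByValue())
            .map(Map.Entry::getKey)
            .orElseThrow(() -> new IllegalStateException("no common protocol"));
    }
}
```

With the example above: while both members support `range` and `roundrobin`, `range` wins; once the follower rejoins supporting only `roundrobin`, the intersection shrinks and the selected protocol flips to `roundrobin`.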

##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -1040,21 +1040,36 @@ class GroupCoordinator(val brokerId: Int,
 
 group.currentState match {
   case Stable =>
-info(s"Static member joins during Stable stage will not trigger 
rebalance.")
-group.maybeInvokeJoinCallback(member, JoinGroupResult(
-  members = List.empty,
-  memberId = newMemberId,
-  generationId = group.generationId,
-  protocolType = group.protocolType,
-  protocolName = group.protocolName,
-  // We want to avoid current leader performing trivial assignment 
while the group
-  // is in stable stage, because the new assignment in leader's next 
sync call
-  // won't be broadcast by a stable group. This could be gu

[GitHub] [kafka] rajinisivaram commented on a change in pull request #9334: KAFKA-10516; Disable automatic retry of `THROTTLING_QUOTA_EXCEEDED` errors in the `kafka-topics` command (KIP-599)

2020-09-25 Thread GitBox


rajinisivaram commented on a change in pull request #9334:
URL: https://github.com/apache/kafka/pull/9334#discussion_r494510121



##
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##
@@ -69,16 +71,26 @@ object TopicCommand extends Logging {
   else if (opts.hasDeleteOption)
 topicService.deleteTopic(opts)
 } catch {
+  case e: ExecutionException =>
+if (e.getCause != null)
+  printException(e.getCause)
+else
+  printException(e)
+exitCode = 1
   case e: Throwable =>
-println("Error while executing topic command : " + e.getMessage)
-error(Utils.stackTrace(e))
+printException(e)
 exitCode = 1
 } finally {
   topicService.close()
   Exit.exit(exitCode)
 }
   }
 
+  private def printException(e: Throwable): Unit = {
+println("Error while executing topic command : " + e.getMessage)
+error(Utils.stackTrace(e))

Review comment:
   Couldn't we just use `error(message, e)`?

##
File path: 
core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
##
@@ -844,4 +851,72 @@ class TopicCommandWithAdminClientTest extends 
KafkaServerTestHarness with Loggin
 assertEquals(2, rows.size)
 rows(0).startsWith(s"Topic:$testTopicName\tPartitionCount:1")
   }
+
+  @Test
+  def testCreateTopicDoesNotRetryThrottlingQuotaExceededException(): Unit = {
+val adminClient = Mockito.mock(classOf[Admin])
+val topicService = AdminClientTopicService(adminClient)
+
+val result = AdminClientTestUtils.createTopicsResult(testTopicName, 
Errors.THROTTLING_QUOTA_EXCEEDED.exception())
+Mockito.when(adminClient.createTopics(ArgumentMatchers.any(), 
ArgumentMatchers.any())).thenReturn(result)
+
+assertThrows(classOf[ThrottlingQuotaExceededException],
+  () => topicService.createTopic(new TopicCommandOptions(Array("--topic", 
testTopicName
+
+val expectedNewTopic = new NewTopic(testTopicName, 
Option.empty[Integer].asJava, Option.empty[java.lang.Short].asJava)

Review comment:
   We could just use Optional.empty instead of creating in Scala and 
converting?
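The suggestion can be shown in a line or two of Java. A trivial sketch; `defaultPartitions` below is a hypothetical helper, not a Kafka API:

```java
import java.util.Optional;

// On the Java side, Optional.empty() can be used directly instead of
// building Option.empty[Integer] in Scala and converting with .asJava.
public class OptionalEmptyDemo {
    public static Optional<Integer> defaultPartitions() {
        return Optional.empty();
    }
}
```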

##
File path: 
core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
##
@@ -844,4 +851,72 @@ class TopicCommandWithAdminClientTest extends 
KafkaServerTestHarness with Loggin
 assertEquals(2, rows.size)
 rows(0).startsWith(s"Topic:$testTopicName\tPartitionCount:1")
   }
+
+  @Test
+  def testCreateTopicDoesNotRetryThrottlingQuotaExceededException(): Unit = {
+val adminClient = Mockito.mock(classOf[Admin])
+val topicService = AdminClientTopicService(adminClient)
+
+val result = AdminClientTestUtils.createTopicsResult(testTopicName, 
Errors.THROTTLING_QUOTA_EXCEEDED.exception())
+Mockito.when(adminClient.createTopics(ArgumentMatchers.any(), 
ArgumentMatchers.any())).thenReturn(result)

Review comment:
   May be worth importing `org.mockito.Mockito._` and  
`org.mockito.ArgumentMatchers._` to avoid repeating the class name everywhere.

##
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##
@@ -69,16 +71,26 @@ object TopicCommand extends Logging {
   else if (opts.hasDeleteOption)
 topicService.deleteTopic(opts)
 } catch {
+  case e: ExecutionException =>
+if (e.getCause != null)
+  printException(e.getCause)
+else
+  printException(e)
+exitCode = 1
   case e: Throwable =>
-println("Error while executing topic command : " + e.getMessage)
-error(Utils.stackTrace(e))
+printException(e)
 exitCode = 1
 } finally {
   topicService.close()
   Exit.exit(exitCode)
 }
   }
 
+  private def printException(e: Throwable): Unit = {
+println("Error while executing topic command : " + e.getMessage)
+error(Utils.stackTrace(e))

Review comment:
   Sorry, I should have been clearer. I just meant: why do we call 
`Utils.stackTrace(e)` when `error(message, e)` gives you a stack trace for 
free? In terms of changing from `error` to `debug`, I think we include the 
stack trace in other commands, but each command seems to do something 
different. ConfigCommand, for example, doesn't print a stacktrace for config 
exceptions; it logs both the error message and stacktrace to stderr. AclCommand 
prints everything to stdout. Let's just keep `error` for now and maybe open a 
JIRA to improve and make the tools consistent later?









[GitHub] [kafka] ijuma edited a comment on pull request #9336: MINOR: Don't publish javadocs for raft module

2020-09-25 Thread GitBox


ijuma edited a comment on pull request #9336:
URL: https://github.com/apache/kafka/pull/9336#issuecomment-698252529


   @hachikuji Do we intend the various non-internal Raft classes to be a public 
API where we need a KIP to make any changes? If we don't want that, or not yet, 
we should consider merging this PR.







[GitHub] [kafka] vvcephei commented on pull request #8181: KAFKA-9584 Headers ConcurrentModificationException

2020-09-25 Thread GitBox


vvcephei commented on pull request #8181:
URL: https://github.com/apache/kafka/pull/8181#issuecomment-698385372


   Hey @MicahRam , I'm ready to merge this now, but there are some merge 
conflicts. Can you rebase this PR against the current trunk? Thanks!







[GitHub] [kafka] mimaison commented on pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol

2020-09-25 Thread GitBox


mimaison commented on pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#issuecomment-698359953


   Thanks @abbccdda and @dajac for the reviews!







[GitHub] [kafka] guozhangwang commented on pull request #9196: [DO NOT MERGE] KAFKA-10402: Upgrade system tests to python3

2020-09-25 Thread GitBox


guozhangwang commented on pull request #9196:
URL: https://github.com/apache/kafka/pull/9196#issuecomment-698710912


   https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4188/ 
triggered







[GitHub] [kafka] luigiberrettini commented on pull request #3743: KAFKA-5494: enable idempotence with max.in.flight.requests.per.connection > 1

2020-09-25 Thread GitBox


luigiberrettini commented on pull request #3743:
URL: https://github.com/apache/kafka/pull/3743#issuecomment-698379057


   I saw that the 
[Sender](https://github.com/apache/kafka/blob/2.6.0/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L602)
 checks for an `Errors.DUPLICATE_SEQUENCE_NUMBER`, but I was not able to find 
where this error is triggered on the server side.
   
   It seems to me that duplicate detection relies on checking whether the 
sequence number is more than `lastPersistedSeq + 1`.
   If this is the case:
- why store the metadata for the last batches instead of just relying on the 
sequence number of the last message persisted in the log?
- why limit `max.in.flight.requests.per.connection` to a maximum value of 5 
if duplicates are still detected when metadata is not found (and therefore with 
any number of max in-flights)?
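One intuition for these questions can be sketched in code. This is hypothetical illustration code, not the broker's implementation, and it assumes (as the idempotent-producer design describes) that the broker remembers metadata for only the last few batches per producer: the next expected sequence is appended, a retried sequence still in the cache is a benign duplicate, and anything else is out of sequence. If more batches are in flight than the cache holds, a late retry can no longer be recognised as a duplicate, which is one reason for capping `max.in.flight.requests.per.connection` at the cache size.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical per-producer dedup sketch with a bounded batch-metadata cache.
public class SequenceDedup {
    public enum Outcome { APPENDED, DUPLICATE, OUT_OF_SEQUENCE }

    private final int cacheSize;
    private final Deque<Integer> recentBatchSeqs = new ArrayDeque<>();
    private int nextSeq = 0;

    public SequenceDedup(int cacheSize) { this.cacheSize = cacheSize; }

    public Outcome append(int baseSeq) {
        if (baseSeq == nextSeq) {
            recentBatchSeqs.addLast(baseSeq);
            if (recentBatchSeqs.size() > cacheSize) {
                recentBatchSeqs.removeFirst(); // oldest batch metadata ages out
            }
            nextSeq++;
            return Outcome.APPENDED;
        }
        if (recentBatchSeqs.contains(baseSeq)) {
            return Outcome.DUPLICATE; // retry of a batch we still remember
        }
        return Outcome.OUT_OF_SEQUENCE;
    }
}
```

Relying only on `lastPersistedSeq` would not distinguish a benign retry of an older batch from a genuinely out-of-order send, which is why per-batch metadata is kept.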







[GitHub] [kafka] apovzner commented on pull request #9317: KAFKA-10509: Added throttle connection accept rate metric (KIP-612)

2020-09-25 Thread GitBox


apovzner commented on pull request #9317:
URL: https://github.com/apache/kafka/pull/9317#issuecomment-698128538











[GitHub] [kafka] ijuma merged pull request #9206: MINOR: rewrite zipWithIndex by normal foreach to refrain unnecessary …

2020-09-25 Thread GitBox


ijuma merged pull request #9206:
URL: https://github.com/apache/kafka/pull/9206


   







[GitHub] [kafka] asdaraujo commented on a change in pull request #9281: KAFKA-10478: Allow duplicated ports in advertised.listeners

2020-09-25 Thread GitBox


asdaraujo commented on a change in pull request #9281:
URL: https://github.com/apache/kafka/pull/9281#discussion_r494670714



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -1686,9 +1686,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean, dynamicConfigO
   def advertisedListeners: Seq[EndPoint] = {
 val advertisedListenersProp = 
getString(KafkaConfig.AdvertisedListenersProp)
 if (advertisedListenersProp != null)
-  CoreUtils.listenerListToEndPoints(advertisedListenersProp, 
listenerSecurityProtocolMap)
+  CoreUtils.listenerListToEndPoints(advertisedListenersProp, 
listenerSecurityProtocolMap, false)

Review comment:
   Updated









[GitHub] [kafka] viktorsomogyi commented on a change in pull request #4090: [KAFKA-6084] Propagate JSON parsing errors in ReassignPartitionsCommand

2020-09-25 Thread GitBox


viktorsomogyi commented on a change in pull request #4090:
URL: https://github.com/apache/kafka/pull/4090#discussion_r494261046



##
File path: core/src/main/scala/kafka/utils/Json.scala
##
@@ -69,8 +70,11 @@ object Json {
* @return An `Either` which in case of `Left` means an exception and 
`Right` is the actual return value.
*/
   def tryParseFull(input: String): Either[JsonProcessingException, JsonValue] =
-try Right(mapper.readTree(input)).map(JsonValue(_))
-catch { case e: JsonProcessingException => Left(e) }
+if (input != null && input.isEmpty)

Review comment:
   Made the required changes, updated the related test too.









[GitHub] [kafka] mimaison merged pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol

2020-09-25 Thread GitBox


mimaison merged pull request #8295:
URL: https://github.com/apache/kafka/pull/8295


   







[GitHub] [kafka] hachikuji commented on pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-25 Thread GitBox


hachikuji commented on pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#issuecomment-698458937











[GitHub] [kafka] ijuma commented on pull request #9336: MINOR: Don't publish javadocs for raft module

2020-09-25 Thread GitBox


ijuma commented on pull request #9336:
URL: https://github.com/apache/kafka/pull/9336#issuecomment-698252529


   @hachikuji Do we intend the various non-internal Raft classes to be a public 
API where we need a KIP to make any changes? If we don't want that, or not yet, 
we should consider merging this PR.







[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-25 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r494520245



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1764,6 +1769,145 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: 
AlterIsrResponseData => Unit): Unit = {
+val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+alterIsrRequest.topics.forEach { topicReq =>
+  topicReq.partitions.forEach { partitionReq =>
+val tp = new TopicPartition(topicReq.name, partitionReq.partitionIndex)
+val newIsr = partitionReq.newIsr().asScala.toList.map(_.toInt)
+isrsToAlter.put(tp, new LeaderAndIsr(alterIsrRequest.brokerId, 
partitionReq.leaderEpoch, newIsr, partitionReq.currentIsrVersion))
+  }
+}
+
+def responseCallback(results: Either[Map[TopicPartition, Either[Errors, 
LeaderAndIsr]], Errors]): Unit = {
+  val resp = new AlterIsrResponseData()
+  results match {
+case Right(error) =>
+  resp.setErrorCode(error.code)
+case Left(partitionResults) =>
+  resp.setTopics(new util.ArrayList())
+  partitionResults
+.groupBy { case (tp, _) => tp.topic }   // Group by topic
+.foreach { case (topic, partitions) =>
+  // Add each topic part to the response
+  val topicResp = new AlterIsrResponseData.TopicData()
+.setName(topic)
+.setPartitions(new util.ArrayList())
+  resp.topics.add(topicResp)
+  partitions.foreach { case (tp, errorOrIsr) =>
+// Add each partition part to the response (new ISR or error)
+errorOrIsr match {
+  case Left(error) => topicResp.partitions.add(
+new AlterIsrResponseData.PartitionData()
+  .setPartitionIndex(tp.partition)
+  .setErrorCode(error.code))
+  case Right(leaderAndIsr) => topicResp.partitions.add(
+new AlterIsrResponseData.PartitionData()
+  .setPartitionIndex(tp.partition)
+  .setLeaderId(leaderAndIsr.leader)
+  .setLeaderEpoch(leaderAndIsr.leaderEpoch)
+  .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
+  .setCurrentIsrVersion(leaderAndIsr.zkVersion))
+}
+}
+  }
+  }
+  callback.apply(resp)
+}
+
+eventManager.put(AlterIsrReceived(alterIsrRequest.brokerId, 
alterIsrRequest.brokerEpoch, isrsToAlter, responseCallback))
+  }
+
+  private def processAlterIsr(brokerId: Int, brokerEpoch: Long, isrsToAlter: 
Map[TopicPartition, LeaderAndIsr],
+  callback: AlterIsrCallback): Unit = {
+
+// Handle a few short-circuits
+if (!isActive) {
+  callback.apply(Right(Errors.NOT_CONTROLLER))
+  return
+}
+
+val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
+if (brokerEpochOpt.isEmpty) {
+  info(s"Ignoring AlterIsr due to unknown broker $brokerId")
+  callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+  return
+}
+
+if (!brokerEpochOpt.contains(brokerEpoch)) {
+  info(s"Ignoring AlterIsr due to stale broker epoch $brokerEpoch for 
broker $brokerId")
+  callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+  return
+}
+
+val response = try {
+  val partitionResponses = mutable.HashMap[TopicPartition, Either[Errors, 
LeaderAndIsr]]()
+
+  // Determine which partitions we will accept the new ISR for
+  val adjustedIsrs: Map[TopicPartition, LeaderAndIsr] = 
isrsToAlter.flatMap {
+case (tp: TopicPartition, newLeaderAndIsr: LeaderAndIsr) =>
+  val partitionError: Errors = 
controllerContext.partitionLeadershipInfo(tp) match {
+case Some(leaderIsrAndControllerEpoch) =>
+  val currentLeaderAndIsr = 
leaderIsrAndControllerEpoch.leaderAndIsr
+  if (newLeaderAndIsr.leaderEpoch < 
currentLeaderAndIsr.leaderEpoch) {

Review comment:
   I was trying to think of some kind of race with a zombie leader trying to 
update the ISR; however, that would get fenced by the leader epoch. This should 
be pretty easy to add
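The fencing idea can be sketched roughly as follows. All names here are invented for illustration; the real logic lives in the controller's `processAlterIsr` shown above. A request from a zombie leader carries a stale leader epoch and is rejected outright, while a mismatched ZK version means the update raced with another change.

```java
// Hypothetical sketch of the AlterIsr validation checks.
public class AlterIsrValidation {
    public enum Result { OK, FENCED_LEADER_EPOCH, INVALID_UPDATE_VERSION }

    public static Result validate(int requestLeaderEpoch, int currentLeaderEpoch,
                                  int requestZkVersion, int currentZkVersion) {
        if (requestLeaderEpoch < currentLeaderEpoch) {
            return Result.FENCED_LEADER_EPOCH; // zombie leader: stale epoch
        }
        if (requestZkVersion != currentZkVersion) {
            return Result.INVALID_UPDATE_VERSION; // raced with another ISR update
        }
        return Result.OK;
    }
}
```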

##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -1246,6 +1351,51 @@ class Partition(val topicPartition: TopicPartition,
 }
   }
 
+  private def sendAlterIsrRequest(proposedIsrState: IsrState): Boolean = {
+val isrToSendOpt: Option[Set[Int]] = proposedIsrState match {
+  case PendingExpandIsr(isr, newInSyncReplicaId) => Some(isr + 
newInSyncReplicaId)
+  case PendingShrinkIsr(isr, outOfSyncReplicaIds) => Some(isr -- 
outOfSyncReplicaIds)
+  case CommittedIsr(_) =>
+error(s"Asked to send AlterIsr but there are no pending updates")
+ 

[GitHub] [kafka] vvcephei commented on pull request #9316: MINOR: clarify variables for skipping idempotent source updates

2020-09-25 Thread GitBox


vvcephei commented on pull request #9316:
URL: https://github.com/apache/kafka/pull/9316#issuecomment-698378699


   Test failure was unrelated.
   ```
   Build / JDK 15 / 
org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication
   ```







[GitHub] [kafka] hachikuji merged pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-25 Thread GitBox


hachikuji merged pull request #9100:
URL: https://github.com/apache/kafka/pull/9100


   







[GitHub] [kafka] ijuma merged pull request #9331: MINOR: Use JUnit 5 in raft module

2020-09-25 Thread GitBox


ijuma merged pull request #9331:
URL: https://github.com/apache/kafka/pull/9331


   







[GitHub] [kafka] mimaison commented on pull request #9335: Update quickstart.html

2020-09-25 Thread GitBox


mimaison commented on pull request #9335:
URL: https://github.com/apache/kafka/pull/9335#issuecomment-698388310


   Which version of Kafka are you using?
   
   Since 2.6, `--bootstrap-server` is the recommended flag. `--broker-list` is 
now deprecated and will be removed in the next major version.
   
   The quickstart page is up to date and recommends the current preferred flag







[GitHub] [kafka] vvcephei merged pull request #9316: MINOR: clarify variables for skipping idempotent source updates

2020-09-25 Thread GitBox


vvcephei merged pull request #9316:
URL: https://github.com/apache/kafka/pull/9316


   







[GitHub] [kafka] dielhennr edited a comment on pull request #9101: KAFKA-10325: KIP-649 implementation

2020-09-25 Thread GitBox


dielhennr edited a comment on pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#issuecomment-683076305











[GitHub] [kafka] vvcephei commented on a change in pull request #8181: KAFKA-9584 Headers ConcurrentModificationException

2020-09-25 Thread GitBox


vvcephei commented on a change in pull request #8181:
URL: https://github.com/apache/kafka/pull/8181#discussion_r494369894



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -581,8 +580,8 @@ public void punctuate(final ProcessorNode node, final long timestamp, final Punc
 if (processorContext.currentNode() != null) {
 throw new IllegalStateException(format("%sCurrent node is not null", logPrefix));
 }
-
-updateProcessorContext(new StampedRecord(DUMMY_RECORD, timestamp), node);
+
+updateProcessorContext(new StampedRecord(new ConsumerRecord<>(ProcessorContextImpl.NONEXIST_TOPIC, -1, -1L, null, null), timestamp), node);

Review comment:
   Yes, that's more or less what I was concerned about. We are trying to
achieve a specific result here (a brand-new `Header` instance each time), but
it's only achieved via a side effect.
   
   On the other hand, I just took a closer look at the code, and I see that 
there's a better reason to keep using `ConsumerRecord`, namely that 
`updateProcessorContext` is used both for regular processing (with "real" 
records) and for punctuation here with this dummy record. It seems like a good 
idea to prevent those code paths from diverging, so I'm +1 with keeping this 
change as-is.
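   The shared code path being discussed can be illustrated with a minimal
self-contained sketch (hypothetical names, not the actual Streams internals):
both regular processing and punctuation are routed through one update method,
and punctuation builds a brand-new dummy record each time, so it also gets a
brand-new (empty) headers container as a side effect.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Toy model of the single update path: processing and punctuation share it,
   // and each punctuation call creates a fresh dummy record with fresh headers,
   // so header mutations cannot leak from one punctuation into the next.
   class ContextSketch {
       static class Rec {
           final List<String> headers = new ArrayList<>(); // fresh per record
           final long timestamp;
           Rec(long timestamp) { this.timestamp = timestamp; }
       }

       Rec current;

       // Single code path used for both real records and punctuation dummies.
       void updateContext(Rec record) {
           current = record;
       }

       void punctuate(long timestamp) {
           // Analogous to building a new StampedRecord around a new dummy
           // ConsumerRecord per punctuation call.
           updateContext(new Rec(timestamp));
       }
   }
   ```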









[GitHub] [kafka] hachikuji commented on a change in pull request #9332: KAFKA-10511; Ensure monotonic start epoch/offset updates in `MockLog`

2020-09-25 Thread GitBox


hachikuji commented on a change in pull request #9332:
URL: https://github.com/apache/kafka/pull/9332#discussion_r494494327



##
File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java
##
@@ -310,10 +310,10 @@ public LogFetchInfo read(long startOffset, Isolation isolation) {
 }
 
 @Override
-public void assignEpochStartOffset(int epoch, long startOffset) {
-if (startOffset != endOffset().offset)
-throw new IllegalArgumentException(
-"Can only assign epoch for the end offset " + endOffset().offset + ", but get offset " + startOffset);
+public void initializeLeaderEpoch(int epoch) {
+long startOffset = endOffset().offset;
+epochStartOffsets.removeIf(epochStartOffset ->

Review comment:
   There's no guarantee that the leader change message ever gets committed. 
It is even possible for the leadership to change multiple times before a leader 
change message can be committed. There is no correctness problem with the 
current implementation. I just wanted the behavior to be consistent with 
`LeaderEpochFileCache`.

##
File path: raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
##
@@ -370,6 +365,23 @@ public void testReadOutOfRangeOffset() {
 Isolation.UNCOMMITTED));
 }
 
+@Test
+public void testMonotonicEpochStartOffset() {
+appendBatch(5, 1);
+assertEquals(5L, log.endOffset().offset);
+
+log.initializeLeaderEpoch(2);
+assertEquals(Optional.of(new OffsetAndEpoch(5L, 1)), log.endOffsetForEpoch(1));
+assertEquals(Optional.of(new OffsetAndEpoch(5L, 2)), log.endOffsetForEpoch(2));
+
+// Initialize a new epoch at the same end offset. The epoch cache ensures
+// that the start offset of each retained epoch increases monotonically.

Review comment:
   The epoch cache tracks tuples of (epoch, start offset). The start offset 
of a new leader epoch is the current log end offset.
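   The monotonicity rule described here can be illustrated with a small
self-contained sketch (hypothetical names, not the actual `LeaderEpochFileCache`
API): the cache keeps (epoch, start offset) entries, and initializing a new
leader epoch at the current log end offset first drops any retained entries
whose start offset would not be strictly smaller than the new one.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Toy epoch cache: start offsets stay monotonically increasing because
   // assigning a new epoch at the log end offset removes conflicting entries.
   class EpochStartOffsetCache {
       static class Entry {
           final int epoch;
           final long startOffset;
           Entry(int epoch, long startOffset) {
               this.epoch = epoch;
               this.startOffset = startOffset;
           }
       }

       private final List<Entry> entries = new ArrayList<>();

       void initializeLeaderEpoch(int epoch, long logEndOffset) {
           // Drop entries that would violate monotonic start offsets.
           entries.removeIf(e -> e.startOffset >= logEndOffset);
           entries.add(new Entry(epoch, logEndOffset));
       }

       List<Entry> entries() {
           return entries;
       }
   }
   ```

   With this model, initializing epochs 2 and 3 at the same end offset retains
only epoch 3's entry, matching the test scenario above.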

##
File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
##
@@ -712,11 +712,6 @@ void initializeElection(ElectionState election) {
 
 nodes.values().forEach(state -> {
 state.store.writeElectionState(election);
-if (election.hasLeader()) {

Review comment:
   Oh, I think this was just unnecessary. The purpose of this logic is to 
initialize a certain election state prior to starting the simulation. But the 
code already initializes the epoch state on startup, so we didn't need this.









[GitHub] [kafka] ijuma commented on pull request #9331: MINOR: Use JUnit 5 in raft module

2020-09-25 Thread GitBox


ijuma commented on pull request #9331:
URL: https://github.com/apache/kafka/pull/9331#issuecomment-698233595


   Unrelated flaky test failures:
   
   ```
   Build / JDK 11 / kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
   Build / JDK 15 / kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
   Build / JDK 15 / kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
   Build / JDK 15 / org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
   
   ```







[GitHub] [kafka] abbccdda commented on a change in pull request #9332: KAFKA-10511; Ensure monotonic start epoch/offset updates in `MockLog`

2020-09-25 Thread GitBox


abbccdda commented on a change in pull request #9332:
URL: https://github.com/apache/kafka/pull/9332#discussion_r494062676



##
File path: raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
##
@@ -370,6 +365,23 @@ public void testReadOutOfRangeOffset() {
 Isolation.UNCOMMITTED));
 }
 
+@Test
+public void testMonotonicEpochStartOffset() {
+appendBatch(5, 1);
+assertEquals(5L, log.endOffset().offset);
+
+log.initializeLeaderEpoch(2);
+assertEquals(Optional.of(new OffsetAndEpoch(5L, 1)), log.endOffsetForEpoch(1));
+assertEquals(Optional.of(new OffsetAndEpoch(5L, 2)), log.endOffsetForEpoch(2));
+
+// Initialize a new epoch at the same end offset. The epoch cache ensures
+// that the start offset of each retained epoch increases monotonically.

Review comment:
   I thought this was a test for the end offset?

##
File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
##
@@ -712,11 +712,6 @@ void initializeElection(ElectionState election) {
 
 nodes.values().forEach(state -> {
 state.store.writeElectionState(election);
-if (election.hasLeader()) {

Review comment:
   So the reason that this is safe is that we no longer try assigning an
offset other than the end offset?

##
File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java
##
@@ -310,10 +310,10 @@ public LogFetchInfo read(long startOffset, Isolation isolation) {
 }
 
 @Override
-public void assignEpochStartOffset(int epoch, long startOffset) {
-if (startOffset != endOffset().offset)
-throw new IllegalArgumentException(
-"Can only assign epoch for the end offset " + endOffset().offset + ", but get offset " + startOffset);
+public void initializeLeaderEpoch(int epoch) {
+long startOffset = endOffset().offset;
+epochStartOffsets.removeIf(epochStartOffset ->

Review comment:
   So the idea is to wipe out the older epoch's (epoch - 1) end offset, and a
search for (epoch - 1) would then give the (epoch - 2) end offset? In the case
where we put a leader change message in the mock log, this should never happen,
right?









[GitHub] [kafka] guozhangwang commented on pull request #9337: KAFKA-10519; Add missing unit test for `VotedState`

2020-09-25 Thread GitBox


guozhangwang commented on pull request #9337:
URL: https://github.com/apache/kafka/pull/9337#issuecomment-698706077


   LGTM.







[GitHub] [kafka] mimaison commented on pull request #9313: [mm2] Fix consumer/producer properties override

2020-09-25 Thread GitBox


mimaison commented on pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#issuecomment-698229647


   @scanterog Thanks for the PR. Can you add a test for the changes?







[jira] [Assigned] (KAFKA-8318) Session Window Aggregations generate an extra tombstone

2020-09-25 Thread Ilia Pasynkov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ilia Pasynkov reassigned KAFKA-8318:


Assignee: Ilia Pasynkov

> Session Window Aggregations generate an extra tombstone
> ---
>
> Key: KAFKA-8318
> URL: https://issues.apache.org/jira/browse/KAFKA-8318
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: Ilia Pasynkov
>Priority: Minor
>  Labels: newbie++
>
> See the discussion 
> https://github.com/apache/kafka/pull/6654#discussion_r280231439
> The session merging logic generates a tombstone in addition to an update when 
> the session window already exists. It's not a correctness issue, just a small 
> performance hit, because that tombstone is immediately invalidated by the 
> update.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] fredsh2k closed pull request #9335: Update quickstart.html

2020-09-25 Thread GitBox


fredsh2k closed pull request #9335:
URL: https://github.com/apache/kafka/pull/9335


   







[GitHub] [kafka] fredsh2k commented on pull request #9335: Update quickstart.html

2020-09-25 Thread GitBox


fredsh2k commented on pull request #9335:
URL: https://github.com/apache/kafka/pull/9335#issuecomment-698870391


   kafka_2.12-2.4.0
   
   ok, thanks for the help :) 







[GitHub] [kafka] tombentley commented on pull request #9266: KAFKA-10469: Resolve logger levels hierarchically

2020-09-25 Thread GitBox


tombentley commented on pull request #9266:
URL: https://github.com/apache/kafka/pull/9266#issuecomment-698835897


   @ijuma any chance you could trigger a CI build? Thanks!







[jira] [Commented] (KAFKA-10425) Documentation switches to a random page on clicking the left navigation bar hide/expand button

2020-09-25 Thread Shadi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17202022#comment-17202022
 ] 

Shadi commented on KAFKA-10425:
---

[~sanjayyr] Hi, I can't reproduce this. What browser are you using?

> Documentation switches to a random page on clicking the left navigation bar 
> hide/expand button
> --
>
> Key: KAFKA-10425
> URL: https://issues.apache.org/jira/browse/KAFKA-10425
> Project: Kafka
>  Issue Type: Improvement
>  Components: docs
>Affects Versions: 2.6.0, 2.5.1
>Reporter: Sanjay Ravikumar
>Priority: Minor
>  Labels: documentation
>
> The Kafka documentation includes a button to hide or expand left navigation 
> bar. On clicking that button, the documentation switches to a random page. 
> For example, while I'm on 
> [https://kafka.apache.org/documentation.html#hwandos|https://kafka.apache.org/25/documentation.html#hwandos],
>  if I click that hide button, the page switches to some page further down in 
> the documentation. Similarly, if I'm on a certain page with left navigation 
> bar hidden, and when I click that button, the page switches  to a page 
> further up in the documentation.
> This might be happening due to page resizing when that button is clicked. The 
> issue is present in both 2.6.0 and 2.5.1 versions. 





[GitHub] [kafka] dajac commented on a change in pull request #9334: KAFKA-10516; Disable automatic retry of `THROTTLING_QUOTA_EXCEEDED` errors in the `kafka-topics` command (KIP-599)

2020-09-25 Thread GitBox


dajac commented on a change in pull request #9334:
URL: https://github.com/apache/kafka/pull/9334#discussion_r494856451



##
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##
@@ -69,16 +71,26 @@ object TopicCommand extends Logging {
   else if (opts.hasDeleteOption)
 topicService.deleteTopic(opts)
 } catch {
+  case e: ExecutionException =>
+if (e.getCause != null)
+  printException(e.getCause)
+else
+  printException(e)
+exitCode = 1
   case e: Throwable =>
-println("Error while executing topic command : " + e.getMessage)
-error(Utils.stackTrace(e))
+printException(e)
 exitCode = 1
 } finally {
   topicService.close()
   Exit.exit(exitCode)
 }
   }
 
+  private def printException(e: Throwable): Unit = {
+println("Error while executing topic command : " + e.getMessage)
+error(Utils.stackTrace(e))

Review comment:
   That's a very good question. I guess that we did so in order to just
print out the stacktrace without the message. Otherwise, we would have the
error message printed out by the `println`, followed by the same message
printed out by the logger, followed by the stacktrace. Having the message twice
is not necessary. I had a quick look at other commands and we do so everywhere.
   
   I will open a JIRA to make tools more consistent. Good idea.









[GitHub] [kafka] dajac commented on pull request #8897: MINOR; Use the automated protocol for the Consumer Protocol's subscriptions and assignments

2020-09-25 Thread GitBox


dajac commented on pull request #8897:
URL: https://github.com/apache/kafka/pull/8897#issuecomment-698811607


   @hachikuji Thanks for your suggestions. I have incorporated them.







[GitHub] [kafka] rajinisivaram commented on a change in pull request #9334: KAFKA-10516; Disable automatic retry of `THROTTLING_QUOTA_EXCEEDED` errors in the `kafka-topics` command (KIP-599)

2020-09-25 Thread GitBox


rajinisivaram commented on a change in pull request #9334:
URL: https://github.com/apache/kafka/pull/9334#discussion_r494837556



##
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##
@@ -69,16 +71,26 @@ object TopicCommand extends Logging {
   else if (opts.hasDeleteOption)
 topicService.deleteTopic(opts)
 } catch {
+  case e: ExecutionException =>
+if (e.getCause != null)
+  printException(e.getCause)
+else
+  printException(e)
+exitCode = 1
   case e: Throwable =>
-println("Error while executing topic command : " + e.getMessage)
-error(Utils.stackTrace(e))
+printException(e)
 exitCode = 1
 } finally {
   topicService.close()
   Exit.exit(exitCode)
 }
   }
 
+  private def printException(e: Throwable): Unit = {
+println("Error while executing topic command : " + e.getMessage)
+error(Utils.stackTrace(e))

Review comment:
   Sorry, I should have been more clear. I just meant: why do we call
`Utils.stackTrace(e)` when `error(message, e)` gives you a stack trace for
free? In terms of changing from `error` to `debug`, I think we include the
stack trace in other commands. But each command seems to do something
different. ConfigCommand, for example, doesn't print a stacktrace for config
exceptions; it logs both the error message and the stacktrace to stderr.
AclCommand prints everything to stdout. Let's just keep `error` for now and
maybe open a JIRA to improve and make the tools consistent later?









[jira] [Commented] (KAFKA-10523) Allow to provide producer ID

2020-09-25 Thread Luigi Berrettini (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17201992#comment-17201992
 ] 

Luigi Berrettini commented on KAFKA-10523:
--

I saw that an ID can be provided for transactions: supposing I need to publish
only to one topic, would idempotence be guaranteed across producer restarts by
calling producer.initTransactions() and then performing the send outside of the
transaction (i.e. without calling begin or commit)?

Essentially I would use producer.initTransactions() just to acquire the same 
PID so that I have idempotency across restarts.

> Allow to provide producer ID
> 
>
> Key: KAFKA-10523
> URL: https://issues.apache.org/jira/browse/KAFKA-10523
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Luigi Berrettini
>Priority: Minor
>
> I read about the implementation of idempotence and saw that it is only
> guaranteed within a producer session, since it depends on a PID reassigned
> every time the producer (re)starts.
> The PID is probably assigned relying on ZooKeeper, but I was wondering if it
> could be possible to support providing a PID externally to gain idempotence
> across restarts, e.g. having the producing application read the PID from a
> configuration file.





[GitHub] [kafka] dajac commented on pull request #9334: KAFKA-10516; Disable automatic retry of `THROTTLING_QUOTA_EXCEEDED` errors in the `kafka-topics` command (KIP-599)

2020-09-25 Thread GitBox


dajac commented on pull request #9334:
URL: https://github.com/apache/kafka/pull/9334#issuecomment-698784923


   @rajinisivaram Thanks for your comments. I have updated the PR and answered 
your questions.







[GitHub] [kafka] dajac commented on a change in pull request #9334: KAFKA-10516; Disable automatic retry of `THROTTLING_QUOTA_EXCEEDED` errors in the `kafka-topics` command (KIP-599)

2020-09-25 Thread GitBox


dajac commented on a change in pull request #9334:
URL: https://github.com/apache/kafka/pull/9334#discussion_r494815168



##
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##
@@ -69,16 +71,26 @@ object TopicCommand extends Logging {
   else if (opts.hasDeleteOption)
 topicService.deleteTopic(opts)
 } catch {
+  case e: ExecutionException =>
+if (e.getCause != null)
+  printException(e.getCause)
+else
+  printException(e)
+exitCode = 1
   case e: Throwable =>
-println("Error while executing topic command : " + e.getMessage)
-error(Utils.stackTrace(e))
+printException(e)
 exitCode = 1
 } finally {
   topicService.close()
   Exit.exit(exitCode)
 }
   }
 
+  private def printException(e: Throwable): Unit = {
+println("Error while executing topic command : " + e.getMessage)
+error(Utils.stackTrace(e))

Review comment:
   Do you mean using `error(message, e)` to replace both `println` and
`error`? I think that we are using `println` here in order to print the message
to stdout without any logger-related stuff and regardless of how the logger is
configured. Changing to `error(message, e)` would break this and could
potentially break existing applications by introducing the logger-related
prefix for that message. I think that we should keep `println` here.
   
   However, I wonder if using `error` is appropriate here, as it basically
prints a stacktrace for every error. The UX does not look good. Would it make
sense to use `debug` instead? The message of the exception is printed anyway,
and I don't think that the stacktrace provides much to regular users. WDYT?
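   The unwrapping done by the change under review can be sketched in a small
self-contained form (hypothetical helper name, not the actual `TopicCommand`
code): when an admin-client future fails, the interesting exception is the
cause wrapped inside an `ExecutionException`, so that cause is printed instead
of the wrapper when present.

   ```java
   import java.util.concurrent.ExecutionException;

   // Toy version of the error-printing path: prefer the cause of an
   // ExecutionException over the wrapper itself when building the message.
   final class ErrorPrinter {
       static String describe(Throwable t) {
           Throwable toPrint = t;
           if (t instanceof ExecutionException && t.getCause() != null) {
               toPrint = t.getCause(); // unwrap the admin-client failure
           }
           return "Error while executing topic command : " + toPrint.getMessage();
       }
   }
   ```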
   
   









[GitHub] [kafka] dajac commented on a change in pull request #9334: KAFKA-10516; Disable automatic retry of `THROTTLING_QUOTA_EXCEEDED` errors in the `kafka-topics` command (KIP-599)

2020-09-25 Thread GitBox


dajac commented on a change in pull request #9334:
URL: https://github.com/apache/kafka/pull/9334#discussion_r494810769



##
File path: 
core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
##
@@ -844,4 +851,72 @@ class TopicCommandWithAdminClientTest extends 
KafkaServerTestHarness with Loggin
 assertEquals(2, rows.size)
 rows(0).startsWith(s"Topic:$testTopicName\tPartitionCount:1")
   }
+
+  @Test
+  def testCreateTopicDoesNotRetryThrottlingQuotaExceededException(): Unit = {
+val adminClient = Mockito.mock(classOf[Admin])
+val topicService = AdminClientTopicService(adminClient)
+
+val result = AdminClientTestUtils.createTopicsResult(testTopicName, Errors.THROTTLING_QUOTA_EXCEEDED.exception())
+Mockito.when(adminClient.createTopics(ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(result)

Review comment:
   That makes sense. I had to introduce an alias for `ArgumentMatchers.eq`
to avoid conflicting with Scala's `eq`. I went with `eqThat` to remain in line
with `argThat`.









[jira] [Commented] (KAFKA-8266) Improve `testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup`

2020-09-25 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17201980#comment-17201980
 ] 

Chia-Ping Tsai commented on KAFKA-8266:
---

{quote}
I will take a look at this. I wonder why this has come back more frequently 
now...
{quote}

[~dajac] Thank you so much :)

> Improve 
> `testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup`
> 
>
> Key: KAFKA-8266
> URL: https://issues.apache.org/jira/browse/KAFKA-8266
> Project: Kafka
>  Issue Type: Test
>Reporter: Jason Gustafson
>Assignee: David Jacot
>Priority: Major
>
> Some additional validation could be done after the member gets kicked out. 
> The main thing is showing that the group can continue to consume data and 
> commit offsets.





[jira] [Assigned] (KAFKA-8266) Improve `testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup`

2020-09-25 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot reassigned KAFKA-8266:
--

Assignee: David Jacot

> Improve 
> `testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup`
> 
>
> Key: KAFKA-8266
> URL: https://issues.apache.org/jira/browse/KAFKA-8266
> Project: Kafka
>  Issue Type: Test
>Reporter: Jason Gustafson
>Assignee: David Jacot
>Priority: Major
>
> Some additional validation could be done after the member gets kicked out. 
> The main thing is showing that the group can continue to consume data and 
> commit offsets.





[jira] [Commented] (KAFKA-8266) Improve `testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup`

2020-09-25 Thread David Jacot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17201977#comment-17201977
 ] 

David Jacot commented on KAFKA-8266:


[~chia7712] I will take a look at this. I wonder why this has come back more 
frequently now...

> Improve 
> `testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup`
> 
>
> Key: KAFKA-8266
> URL: https://issues.apache.org/jira/browse/KAFKA-8266
> Project: Kafka
>  Issue Type: Test
>Reporter: Jason Gustafson
>Priority: Major
>
> Some additional validation could be done after the member gets kicked out. 
> The main thing is showing that the group can continue to consume data and 
> commit offsets.


