[GitHub] [kafka] dengziming opened a new pull request #11934: KAFKA-12908: Load raft snapshot heuristic (WIP)

2022-03-22 Thread GitBox


dengziming opened a new pull request #11934:
URL: https://github.com/apache/kafka/pull/11934


   *More detailed description of your change*
   TODO 
   
   *Summary of testing strategy (including rationale)*
   TODO
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on pull request #11173: KAFKA-13509: Support max timestamp in GetOffsetShell

2022-03-22 Thread GitBox


dengziming commented on pull request #11173:
URL: https://github.com/apache/kafka/pull/11173#issuecomment-1075941241


   @dajac It seems this test no longer fails in new PRs. It's a rare error that 
I can't reproduce anywhere; I'll keep an eye on it going forward.






[GitHub] [kafka] guozhangwang commented on a change in pull request #11805: KAFKA-13692: include metadata wait time in total blocked time

2022-03-22 Thread GitBox


guozhangwang commented on a change in pull request #11805:
URL: https://github.com/apache/kafka/pull/11805#discussion_r832843342



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java
##
@@ -76,7 +75,7 @@ public KafkaProducerMetrics(Metrics metrics) {
 );
 metadataWaitSensor = newLatencySensor(
 METADATA_WAIT,
-"Total time producer has spent waiting on metadata in "
+"Total time producer has spent waiting on metadata on topic 
metadata."

Review comment:
   nit: dup words? Should it be "waiting on metadata in nanoseconds."?








[jira] [Updated] (KAFKA-13761) KafkaLog4jAppender deadlocks when idempotence is enabled

2022-03-22 Thread Yang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Yu updated KAFKA-13761:

Description: 
KafkaLog4jAppender instantiates a KafkaProducer to append log entries to a 
Kafka topic. The producer.send operation may need to acquire locks during its 
execution. This can result in deadlocks when a log entry from the producer 
network thread is also at a log level that results in the entry being appended 
to a Kafka topic (KAFKA-6415).

[https://github.com/apache/kafka/pull/11691] enables idempotence by default, 
and it introduced another place where the producer network thread can hit a 
deadlock. When calling TransactionManager#maybeAddPartition, the producer 
network thread will wait on the TransactionManager lock, and a deadlock can 
happen if TransactionManager also logs at INFO level. This is causing system 
test failures in log4j_appender_test.py.

Similar to KAFKA-6415, a workaround is to set the log level to WARN for 
TransactionManager in VerifiableLog4jAppender.
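
A minimal log4j.properties sketch of that workaround (assuming standard log4j 
1.x property syntax; the logger below is the producer's internal 
TransactionManager class, and the exact appender wiring will differ per setup):
{code:java}
# Raise TransactionManager above INFO so its log entries are never routed
# through KafkaLog4jAppender, keeping the producer network thread from
# blocking while appending its own log output.
log4j.logger.org.apache.kafka.clients.producer.internals.TransactionManager=WARN
{code}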

 

  was:
KafkaLog4jAppender instantiates a KafkaProducer to append log entries to a 
Kafka topic. The producer.send operation may need to acquire locks during its 
execution. This can result in deadlocks when a log entry from the producer 
network thread is also at a log level that results in the entry being appended 
to a Kafka topic 
([KAFKA-6415|https://issues.apache.org/jira/browse/KAFKA-6415]).

[https://github.com/apache/kafka/pull/11691] enables idempotence by default, 
and it introduced another place where the producer network thread can hit a 
deadlock. When calling TransactionManager#maybeAddPartition, the producer 
network thread will wait on the TransactionManager lock, and a deadlock can 
happen if TransactionManager also logs at INFO level. This is causing system 
test failures in log4j_appender_test.py.

Similar to KAFKA-6415, a workaround is to set the log level to WARN for 
TransactionManager in VerifiableLog4jAppender.

 


> KafkaLog4jAppender deadlocks when idempotence is enabled
> 
>
> Key: KAFKA-13761
> URL: https://issues.apache.org/jira/browse/KAFKA-13761
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 3.1.0, 3.0.0, 3.0.1
>Reporter: Yang Yu
>Priority: Major
>
> KafkaLog4jAppender instantiates a KafkaProducer to append log entries to a 
> Kafka topic. The producer.send operation may need to acquire locks during its 
> execution. This can result in deadlocks when a log entry from the producer 
> network thread is also at a log level that results in the entry being 
> appended to a Kafka topic (KAFKA-6415).
> [https://github.com/apache/kafka/pull/11691] enables idempotence by default, 
> and it introduced another place where the producer network thread can hit a 
> deadlock. When calling TransactionManager#maybeAddPartition, the producer 
> network thread will wait on the TransactionManager lock, and a deadlock can 
> happen if TransactionManager also logs at INFO level. This is causing system 
> test failures in log4j_appender_test.py.
> Similar to KAFKA-6415, a workaround is to set the log level to WARN for 
> TransactionManager in VerifiableLog4jAppender.
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13761) KafkaLog4jAppender deadlocks when idempotence is enabled

2022-03-22 Thread Yang Yu (Jira)
Yang Yu created KAFKA-13761:
---

 Summary: KafkaLog4jAppender deadlocks when idempotence is enabled
 Key: KAFKA-13761
 URL: https://issues.apache.org/jira/browse/KAFKA-13761
 Project: Kafka
  Issue Type: Bug
  Components: log
Affects Versions: 3.0.1, 3.0.0, 3.1.0
Reporter: Yang Yu


KafkaLog4jAppender instantiates a KafkaProducer to append log entries to a 
Kafka topic. The producer.send operation may need to acquire locks during its 
execution. This can result in deadlocks when a log entry from the producer 
network thread is also at a log level that results in the entry being appended 
to a Kafka topic 
([KAFKA-6415|https://issues.apache.org/jira/browse/KAFKA-6415]).

[https://github.com/apache/kafka/pull/11691] enables idempotence by default, 
and it introduced another place where the producer network thread can hit a 
deadlock. When calling TransactionManager#maybeAddPartition, the producer 
network thread will wait on the TransactionManager lock, and a deadlock can 
happen if TransactionManager also logs at INFO level. This is causing system 
test failures in log4j_appender_test.py.

Similar to KAFKA-6415, a workaround is to set the log level to WARN for 
TransactionManager in VerifiableLog4jAppender.

 





[jira] [Updated] (KAFKA-13760) Kafka 2.8.1 : log clean skip __consumer__offsets-45

2022-03-22 Thread zhangzhisheng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangzhisheng updated KAFKA-13760:
--
Description: 
I upgraded Kafka from 2.12_2.4.1 to 2.13_2.8.1. Since then, the 
__consumer_offsets-45 log cannot be cleaned.

kafka version: 2.13_2.8.1

jdk version: openjdk 11.0.2

linux version: 4.4.71-1.el7.elrepo.x86_64 (mockbuild@Build64F25) (gcc version 
4.8.5 20150623 (Red Hat 4.8.5-11))

log-cleaner.log info
{code:java}
// more kafka_2.13-2.8.1/logs/log-cleaner.log.2022-03-23-11
[2022-03-23 11:59:03,571] INFO Starting the log cleaner (kafka.log.LogCleaner)
[2022-03-23 11:59:03,633] INFO [kafka-log-cleaner-thread-0]: Starting 
(kafka.log.LogCleaner)
[2022-03-23 11:59:04,234] INFO Cleaner 0: Beginning cleaning of log 
__transaction_state-9. (kafka.log.LogCleaner)
[2022-03-23 11:59:04,235] INFO Cleaner 0: Building offset map for 
__transaction_state-9... (kafka.log.LogCleaner)
[2022-03-23 11:59:04,294] INFO Cleaner 0: Building offset map for log 
__transaction_state-9 for 1 segments in offset range [0, 118584). 
(kafka.log.LogCleaner)
[2022-03-23 11:59:04,435] INFO Cleaner 0: Offset map for log 
__transaction_state-9 complete. (kafka.log.LogCleaner)
[2022-03-23 11:59:04,436] INFO Cleaner 0: Cleaning log __transaction_state-9 
(cleaning prior to Tue Nov 30 17:52:18 CST 2021, discarding tombstones prior to 
Thu Jan 01 08:00:00 CST 1970)... (kafka.log.LogCleaner)
[2022-03-23 11:59:04,443] INFO Cleaner 0: Cleaning LogSegment(baseOffset=0, 
size=152354, lastModifiedTime=1638265938290, 
largestRecordTimestamp=Some(1638265938290)) in log __transaction_state-9 into 0 
with deletion horizon 0, retaining deletes. (kafka.log.LogCleaner) {code}
 

cleaner-offset-checkpoint info 
{code:java}
[admin@XXX ~]$ cat data/kafka-logs/cleaner-offset-checkpoint
0
99
__transaction_state 7 85731
__consumer_offsets 25 2310286231
__transaction_state 0 47287
__transaction_state 6 53470
__transaction_state 18 29151
__consumer_offsets 22 2025808320
__transaction_state 42 188187
__consumer_offsets 30 432921676
__transaction_state 31 18666
__transaction_state 45 52156
__transaction_state 15 416390
__transaction_state 12 393816
__consumer_offsets 8 402586656
__consumer_offsets 21 1012193789
__consumer_offsets 4 473338672
__transaction_state 46 230722
__consumer_offsets 27 480427838
__consumer_offsets 7 524517122
__transaction_state 48 42549
__consumer_offsets 9 441859835
__transaction_state 49 47535
__consumer_offsets 46 571154678
__consumer_offsets 35 450957687
__transaction_state 28 19026
__transaction_state 2 45994
__transaction_state 20 30522
__transaction_state 24 47341
__consumer_offsets 41 630630351
__consumer_offsets 33 1264879478
__consumer_offsets 23 501207312
__transaction_state 13 439134
__consumer_offsets 49 729019855
__consumer_offsets 47 522662343
__consumer_offsets 16 3750767931
__consumer_offsets 28 411700445
__transaction_state 37 29689
__transaction_state 3 46614
__consumer_offsets 31 1032954867
__transaction_state 21 30525
__consumer_offsets 36 497613885
__transaction_state 29 18879
__consumer_offsets 42 498365278
__consumer_offsets 3 411713277
__consumer_offsets 18 1389078671
__transaction_state 39 32046
__consumer_offsets 37 693879385
__transaction_state 38 29483
__consumer_offsets 15 459557511
__consumer_offsets 24 472248935
__transaction_state 14 354101
__transaction_state 10 154090
__transaction_state 44 279604
__transaction_state 9 118584
__transaction_state 22 47669
__transaction_state 43 183150
__transaction_state 4 51629
__transaction_state 30 19312
__transaction_state 33 19110
__consumer_offsets 38 426170690
__consumer_offsets 48 901521233
__consumer_offsets 17 572951951
__transaction_state 32 19257
__transaction_state 25 30406
__transaction_state 17 29094
__consumer_offsets 19 480121644
__consumer_offsets 11 414732877
__transaction_state 23 48148
__transaction_state 47 51783
__consumer_offsets 13 502754270
__consumer_offsets 2 671995606
__consumer_offsets 43 1653739033
__consumer_offsets 6 561249470
__consumer_offsets 14 420688971
__transaction_state 26 30074
__transaction_state 36 30105
__transaction_state 5 61932
__transaction_state 8 125395
__consumer_offsets 0 378236239
__consumer_offsets 44 1318426000
__transaction_state 11 128083
__consumer_offsets 20 646102919
__transaction_state 16 421407
__consumer_offsets 39 574972378
__consumer_offsets 12 557238642
__transaction_state 40 163727
__consumer_offsets 1 500129407
__consumer_offsets 5 577571358
__consumer_offsets 26 735426985
__transaction_state 19 29236
__transaction_state 27 22656
__consumer_offsets 34 425587446
__consumer_offsets 29 579775801
__transaction_state 41 164107
__transaction_state 1 45750
__consumer_offsets 10 421962673
__consumer_offsets 32 464195167
__transaction_state 34 19492
__transaction_state 35 19083
__consumer_offsets 40 410754361{code}

  was:
I upgraded Kafka from 2.12_2.4.1 to 2.13_2.8.1. Since then, 

[jira] [Updated] (KAFKA-13760) Kafka 2.8.1 : log clean skip __consumer__offsets-45

2022-03-22 Thread zhangzhisheng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangzhisheng updated KAFKA-13760:
--
Description: 
I upgraded Kafka from 2.12_2.4.1 to 2.13_2.8.1. Since then, the 
__consumer_offsets-45 log cannot be cleaned.

kafka version: 2.13_2.8.1

jdk version: openjdk 11.0.2

linux version: 4.4.71-1.el7.elrepo.x86_64 (mockbuild@Build64F25) (gcc version 
4.8.5 20150623 (Red Hat 4.8.5-11))

cleaner-offset-checkpoint info 
{code:java}
[admin@iZbp18um4r4f295ku0s4tbZ ~]$ cat data/kafka-logs/cleaner-offset-checkpoint
0
99
__transaction_state 7 85731
__consumer_offsets 25 2310286231
__transaction_state 0 47287
__transaction_state 6 53470
__transaction_state 18 29151
__consumer_offsets 22 2025808320
__transaction_state 42 188187
__consumer_offsets 30 432921676
__transaction_state 31 18666
__transaction_state 45 52156
__transaction_state 15 416390
__transaction_state 12 393816
__consumer_offsets 8 402586656
__consumer_offsets 21 1012193789
__consumer_offsets 4 473338672
__transaction_state 46 230722
__consumer_offsets 27 480427838
__consumer_offsets 7 524517122
__transaction_state 48 42549
__consumer_offsets 9 441859835
__transaction_state 49 47535
__consumer_offsets 46 571154678
__consumer_offsets 35 450957687
__transaction_state 28 19026
__transaction_state 2 45994
__transaction_state 20 30522
__transaction_state 24 47341
__consumer_offsets 41 630630351
__consumer_offsets 33 1264879478
__consumer_offsets 23 501207312
__transaction_state 13 439134
__consumer_offsets 49 729019855
__consumer_offsets 47 522662343
__consumer_offsets 16 3750767931
__consumer_offsets 28 411700445
__transaction_state 37 29689
__transaction_state 3 46614
__consumer_offsets 31 1032954867
__transaction_state 21 30525
__consumer_offsets 36 497613885
__transaction_state 29 18879
__consumer_offsets 42 498365278
__consumer_offsets 3 411713277
__consumer_offsets 18 1389078671
__transaction_state 39 32046
__consumer_offsets 37 693879385
__transaction_state 38 29483
__consumer_offsets 15 459557511
__consumer_offsets 24 472248935
__transaction_state 14 354101
__transaction_state 10 154090
__transaction_state 44 279604
__transaction_state 9 118584
__transaction_state 22 47669
__transaction_state 43 183150
__transaction_state 4 51629
__transaction_state 30 19312
__transaction_state 33 19110
__consumer_offsets 38 426170690
__consumer_offsets 48 901521233
__consumer_offsets 17 572951951
__transaction_state 32 19257
__transaction_state 25 30406
__transaction_state 17 29094
__consumer_offsets 19 480121644
__consumer_offsets 11 414732877
__transaction_state 23 48148
__transaction_state 47 51783
__consumer_offsets 13 502754270
__consumer_offsets 2 671995606
__consumer_offsets 43 1653739033
__consumer_offsets 6 561249470
__consumer_offsets 14 420688971
__transaction_state 26 30074
__transaction_state 36 30105
__transaction_state 5 61932
__transaction_state 8 125395
__consumer_offsets 0 378236239
__consumer_offsets 44 1318426000
__transaction_state 11 128083
__consumer_offsets 20 646102919
__transaction_state 16 421407
__consumer_offsets 39 574972378
__consumer_offsets 12 557238642
__transaction_state 40 163727
__consumer_offsets 1 500129407
__consumer_offsets 5 577571358
__consumer_offsets 26 735426985
__transaction_state 19 29236
__transaction_state 27 22656
__consumer_offsets 34 425587446
__consumer_offsets 29 579775801
__transaction_state 41 164107
__transaction_state 1 45750
__consumer_offsets 10 421962673
__consumer_offsets 32 464195167
__transaction_state 34 19492
__transaction_state 35 19083
__consumer_offsets 40 410754361{code}

  was:
I upgraded Kafka from 2.12_2.4.1 to 2.13_2.8.1. Since then, the 
__consumer_offsets-45 log cannot be cleaned.

kafka version: 2.13_2.8.1

jdk version: openjdk 11.0.2

linux version: 4.4.71-1.el7.elrepo.x86_64 (mockbuild@Build64F25) (gcc version 
4.8.5 20150623 (Red Hat 4.8.5-11))

cleaner-offset-checkpoint info 
{code:java}

[admin@iZbp18um4r4f295ku0s4tbZ ~]$ cat data/kafka-logs/cleaner-offset-checkpoint
0
99
__transaction_state 7 85731
__consumer_offsets 25 2310286231
__transaction_state 0 47287
__transaction_state 6 53470
__transaction_state 18 29151
__consumer_offsets 22 2025808320
__transaction_state 42 188187
__consumer_offsets 30 432921676 {code}


> Kafka 2.8.1 : log clean skip __consumer__offsets-45
> ---
>
> Key: KAFKA-13760
> URL: https://issues.apache.org/jira/browse/KAFKA-13760
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 2.8.1
>Reporter: zhangzhisheng
>Priority: Major
>
> I upgraded Kafka from 2.12_2.4.1 to 2.13_2.8.1. Since then, the 
> __consumer_offsets-45 log cannot be cleaned.
> kafka version: 2.13_2.8.1
> jdk version: openjdk 11.0.2
> linux version: 4.4.71-1.el7.elrepo.x86_64 (mockbuild@Build64F25) (gcc version 
> 4.8.5 20150623 

[jira] [Updated] (KAFKA-13760) Kafka 2.8.1 : log clean skip __consumer__offsets-45

2022-03-22 Thread zhangzhisheng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangzhisheng updated KAFKA-13760:
--
Description: 
I upgraded Kafka from 2.12_2.4.1 to 2.13_2.8.1. Since then, the 
__consumer_offsets-45 log cannot be cleaned.

kafka version: 2.13_2.8.1

jdk version: openjdk 11.0.2

linux version: 4.4.71-1.el7.elrepo.x86_64 (mockbuild@Build64F25) (gcc version 
4.8.5 20150623 (Red Hat 4.8.5-11))

cleaner-offset-checkpoint info 
{code:java}

[admin@iZbp18um4r4f295ku0s4tbZ ~]$ cat data/kafka-logs/cleaner-offset-checkpoint
0
99
__transaction_state 7 85731
__consumer_offsets 25 2310286231
__transaction_state 0 47287
__transaction_state 6 53470
__transaction_state 18 29151
__consumer_offsets 22 2025808320
__transaction_state 42 188187
__consumer_offsets 30 432921676 {code}

  was:
I upgraded Kafka from 2.12_2.4.1 to 2.13_2.8.1. Since then, the 
__consumer_offsets-45 log cannot be cleaned.

kafka version: 2.13_2.8.1

jdk version: openjdk 11.0.2

linux version: 4.4.71-1.el7.elrepo.x86_64 (mockbuild@Build64F25) (gcc version 
4.8.5 20150623 (Red Hat 4.8.5-11))

 


> Kafka 2.8.1 : log clean skip __consumer__offsets-45
> ---
>
> Key: KAFKA-13760
> URL: https://issues.apache.org/jira/browse/KAFKA-13760
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 2.8.1
>Reporter: zhangzhisheng
>Priority: Major
>
> I upgraded Kafka from 2.12_2.4.1 to 2.13_2.8.1. Since then, the 
> __consumer_offsets-45 log cannot be cleaned.
> kafka version: 2.13_2.8.1
> jdk version: openjdk 11.0.2
> linux version: 4.4.71-1.el7.elrepo.x86_64 (mockbuild@Build64F25) (gcc version 
> 4.8.5 20150623 (Red Hat 4.8.5-11)) 
> cleaner-offset-checkpoint info 
> {code:java}
> [admin@iZbp18um4r4f295ku0s4tbZ ~]$ cat 
> data/kafka-logs/cleaner-offset-checkpoint
> 0
> 99
> __transaction_state 7 85731
> __consumer_offsets 25 2310286231
> __transaction_state 0 47287
> __transaction_state 6 53470
> __transaction_state 18 29151
> __consumer_offsets 22 2025808320
> __transaction_state 42 188187
> __consumer_offsets 30 432921676 {code}





[jira] [Updated] (KAFKA-13760) Kafka 2.8.1 : log clean skip __consumer__offsets-45

2022-03-22 Thread zhangzhisheng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangzhisheng updated KAFKA-13760:
--
Description: 
I upgraded Kafka from 2.12_2.4.1 to 2.13_2.8.1. Since then, the 
__consumer_offsets-45 log cannot be cleaned.

kafka version: 2.13_2.8.1

jdk version: openjdk 11.0.2

linux version: 4.4.71-1.el7.elrepo.x86_64 (mockbuild@Build64F25) (gcc version 
4.8.5 20150623 (Red Hat 4.8.5-11))

 

  was:I upgraded Kafka from 2.12_2.4.1 to 2.13_2.8.1. Since then, the 
__consumer_offsets-45 log cannot be cleaned.


> Kafka 2.8.1 : log clean skip __consumer__offsets-45
> ---
>
> Key: KAFKA-13760
> URL: https://issues.apache.org/jira/browse/KAFKA-13760
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 2.8.1
>Reporter: zhangzhisheng
>Priority: Major
>
> I upgraded Kafka from 2.12_2.4.1 to 2.13_2.8.1. Since then, the 
> __consumer_offsets-45 log cannot be cleaned.
> kafka version: 2.13_2.8.1
> jdk version: openjdk 11.0.2
> linux version: 4.4.71-1.el7.elrepo.x86_64 (mockbuild@Build64F25) (gcc version 
> 4.8.5 20150623 (Red Hat 4.8.5-11)) 
>  





[jira] [Created] (KAFKA-13760) Kafka 2.8.1 : log clean skip __consumer__offsets-45

2022-03-22 Thread zhangzhisheng (Jira)
zhangzhisheng created KAFKA-13760:
-

 Summary: Kafka 2.8.1 : log clean skip __consumer__offsets-45
 Key: KAFKA-13760
 URL: https://issues.apache.org/jira/browse/KAFKA-13760
 Project: Kafka
  Issue Type: Bug
  Components: log cleaner
Affects Versions: 2.8.1
Reporter: zhangzhisheng


I upgraded Kafka from 2.12_2.4.1 to 2.13_2.8.1. Since then, the 
__consumer_offsets-45 log cannot be cleaned.





[GitHub] [kafka] guozhangwang commented on pull request #11892: [Emit final][4/N] add time ordered store factory

2022-03-22 Thread GitBox


guozhangwang commented on pull request #11892:
URL: https://github.com/apache/kafka/pull/11892#issuecomment-1075887393


   Merged to trunk.






[GitHub] [kafka] guozhangwang merged pull request #11892: [Emit final][4/N] add time ordered store factory

2022-03-22 Thread GitBox


guozhangwang merged pull request #11892:
URL: https://github.com/apache/kafka/pull/11892


   






[GitHub] [kafka] showuon commented on pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2022-03-22 Thread GitBox


showuon commented on pull request #11433:
URL: https://github.com/apache/kafka/pull/11433#issuecomment-1075887120


   Oh, sorry, totally forgot about this PR. I'll take a look again this week. 
Thanks.






[jira] [Commented] (KAFKA-7509) Kafka Connect logs unnecessary warnings about unused configurations

2022-03-22 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17510997#comment-17510997
 ] 

Matthias J. Sax commented on KAFKA-7509:


Thanks for the detailed explanation. I don't know anything about the Connect 
code base, and I'm just realizing it's even more complicated than the Streams 
case with regard to nesting... I guess I don't have a good proposal for how to 
address it. – In the end, it also seems like a bigger problem for Connect than 
for Streams (for Streams, it falls into the "annoying but not harmful" 
category, but it seems for Connect the noise ratio is much higher...).

> Kafka Connect logs unnecessary warnings about unused configurations
> ---
>
> Key: KAFKA-7509
> URL: https://issues.apache.org/jira/browse/KAFKA-7509
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Randall Hauch
>Priority: Major
>
> When running Connect, the logs contain quite a few warnings about "The 
> configuration '{}' was supplied but isn't a known config." This occurs when 
> Connect creates producers, consumers, and admin clients, because the 
> AbstractConfig is logging unused configuration properties upon construction. 
> It's complicated by the fact that the Producer, Consumer, and AdminClient all 
> create their own AbstractConfig instances within the constructor, so we can't 
> even call its {{ignore(String key)}} method.
> See also KAFKA-6793 for a similar issue with Streams.
> There are no arguments in the Producer, Consumer, or AdminClient constructors 
> to control  whether the configs log these warnings, so a simpler workaround 
> is to only pass those configuration properties to the Producer, Consumer, and 
> AdminClient that the ProducerConfig, ConsumerConfig, and AdminClientConfig 
> configdefs know about.
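
A minimal sketch of that workaround, assuming the worker keeps its client 
settings in a plain Map (the class and method names here are illustrative; 
{{ProducerConfig.configNames()}} is the public clients API listing the keys the 
producer's ConfigDef knows about):
{code:java}
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.ProducerConfig;

public class KnownConfigFilter {
    // Keep only the properties ProducerConfig's ConfigDef knows about, so
    // AbstractConfig has no "unused" keys to warn about at construction time.
    public static Map<String, Object> knownProducerProps(Map<String, Object> workerProps) {
        return workerProps.entrySet().stream()
                .filter(e -> ProducerConfig.configNames().contains(e.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }
}
{code}
Note that such filtering would also strip custom keys that interceptors or 
serializers may legitimately need, which is part of why the problem is not 
trivially solved.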





[GitHub] [kafka] mjsax commented on pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2022-03-22 Thread GitBox


mjsax commented on pull request #11433:
URL: https://github.com/apache/kafka/pull/11433#issuecomment-1075878024


   Just stumbled across this PR -- should we try to get it into 3.2 release? 
Seems to be a valuable bug-fix.






[jira] [Commented] (KAFKA-13613) Kafka Connect has a hard dependency on KeyGenerator.HmacSHA256

2022-03-22 Thread Guy Pascarella (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17510979#comment-17510979
 ] 

Guy Pascarella commented on KAFKA-13613:


[~ChrisEgerton] The issue is only observed when running; we tend to use 
pre-compiled binaries whenever possible. Thanks!

> Kafka Connect has a hard dependency on KeyGenerator.HmacSHA256
> --
>
> Key: KAFKA-13613
> URL: https://issues.apache.org/jira/browse/KAFKA-13613
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.0.0
> Environment: RHEL 8.5
> OpenJDK 1.8.0_312 or 11
> Confluent Platform 7.0.1 (Kafka 3.0.0)
>Reporter: Guy Pascarella
>Assignee: Chris Egerton
>Priority: Major
>
> If a server is running Java 8 that has been configured for FIPS mode 
> according to 
> [openjdk-8-configuring_openjdk_8_on_rhel_with_fips-en-us.pdf|https://access.redhat.com/documentation/en-us/openjdk/8/pdf/configuring_openjdk_8_on_rhel_with_fips/openjdk-8-configuring_openjdk_8_on_rhel_with_fips-en-us.pdf]
>  then the SunJCE provider is not available. As such the KeyGenerator 
> HmacSHA256 is not available. The KeyGenerators I see available are
>  * DES
>  * ARCFOUR
>  * AES
>  * DESede
> Out of these I think AES would be most appropriate, but that's not the point 
> of this issue, just including for completeness.
> When Kafka Connect is started in distributed mode on one of these servers I 
> see the following stack trace
> {noformat}
> [2022-01-20 20:36:30,027] ERROR Stopping due to error 
> (org.apache.kafka.connect.cli.ConnectDistributed)
> java.lang.ExceptionInInitializerError
> at 
> org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:94)
> at 
> org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:79)
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
> HmacSHA256 for configuration inter.worker.key.generation.algorithm: 
> HmacSHA256 KeyGenerator not available
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedConfig.validateKeyAlgorithm(DistributedConfig.java:504)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedConfig.lambda$configDef$2(DistributedConfig.java:375)
> at 
> org.apache.kafka.common.config.ConfigDef$LambdaValidator.ensureValid(ConfigDef.java:1043)
> at 
> org.apache.kafka.common.config.ConfigDef$ConfigKey.<init>(ConfigDef.java:1164)
> at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:152)
> at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:172)
> at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:211)
> at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:373)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedConfig.configDef(DistributedConfig.java:371)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedConfig.<init>(DistributedConfig.java:196)
> ... 2 more
> {noformat}
> It appears the 
> {{org.apache.kafka.connect.runtime.distributed.DistributedConfig}} is 
> triggering a validation of the hard-coded default 
> {{inter.worker.key.generation.algorithm}} property, which is {{HmacSHA256}}.
> Ideally a fix would use the value from the configuration file before 
> attempting to validate a default value.
> Updates [2022/01/27]: I just tested on a FIPS-enabled version of OpenJDK 11 
> using the instructions at 
> [configuring_openjdk_11_on_rhel_with_fips|https://access.redhat.com/documentation/en-us/openjdk/11/html-single/configuring_openjdk_11_on_rhel_with_fips/index],
>  which resulted in the same issues. One workaround is to disable FIPS for 
> Kafka Connect by passing in the JVM parameter {{-Dcom.redhat.fips=false}}, 
> however, that means Kafka Connect and all the workers are out of compliance 
> for anyone required to use FIPS-enabled systems.
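
A small sketch of the mitigation direction the report suggests (validate 
against what the runtime actually supports instead of failing on a hard-coded 
default; the helper below is illustrative, not Kafka's actual fix):
{code:java}
import java.security.NoSuchAlgorithmException;
import javax.crypto.KeyGenerator;

public class KeyAlgorithmProbe {
    // Return the first algorithm some installed provider supports,
    // e.g. firstAvailable("HmacSHA256", "AES") on a FIPS-configured JVM.
    public static String firstAvailable(String... candidates) {
        for (String algorithm : candidates) {
            try {
                KeyGenerator.getInstance(algorithm); // throws if unsupported
                return algorithm;
            } catch (NoSuchAlgorithmException e) {
                // not available under the current providers; try the next one
            }
        }
        throw new IllegalStateException("No candidate KeyGenerator algorithm is available");
    }
}
{code}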





[GitHub] [kafka] showuon commented on pull request #11475: KAFKA-7077: Use default producer settings in Connect Worker

2022-03-22 Thread GitBox


showuon commented on pull request #11475:
URL: https://github.com/apache/kafka/pull/11475#issuecomment-1075815779


   Thanks @kkonstantine !






[GitHub] [kafka] showuon commented on pull request #11932: Revert "KAFKA-7077: Use default producer settings in Connect Worker"

2022-03-22 Thread GitBox


showuon commented on pull request #11932:
URL: https://github.com/apache/kafka/pull/11932#issuecomment-1075814026


   Thanks @kkonstantine !






[GitHub] [kafka] rhauch commented on a change in pull request #11933: KAFKA-13759: Disable idempotency by default in producers instantiated by Connect

2022-03-22 Thread GitBox


rhauch commented on a change in pull request #11933:
URL: https://github.com/apache/kafka/pull/11933#discussion_r832738114



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##
@@ -648,6 +648,14 @@ private WorkerTask buildWorkerTask(ClusterConfigState 
configState,
 // These settings will execute infinite retries on retriable 
exceptions. They *may* be overridden via configs passed to the worker,
 // but this may compromise the delivery guarantees of Kafka Connect.
 producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 
Long.toString(Long.MAX_VALUE));
+// By default, producers that are instantiated and used by Connect 
have idempotency disabled even after idempotency became
+// default for Kafka producers. This is chosen to avoid breaking 
changes when Connect contacts Kafka brokers that do not support

Review comment:
   Maybe rephrase to be more positive? Like:
   ```suggestion
   // default for Kafka producers. This ensures Connect continues to 
work with many Kafka broker versions, including older brokers that do not 
support
   ```
   If you make a change like this, please change the other places to be 
consistent.

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##
@@ -648,6 +648,14 @@ private WorkerTask buildWorkerTask(ClusterConfigState 
configState,
 // These settings will execute infinite retries on retriable 
exceptions. They *may* be overridden via configs passed to the worker,
 // but this may compromise the delivery guarantees of Kafka Connect.
 producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 
Long.toString(Long.MAX_VALUE));
+// By default, producers that are instantiated and used by Connect 
have idempotency disabled even after idempotency became
+// default for Kafka producers. This is chosen to avoid breaking 
changes when Connect contacts Kafka brokers that do not support
+// idempotent producers or require explicit steps to enable them (e.g. 
adding the IDEMPOTENT_WRITE ACL to brokers older than 2.8).
+// These settings might change when 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent
+// gets approved and scheduled for release.
+producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
+producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");

Review comment:
   Didn't #11932 add these two lines back in? If so, the only change should 
be the addition of:
   ```
   producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
   ```
   Maybe this PR just needs to merge the latest from `trunk`.








[GitHub] [kafka] kkonstantine commented on pull request #11932: Revert "KAFKA-7077: Use default producer settings in Connect Worker"

2022-03-22 Thread GitBox


kkonstantine commented on pull request #11932:
URL: https://github.com/apache/kafka/pull/11932#issuecomment-1075781001


   Cherry-picked on `3.2` cc @cadonna 
   (not present on the 3.1 or 3.0 branches)






[GitHub] [kafka] kkonstantine merged pull request #11932: Revert "KAFKA-7077: Use default producer settings in Connect Worker"

2022-03-22 Thread GitBox


kkonstantine merged pull request #11932:
URL: https://github.com/apache/kafka/pull/11932


   






[GitHub] [kafka] kkonstantine commented on pull request #11932: Revert "KAFKA-7077: Use default producer settings in Connect Worker"

2022-03-22 Thread GitBox


kkonstantine commented on pull request #11932:
URL: https://github.com/apache/kafka/pull/11932#issuecomment-1075776386


   Thanks @rhauch! No Connect failures in the builds. Merging the PR as is. 






[GitHub] [kafka] kkonstantine opened a new pull request #11933: KAFKA-13759: Disable idempotency by default in producers instantiated by Connect

2022-03-22 Thread GitBox


kkonstantine opened a new pull request #11933:
URL: https://github.com/apache/kafka/pull/11933


   Issued on top of https://github.com/apache/kafka/pull/11932; refers to 
https://issues.apache.org/jira/browse/KAFKA-13759.
   
   I will add a detailed description when I move this from draft to a regular PR. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[jira] [Created] (KAFKA-13759) Disable producer idempotency by default in producers instantiated by Connect

2022-03-22 Thread Konstantine Karantasis (Jira)
Konstantine Karantasis created KAFKA-13759:
--

 Summary: Disable producer idempotency by default in producers 
instantiated by Connect
 Key: KAFKA-13759
 URL: https://issues.apache.org/jira/browse/KAFKA-13759
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Konstantine Karantasis
Assignee: Konstantine Karantasis


https://issues.apache.org/jira/browse/KAFKA-7077 was merged recently, referring 
to KIP-318. Before that, in AK 3.0, idempotency was enabled by default across 
Kafka producers. 

However, some compatibility implications were missed in both cases. 

If idempotency is enabled by default, Connect won't be able to communicate via 
its producers with Kafka brokers older than version 0.11. Perhaps more 
importantly, for brokers older than version 2.8 the {{IDEMPOTENT_WRITE}} ACL 
must be granted to the principal of the Connect worker. 

Given the above caveats, this ticket proposes to explicitly disable producer 
idempotency in Connect by default. As today, this feature can still be enabled 
by setting worker and/or connector properties. However, enabling it by default 
should be reconsidered as part of a major version upgrade, after KIP-318 is 
updated to mention the compatibility requirements and is officially approved. 
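
For reference, the override knobs the ticket refers to, sketched below (the 
{{producer.}} worker prefix is how Connect forwards client settings to the 
producers it creates; the per-connector {{producer.override.}} form 
additionally requires a permissive {{connector.client.config.override.policy}}):
{noformat}
# Worker configuration: re-enable idempotence for every producer the worker creates
producer.enable.idempotence=true

# Connector configuration: per-connector override (if the override policy allows it)
producer.override.enable.idempotence=true
{noformat}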





[jira] [Created] (KAFKA-13758) Exclusive locking in kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:227)

2022-03-22 Thread Sree Vaddi (Jira)
Sree Vaddi created KAFKA-13758:
--

 Summary: Exclusive locking in 
kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:227)
 Key: KAFKA-13758
 URL: https://issues.apache.org/jira/browse/KAFKA-13758
 Project: Kafka
  Issue Type: Bug
 Environment: Azure

Reporter: Sree Vaddi


{quote} [2022-03-22 15:15:03,127] *ERROR* [GroupMetadataManager brokerId=#] 
Appending metadata message for group ... failed due to unexpected *error:* 
_org.apache.kafka.common.errors.UnknownServerException_ 
(kafka.coordinator.group.GroupMetadataManager){quote}




The issue is probably here: CoreUtils.scala, inLock():
{code:scala}
/**
 * Execute the given function inside the lock
 */
def inLock[T](lock: Lock)(fun: => T): T = {
  lock.lock()
  try {
    fun
  } finally {
    lock.unlock()
  }
}
{code}





[GitHub] [kafka] rhauch commented on pull request #11475: KAFKA-7077: Use default producer settings in Connect Worker

2022-03-22 Thread GitBox


rhauch commented on pull request #11475:
URL: https://github.com/apache/kafka/pull/11475#issuecomment-1075711614


   Sounds good to me, too. Thanks, @kkonstantine.






[GitHub] [kafka] kkonstantine opened a new pull request #11932: Revert "KAFKA-7077: Use default producer settings in Connect Worker"

2022-03-22 Thread GitBox


kkonstantine opened a new pull request #11932:
URL: https://github.com/apache/kafka/pull/11932


   Reverts apache/kafka#11475
   
   Connect already allows users to enable idempotent producers for connectors 
and the Connect workers. Although Kafka producers enabled idempotency by 
default in 3.0, due to compatibility requirements and the fact that 
[KIP-318](https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent)
 hasn't been explicitly approved, the changes here are reverted. A separate 
commit will explicitly disable idempotency in producers instantiated by Connect 
by default until KIP-318 is approved and scheduled for release. 






[GitHub] [kafka] ijuma commented on pull request #11475: KAFKA-7077: Use default producer settings in Connect Worker

2022-03-22 Thread GitBox


ijuma commented on pull request #11475:
URL: https://github.com/apache/kafka/pull/11475#issuecomment-1075696440


   That sounds good to me. Let's make sure we include these changes in 3.0.x 
and 3.1.x as well. For the latter, we still have time to make it for 3.1.1.






[GitHub] [kafka] kkonstantine commented on pull request #11475: KAFKA-7077: Use default producer settings in Connect Worker

2022-03-22 Thread GitBox


kkonstantine commented on pull request #11475:
URL: https://github.com/apache/kafka/pull/11475#issuecomment-1075693990


   @ismael you're bringing up a good point, which I missed in your latest 
comment before I sent my recent reply. 
   
   In light of the requirement to explicitly add the IDEMPOTENT_WRITE ACL for 
Connect workers when talking to Kafka brokers older than 2.8 (which I wasn't 
aware of), I'd like to suggest changing course here as follows: 
   
   * Revert KAFKA-7077 from all the branches to which it has been merged. 
   * Return to 
[KIP-318](https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent),
 update it and actually vote for it. @LiamClarkeNZ you referred to this KIP in 
the description of this PR but I missed that this KIP hasn't been approved and 
is actually currently marked as inactive. I think we should raise it again 
after we update it to include all the compatibility requirements and have it 
target the next major version (4.0). 
   * Issue a new PR that will explicitly disable idempotency by default in 
Connect and will allow users to override the setting via the worker and/or the 
connector configs, as we allow today. 
   * In the same PR, update our docs to say that although the Kafka producer 
enables idempotency by default since 3.0, due to compatibility requirements 
Connect chooses to disable idempotency by default for all the producers it 
instantiates. 






[GitHub] [kafka] dejan2609 commented on pull request #10967: KAFKA-12770: CheckStyle team needs this feature (in order to include Kafka into their regression suite)

2022-03-22 Thread GitBox


dejan2609 commented on pull request #10967:
URL: https://github.com/apache/kafka/pull/10967#issuecomment-1075573176


   **Note**: rebased and force-pushed (in order to resolve conflicts).






[GitHub] [kafka] kkonstantine edited a comment on pull request #11475: KAFKA-7077: Use default producer settings in Connect Worker

2022-03-22 Thread GitBox


kkonstantine edited a comment on pull request #11475:
URL: https://github.com/apache/kafka/pull/11475#issuecomment-1075479063


   @showuon I noticed that you recently disabled idempotency for the status 
backing store. Now that we removed `acks=all`, should we add it there? Is this 
something we missed with the changes here? And taking an even closer look at 
the status backing store implementation, I see that retries are handled by that 
class. So we disable idempotence, but are we sure 
`max.in.flight.requests.per.connection` is 1 there?
   
   Also, re:
   > @kkonstantine , thanks for the comment. I agree we add comments in the 
code to say the idempotent producer is enabled by default.
   
   my intention was actually to add a comment in the code stating which configs 
we definitely require for the producers in Connect to work as intended. These 
are `acks=all` and `max.in.flight.requests.per.connection=1` unless idempotency 
is enabled, given that we have KAFKA-5494 and infinite retries. These 
requirements were reflected in actual code until this PR was merged, but now 
they are only implied by the defaults of the idempotent producer, and that's 
something I'd like to leave a trail of, so we know what works for Connect. 
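
   For the record, those requirements expressed as explicit producer settings 
(a sketch of what the pre-merge code encoded, per the discussion above, not 
what trunk contains after this PR):
   ```
   acks=all
   max.in.flight.requests.per.connection=1
   # retries are effectively infinite (see KAFKA-5494)
   ```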






[GitHub] [kafka] kkonstantine commented on pull request #11475: KAFKA-7077: Use default producer settings in Connect Worker

2022-03-22 Thread GitBox


kkonstantine commented on pull request #11475:
URL: https://github.com/apache/kafka/pull/11475#issuecomment-1075479063


   @showuon I noticed that you recently disabled idempotency for the status 
backing store. Now that we removed `acks=all`, should we add it there? Is this 
something we missed with the changes here? And taking an even closer look at 
the status backing store implementation, I see that retries are handled by that 
class. So we disable idempotence, but are we sure 
`max.in.flight.requests.per.connection` is 1 there?
   
   Also, re:
   > @kkonstantine , thanks for the comment. I agree we add comments in the 
code to say the idempotent producer is enabled by default.
   
   my intention was actually to add a comment in the code stating which configs 
we definitely require for the producers in Connect to work as intended. These 
are `acks=all` and `max.in.flight.requests.per.connection=1` unless idempotency 
is enabled, given that we have KAFKA-5494 and infinite retries. These 
requirements were reflected in actual code until this PR was merged, but now 
they are only implied by the defaults of the idempotent producer, and that's 
something I'd like to leave a trail of, so we know what works for Connect. 






[GitHub] [kafka] tombentley merged pull request #11931: Fix LICENSE-binary

2022-03-22 Thread GitBox


tombentley merged pull request #11931:
URL: https://github.com/apache/kafka/pull/11931


   






[jira] [Commented] (KAFKA-7373) GetOffsetShell doesn't work when SSL authentication is enabled

2022-03-22 Thread Konstantinos Papalias (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17510798#comment-17510798
 ] 

Konstantinos Papalias commented on KAFKA-7373:
--

Hi [~enether] [~kiwiandy], it seems this issue still exists. Are there any 
plans to merge the fix for it?

> GetOffsetShell doesn't work when SSL authentication is enabled
> --
>
> Key: KAFKA-7373
> URL: https://issues.apache.org/jira/browse/KAFKA-7373
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.0.0
>Reporter: Andy Bryant
>Priority: Major
>
> GetOffsetShell doesn't provide a mechanism to supply additional 
> configuration for the underlying KafkaConsumer, as the `ConsoleConsumer` 
> does. Passing SSL config as system properties doesn't propagate to the 
> consumer either.
> {code:java}
> 10:47 $ ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> ${BROKER_LIST} --topic cld-dev-sor-crods-crodsdba_contact
> Exception in thread "main" org.apache.kafka.common.errors.TimeoutException: 
> Timeout expired while fetching topic metadata{code}
> Editing {{GetOffsetShell.scala}} to include the SSL properties in the 
> KafkaConsumer configuration resolved the issue.
> Providing {{consumer-property}} and {{consumer-config}} configuration options 
> for {{kafka-run-class.sh}}, or creating a separate run script for offsets and 
> using these properties in {{GetOffsetShell.scala}}, seems like a good solution.
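
A hypothetical invocation of the proposed {{consumer-config}} option (the flag 
did not exist at the time of this report; the SSL keys shown are standard 
client settings):
{code:java}
# Hypothetical: pass SSL settings to the underlying KafkaConsumer via a file
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list ${BROKER_LIST} \
  --topic cld-dev-sor-crods-crodsdba_contact \
  --consumer-config client-ssl.properties

# client-ssl.properties:
# security.protocol=SSL
# ssl.truststore.location=/path/to/client.truststore.jks
# ssl.truststore.password=...
{code}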





[jira] [Comment Edited] (KAFKA-13689) AbstractConfig log print information is incorrect

2022-03-22 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17510767#comment-17510767
 ] 

Guozhang Wang edited comment on KAFKA-13689 at 3/22/22, 5:14 PM:
-

Okay I think we are really on the same page here, just using different 
terminologies :) Where I'm coming from is the Streams usage, e.g. let's say we 
create a StreamsConfig object from:

{code}
StreamsConfig config = new StreamsConfig(map);
{code}

If we call

{code}
config.get(PREDEFINED_CONFIG_NAME)
{code}

then the param must be a defined config name, otherwise it will throw;

If we call

{code}
config.originals().get(ANY_CONFIG_NAME)
{code}

then it tries to get from the underlying maps directly, and any config names 
including the custom unknown configs can be retrieved. So that's how custom 
configs are retrieved (like you said, some modules use `(config.originals() 
UNION config.values()) .get(ANY_CONFIG_NAME).` as well).

Now, I realize that since `logUnused` is triggered at the construction of the 
consumer/producer clients, while those embedded modules' custom configs are 
possibly not yet retrieved (the modules will only be constructed and 
initialized at a later time), the `used` set would not yet contain any of those 
unknown configs. As a result:

1) all defined, used configs should be included in `used` since they are 
retrieved via the first call above. This is the case we want to guarantee, and 
WARN if not, since it may indicate a bug.
2) all defined, but disabled configs are included in `used` since they are 
marked as used via `config.ignore()`. This is what we want to fix in this JIRA.
3) all unknown configs may or may not be included in `used`, and that's out of 
the AbstractConfig object's control.

So by doing the above two, we can fix this JIRA ticket, which is case 2), but 
`logUnused` could still contain case 1) or case 3), whereas we really only 
want to WARN on case 1) above. The latter issue is a bit out of the scope of 
this JIRA, hence I said just doing the above two is sufficient for this issue.



If we want to resolve the second as well in this JIRA, I'm thinking we can do 
something aligned with your proposal in the description as "AbstractConfig 
provides two new methods: logUnknown() and unknown()", but to avoid creating a 
KIP for adding public APIs, we can just do them inside the existing 
`logUnused`, as something like:

{code}
public Set<String> unused() {
    Set<String> keys = new HashSet<>(values.keySet());   // here we take the diff between values and used.
    keys.removeAll(used);
    return keys;
}

// keep it private
private Set<String> unknown() {
    Set<String> keys = new HashSet<>(originals.keySet());   // here we take the diff between originals and values.
    keys.removeAll(values.keySet());
    return keys;
}

public void logUnused() {
    Set<String> unusedKeys = unused();
    for (String key : unusedKeys)
        log.warn("The configuration '{}' was supplied but isn't used.", key); // here we still log one line per config as a WARN

    Set<String> unknownKeys = unknown();
    if (!unknownKeys.isEmpty()) {
        log.info("These configurations '{}' were not known.", unknownKeys);  // here we log one line for all configs as INFO.
    }
}
{code}



was (Author: guozhang):
Okay I think we are really on the same page here, just using different 
terminologies :) Where I'm coming from is the Streams usage, e.g. let's say we 
create a StreamsConfig object from:

```
StreamsConfig config = new StreamsConfig(map);
```

If we call

```
config.get(PREDEFINED_CONFIG_NAME)
```

then the param must be a defined config name, otherwise it will throw;

If we call

```
config.originals().get(ANY_CONFIG_NAME)
```

then it tries to get from the underlying maps directly, and any config names 
including the custom unknown configs can be retrieved. So that's how custom 
configs are retrieved (like you said, some modules use `(config.originals() 
UNION config.values()) .get(ANY_CONFIG_NAME).` as well).

Now, I realize that since `logUnused` is triggered at the construction of the 
consumer/producer clients, while those embedded modules' custom configs are 
possibly not yet retrieved (the modules will only be constructed and 
initialized at a later time), the `used` set would not yet contain any of those 
unknown configs. As a result:

1) all defined, used configs should be included in `used` since they are 
retrieved via the first call above. This is the case we want to guarantee, and 
WARN if not, since it may indicate a bug.
2) all defined, but disabled configs are included in `used` since they are 
marked as used via `config.ignore()`. This is what we want to fix in this JIRA.
3) all unknown configs may or may not be included in `used`, and that's out of 
the AbstractConfig object's control.

So by doing the above two, we can fix this JIRA ticket, which is case 2), but 
the 

[jira] [Commented] (KAFKA-13689) AbstractConfig log print information is incorrect

2022-03-22 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17510767#comment-17510767
 ] 

Guozhang Wang commented on KAFKA-13689:
---

Okay I think we are really on the same page here, just using different 
terminologies :) Where I'm coming from is the Streams usage, e.g. let's say we 
create a StreamsConfig object from:

```
StreamsConfig config = new StreamsConfig(map);
```

If we call

```
config.get(PREDEFINED_CONFIG_NAME)
```

then the param must be a defined config name, otherwise it will throw;

If we call

```
config.originals().get(ANY_CONFIG_NAME)
```

then it tries to get from the underlying maps directly, and any config names 
including the custom unknown configs can be retrieved. So that's how custom 
configs are retrieved (like you said, some modules use `(config.originals() 
UNION config.values()).get(ANY_CONFIG_NAME)` as well).

Now, I realize that `logUnused` is triggered at the construction of the 
consumer/producer clients, whereas those embedded modules' custom configs are 
possibly not yet retrieved, as the modules will only be constructed and 
initialized at a later time; in that case the `used` set would not yet contain 
any of those unknown configs. As a result:

1) all defined, used configs should be included in `used` since they are 
retrieved via the first call above. This is the case we want to guarantee and 
WARN if not, since it may indicate a bug.
2) all defined, but disabled configs are included in `used` since they are 
called via `config.ignore()`. This is what we want to fix in this JIRA.
3) all unknown configs may or may not be included in `used` and that's out of 
the AbstractConfig object's control.

So by doing the above two, we can fix this JIRA ticket which is case 2), but 
the `logUnused` could still contain case 1) or case 3) whereas we really only 
want to WARN on case 1) above. The latter issue is a bit out of the scope of 
this JIRA, hence I said just doing the above two is sufficient for this issue.



If we want to resolve the second as well in this JIRA, I'm thinking we can do 
sth. aligned with your proposal in the description as "AbstractConfig provides 
two new methods: logUnknown() and unknown()" but to avoid creating a KIP for 
adding public APIs, we can just do them inside the existing `logUnused`, as 
something like:

```
public Set<String> unused() {
    Set<String> keys = new HashSet<>(values.keySet());   // take the diff between values and used
    keys.removeAll(used);
    return keys;
}

// keep it private
private Set<String> unknown() {
    Set<String> keys = new HashSet<>(originals.keySet());   // take the diff between originals and values
    keys.removeAll(values.keySet());
    return keys;
}

public void logUnused() {
    // here we still log one line per config as a WARN
    for (String key : unused())
        log.warn("The configuration '{}' was supplied but isn't a used config.", key);

    Set<String> unknownKeys = unknown();
    if (!unknownKeys.isEmpty()) {
        // here we log one line for all configs as INFO
        log.info("These configurations '{}' were not known.", unknownKeys);
    }
}
```
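
(As an aside, here is a self-contained sketch of the same set arithmetic with 
made-up config keys; the names `originals`/`values`/`used` mirror the 
AbstractConfig fields:)

```java
import java.util.HashSet;
import java.util.Set;

public class ConfigDiffSketch {
    public static void main(String[] args) {
        Set<String> originals = Set.of("acks", "linger.ms", "transaction.timeout.ms");   // supplied by the user
        Set<String> values = Set.of("acks", "linger.ms");   // defined in the ConfigDef
        Set<String> used = Set.of("acks");                  // actually retrieved by the client

        Set<String> unused = new HashSet<>(values);   // defined but never read -> WARN
        unused.removeAll(used);

        Set<String> unknown = new HashSet<>(originals);   // supplied but not defined -> INFO
        unknown.removeAll(values);

        System.out.println("unused = " + unused);     // [linger.ms]
        System.out.println("unknown = " + unknown);   // [transaction.timeout.ms]
    }
}
```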

> AbstractConfig log print information is incorrect
> -
>
> Key: KAFKA-13689
> URL: https://issues.apache.org/jira/browse/KAFKA-13689
> Project: Kafka
>  Issue Type: Bug
>  Components: config
>Affects Versions: 3.0.0
>Reporter: RivenSun
>Assignee: RivenSun
>Priority: Major
> Fix For: 3.2.0
>
>
> h1. 1.Example
> KafkaClient version is 3.1.0, KafkaProducer init properties:
>  
> {code:java}
> Properties props = new Properties();
> props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
> props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60003);{code}
>  
>  
> Partial log of KafkaProducer initialization:
> {code:java}
>     ssl.truststore.location = C:\Personal 
> File\documents\KafkaSSL\client.truststore.jks
>     ssl.truststore.password = [hidden]
>     ssl.truststore.type = JKS
>     transaction.timeout.ms = 60003
>     transactional.id = null
>     value.serializer = class 
> org.apache.kafka.common.serialization.StringSerializer[main] INFO 
> org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully 
> logged in.
> [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The 
> configuration 'transaction.timeout.ms' was supplied but isn't a known config.
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.1.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 37edeed0777bacb3
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1645602332999 {code}
> From the above log, you can see that KafkaProducer has applied the 

[GitHub] [kafka] tombentley commented on pull request #11931: Fix LICENSE-binary

2022-03-22 Thread GitBox


tombentley commented on pull request #11931:
URL: https://github.com/apache/kafka/pull/11931#issuecomment-1075319099


   @mimaison PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley merged pull request #11922: Update upgrade.html for 3.1.1

2022-03-22 Thread GitBox


tombentley merged pull request #11922:
URL: https://github.com/apache/kafka/pull/11922


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on pull request #11691: KAFKA-13598: enable idempotence producer by default and validate the configs

2022-03-22 Thread GitBox


rondagostino commented on pull request #11691:
URL: https://github.com/apache/kafka/pull/11691#issuecomment-1075280621


   Test passes with this change, but I don't understand why.
   `git diff 
log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java`
   ```
   diff --git 
a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
 
b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
   index 23272a2cb5..edaf6589a7 100644
   --- 
a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
   +++ 
b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
   @@ -43,6 +43,7 @@ import static 
org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
import static 
org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG;
import static 
org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG;
import static 
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
   +import static 
org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG;
import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
import static 
org.apache.kafka.common.config.SaslConfigs.SASL_KERBEROS_SERVICE_NAME;
import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM;
   @@ -332,6 +333,7 @@ public class KafkaLog4jAppender extends AppenderSkeleton 
{
   
props.put(KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
props.put(VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
   +props.put(ENABLE_IDEMPOTENCE_CONFIG, false);
this.producer = getKafkaProducer(props);
LogLog.debug("Kafka producer connected to " + brokerList);
LogLog.debug("Logging for topic: " + topic);
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on a change in pull request #11748: KAFKA-12635: Don't emit checkpoints for partitions without any offset…

2022-03-22 Thread GitBox


mimaison commented on a change in pull request #11748:
URL: https://github.com/apache/kafka/pull/11748#discussion_r832274976



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##
@@ -169,6 +172,7 @@ public String version() {
 return listConsumerGroupOffsets(group).entrySet().stream()
 .filter(x -> shouldCheckpointTopic(x.getKey().topic()))
 .map(x -> checkpoint(group, x.getKey(), x.getValue()))
+.flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty)) // do 
not emit checkpoints for partitions that don't have offset-syncs

Review comment:
   I've looked at this more closely. 
   
   MirrorSourceConnector correctly prevents loops but it's not the case for 
MirrorCheckpointConnector. It's not clear why they have different 
`shouldReplicateTopic()` methods. I don't understand the point of this 
behavior; surely it must be a bug. I don't see a use in having untranslated 
checkpoints from remote offsets. 
   
   If we agree it's unexpected behavior and can't see why anybody would rely on 
this, I think it's ok to remove it. Then this would allow us to do some 
cleanups, ie update `renameTopicPartition()`, have a single 
`shouldReplicateTopic()` method.
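
   (Aside: the `flatMap` line in the diff above is the pre-Java 9 idiom for 
turning a `Stream<Optional<T>>` into a `Stream<T>` of present values; a minimal 
illustration with made-up names:)

   ```java
   import java.util.List;
   import java.util.Optional;
   import java.util.stream.Collectors;
   import java.util.stream.Stream;

   public class OptionalFlatMapSketch {
       public static void main(String[] args) {
           List<Optional<String>> maybeCheckpoints =
               List.of(Optional.of("cp-1"), Optional.empty(), Optional.of("cp-2"));

           // keep only the present values; on Java 9+ this is .flatMap(Optional::stream)
           List<String> emitted = maybeCheckpoints.stream()
               .flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty))
               .collect(Collectors.toList());

           System.out.println(emitted);   // [cp-1, cp-2]
       }
   }
   ```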




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] Nayana-ibm commented on pull request #11929: MINOR: s390x Stage

2022-03-22 Thread GitBox


Nayana-ibm commented on pull request #11929:
URL: https://github.com/apache/kafka/pull/11929#issuecomment-1075247867


   @mimaison s390x build fails with 
   `ERROR: JAVA_HOME is set to an invalid directory: 
/home/jenkins/tools/java/latest11`
   
   We have installed Java on the nodes at 
/usr/lib/jvm/java-11-openjdk-s390x/bin/java. In this case, should we not be 
using `{  jdk 'jdk_11_latest'  }` for s390x? Please let me know.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on pull request #11691: KAFKA-13598: enable idempotence producer by default and validate the configs

2022-03-22 Thread GitBox


rondagostino commented on pull request #11691:
URL: https://github.com/apache/kafka/pull/11691#issuecomment-1075205852


   Here is `kafka_log4j_appender.log`.
   
   ```
   SLF4J: Class path contains multiple SLF4J bindings.
   SLF4J: Found binding in 
[jar:file:/opt/kafka-dev/core/build/dependant-libs-2.13.6/slf4j-log4j12-1.7.32.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   SLF4J: Found binding in 
[jar:file:/opt/kafka-dev/tools/build/dependant-libs-2.13.6/slf4j-log4j12-1.7.32.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   SLF4J: Found binding in 
[jar:file:/opt/kafka-dev/trogdor/build/dependant-libs-2.13.6/slf4j-log4j12-1.7.32.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   SLF4J: Found binding in 
[jar:file:/opt/kafka-dev/connect/api/build/dependant-libs/slf4j-log4j12-1.7.32.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   SLF4J: Found binding in 
[jar:file:/opt/kafka-dev/connect/transforms/build/dependant-libs/slf4j-log4j12-1.7.32.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   SLF4J: Found binding in 
[jar:file:/opt/kafka-dev/connect/runtime/build/dependant-libs/slf4j-log4j12-1.7.32.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   SLF4J: Found binding in 
[jar:file:/opt/kafka-dev/connect/file/build/dependant-libs/slf4j-log4j12-1.7.32.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   SLF4J: Found binding in 
[jar:file:/opt/kafka-dev/connect/mirror/build/dependant-libs/slf4j-log4j12-1.7.32.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   SLF4J: Found binding in 
[jar:file:/opt/kafka-dev/connect/mirror-client/build/dependant-libs/slf4j-log4j12-1.7.32.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   SLF4J: Found binding in 
[jar:file:/opt/kafka-dev/connect/json/build/dependant-libs/slf4j-log4j12-1.7.32.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   SLF4J: Found binding in 
[jar:file:/opt/kafka-dev/connect/basic-auth-extension/build/dependant-libs/slf4j-log4j12-1.7.32.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
explanation.
   SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
   log4j:WARN No appenders could be found for logger 
(org.apache.kafka.clients.producer.ProducerConfig).
   log4j:WARN Please initialize the log4j system properly.
   log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for 
more info.
   
   ```
   
   Note this line, which seems suspicious (though I am unfamiliar with this 
tool/what it does/how it does it, so maybe it is irrelevant).
   
   ```
   log4j:WARN No appenders could be found for logger 
(org.apache.kafka.clients.producer.ProducerConfig).
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] RivenSun2 edited a comment on pull request #11930: MINOR: Improve the readability of the DelegationTokenManager code

2022-03-22 Thread GitBox


RivenSun2 edited a comment on pull request #11930:
URL: https://github.com/apache/kafka/pull/11930#issuecomment-1075146147


   Hi @guozhangwang @showuon @dajac 
   This is a small change to improve the readability of the source code.
   Please help to review the PR .
   Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13757) Improve the annotations of all related methods of DelegationToken in the Admin class

2022-03-22 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17510492#comment-17510492
 ] 

RivenSun commented on KAFKA-13757:
--

Hi [~omkreddy]  [~guozhang] and  [~showuon] 
Could you give some advice?
Thanks.

> Improve the annotations of all related methods of DelegationToken in the 
> Admin class
> 
>
> Key: KAFKA-13757
> URL: https://issues.apache.org/jira/browse/KAFKA-13757
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: RivenSun
>Priority: Major
>
> DelegationToken is a great and lightweight feature, but when users actually 
> use it, they get confused.
> From the existing official documents/comments on methods/comments on method 
> parameters, the user cannot know what the specific processing logic of the 
> server is and what the meaning of the returned fields is after he calls the 
> XXXDelegationToken(...) method.
> After reading the source code, I briefly sorted out the processing logic of 
> the XXXDelegationToken(...) method on the server side.
> 1. createDelegationToken:
> {code:java}
> // 1. if the request sent on PLAINTEXT/1-way SSL channels or delegation token 
> authenticated channels,
> // throw UnsupportedByAuthenticationException
> // 2. if the delegation token feature is disabled, throw 
> DelegationTokenDisabledException
> // 3. if the renewers principal type is not KafkaPrincipal.USER_TYPE, throw 
> InvalidPrincipalTypeException
> // 4. if the request was not completed in within the given timeoutMs(), throw 
> TimeoutException
> //processing logic:
> //   maxLifeTime = `maxLifeTimeMs` <= 0 ? 
> brokerConfig.delegationTokenMaxLifeMs : Math.min(`maxLifeTimeMs`, 
> brokerConfig.delegationTokenMaxLifeMs)
> //   maxLifeTimestamp = currentTimeMillis + maxLifeTime
> //   expiryTimestamp = Math.min(maxLifeTimestamp, currentTimeMillis + 
> brokerConfig.delegationTokenExpiryTimeMs)
> //   update tokenInfo and return createTokenResult {code}
> 2. renewDelegationToken
> {code:java}
> // 1. if the request is sent on PLAINTEXT/1-way SSL channels or delegation token 
> authenticated channels,
> // throw UnsupportedByAuthenticationException
> // 2. if the delegation token feature is disabled, throw 
> DelegationTokenDisabledException
> // 3. if the authenticated user is not owner/renewer of the token, throw 
> DelegationTokenOwnerMismatchException
> // 4. if the delegation token is expired, throw 
> DelegationTokenExpiredException
> // 5. if the delegation token is not found on server, throw 
> DelegationTokenNotFoundException
> // 6. if the request was not completed within the given timeoutMs(), throw 
> TimeoutException
> //processing logic:
> //renewLifeTime = `renewTimePeriodMs` < 0 ? 
> brokerConfig.delegationTokenExpiryTimeMs : `renewTimePeriodMs`
> //renewTimestamp = currentTimeMillis + renewLifeTime
> //expiryTimestamp = Math.min(tokenInfo.maxTimestamp, renewTimestamp)
> //update tokenInfo.expiryTimestamp
> //return expiryTimestamp {code}
> 3. expireDelegationToken
> {code:java}
> // 1. if the request is sent on PLAINTEXT/1-way SSL channels or delegation token 
> authenticated channels,
> // throw UnsupportedByAuthenticationException
> // 2. if the delegation token feature is disabled, throw 
> DelegationTokenDisabledException
> // 3. if the authenticated user is not owner/renewer of the token, throw 
> DelegationTokenOwnerMismatchException
> // 4. if the delegation token is expired, throw 
> DelegationTokenExpiredException
> // 5. if the delegation token is not found on server, throw 
> DelegationTokenNotFoundException
> // 6. if the request was not completed within the given timeoutMs(), throw 
> TimeoutException
> //processing logic:
> //if `expiryTimePeriodMs` < 0, delete tokenInfo immediately, return 
> currentTimeMillis.
> //otherwise update tokenInfo expiryTimestamp:
> //  expiryTimestamp = Math.min(tokenInfo.maxTimestamp, 
> currentTimeMillis + `expiryTimePeriodMs`)
> //  update tokenInfo.expiryTimestamp
> //  return expiryTimestamp
> //
> //Note: Tokens can be cancelled explicitly. If a token is not renewed by 
> the token’s expiration time or if token is
> //beyond the max life time, it will also be deleted from all broker 
> caches as well as from zookeeper. {code}
> 4. describeDelegationToken
> {code:java}
> // 1. if the request is sent on PLAINTEXT/1-way SSL channels or delegation token 
> authenticated channels,
> // throw UnsupportedByAuthenticationException
> // 2. if the delegation token feature is disabled, throw 
> DelegationTokenDisabledException
> // 3. if the request was not completed within the given timeoutMs(), throw 
> TimeoutException
> //processing logic:
> //if `owners` is EmptyList(note: 

[GitHub] [kafka] ijuma edited a comment on pull request #11475: KAFKA-7077: Use default producer settings in Connect Worker

2022-03-22 Thread GitBox


ijuma edited a comment on pull request #11475:
URL: https://github.com/apache/kafka/pull/11475#issuecomment-1075173105


   @kkonstantine What is the expected broker version compatibility for connect? 
If it's older than 0.11, then we cannot rely on idempotence.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13757) Improve the annotations of all related methods of DelegationToken in the Admin class

2022-03-22 Thread RivenSun (Jira)
RivenSun created KAFKA-13757:


 Summary: Improve the annotations of all related methods of 
DelegationToken in the Admin class
 Key: KAFKA-13757
 URL: https://issues.apache.org/jira/browse/KAFKA-13757
 Project: Kafka
  Issue Type: Improvement
  Components: admin
Reporter: RivenSun


DelegationToken is a great and lightweight feature, but when users actually use 
it, they get confused.
From the existing official documents/comments on methods/comments on method 
parameters, the user cannot know what the specific processing logic of the 
server is and what the meaning of the returned fields is after he calls the 
XXXDelegationToken(...) method.

After reading the source code, I briefly sorted out the processing logic of the 
XXXDelegationToken(...) method on the server side.

1. createDelegationToken:
{code:java}
// 1. if the request is sent on PLAINTEXT/1-way SSL channels or delegation token 
authenticated channels,
// throw UnsupportedByAuthenticationException
// 2. if the delegation token feature is disabled, throw 
DelegationTokenDisabledException
// 3. if the renewer's principal type is not KafkaPrincipal.USER_TYPE, throw 
InvalidPrincipalTypeException
// 4. if the request was not completed within the given timeoutMs(), throw 
TimeoutException

//processing logic:
//   maxLifeTime = `maxLifeTimeMs` <= 0 ? brokerConfig.delegationTokenMaxLifeMs 
: Math.min(`maxLifeTimeMs`, brokerConfig.delegationTokenMaxLifeMs)
//   maxLifeTimestamp = currentTimeMillis + maxLifeTime
//   expiryTimestamp = Math.min(maxLifeTimestamp, currentTimeMillis + 
brokerConfig.delegationTokenExpiryTimeMs)
//   update tokenInfo and return createTokenResult {code}

2. renewDelegationToken
{code:java}
// 1. if the request is sent on PLAINTEXT/1-way SSL channels or delegation token 
authenticated channels,
// throw UnsupportedByAuthenticationException
// 2. if the delegation token feature is disabled, throw 
DelegationTokenDisabledException
// 3. if the authenticated user is not owner/renewer of the token, throw 
DelegationTokenOwnerMismatchException
// 4. if the delegation token is expired, throw DelegationTokenExpiredException
// 5. if the delegation token is not found on server, throw 
DelegationTokenNotFoundException
// 6. if the request was not completed within the given timeoutMs(), throw 
TimeoutException

//processing logic:
//renewLifeTime = `renewTimePeriodMs` < 0 ? 
brokerConfig.delegationTokenExpiryTimeMs : `renewTimePeriodMs`
//renewTimestamp = currentTimeMillis + renewLifeTime
//expiryTimestamp = Math.min(tokenInfo.maxTimestamp, renewTimestamp)
//update tokenInfo.expiryTimestamp
//return expiryTimestamp {code}
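
(As a reading aid, a minimal sketch of the renewal timestamp math above; the 
method and parameter names are made up for illustration, the real logic lives 
in DelegationTokenManager:)
{code:java}
// expiryTimestamp = min(tokenInfo.maxTimestamp, now + renewLifeTime)
static long renewedExpiryTimestamp(long nowMs,
                                   long renewTimePeriodMs,
                                   long defaultExpiryTimeMs,   // brokerConfig.delegationTokenExpiryTimeMs
                                   long maxTimestampMs) {      // tokenInfo.maxTimestamp
    long renewLifeTimeMs = renewTimePeriodMs < 0 ? defaultExpiryTimeMs : renewTimePeriodMs;
    long renewTimestampMs = nowMs + renewLifeTimeMs;
    return Math.min(maxTimestampMs, renewTimestampMs);   // a token never outlives its max lifetime
}
{code}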
3. expireDelegationToken
{code:java}
// 1. if the request is sent on PLAINTEXT/1-way SSL channels or delegation token 
authenticated channels,
// throw UnsupportedByAuthenticationException
// 2. if the delegation token feature is disabled, throw 
DelegationTokenDisabledException
// 3. if the authenticated user is not owner/renewer of the token, throw 
DelegationTokenOwnerMismatchException
// 4. if the delegation token is expired, throw DelegationTokenExpiredException
// 5. if the delegation token is not found on server, throw 
DelegationTokenNotFoundException
// 6. if the request was not completed within the given timeoutMs(), throw 
TimeoutException

//processing logic:
//if `expiryTimePeriodMs` < 0, delete tokenInfo immediately, return 
currentTimeMillis.
//otherwise update tokenInfo expiryTimestamp:
//  expiryTimestamp = Math.min(tokenInfo.maxTimestamp, 
currentTimeMillis + `expiryTimePeriodMs`)
//  update tokenInfo.expiryTimestamp
//  return expiryTimestamp
//
//Note: Tokens can be cancelled explicitly. If a token is not renewed by 
the token’s expiration time or if token is
//beyond the max life time, it will also be deleted from all broker caches 
as well as from zookeeper. {code}
4. describeDelegationToken
{code:java}
// 1. if the request is sent on PLAINTEXT/1-way SSL channels or delegation token 
authenticated channels,
// throw UnsupportedByAuthenticationException
// 2. if the delegation token feature is disabled, throw 
DelegationTokenDisabledException
// 3. if the request was not completed within the given timeoutMs(), throw 
TimeoutException

//processing logic:
//if `owners` is EmptyList(note: exclude `null`), return List() immediately.
//otherwise return all tokens that satisfies any of the following 
conditions:
//  1) if `owners` size > 0, at least one element in `owners` is 
token.ownerOrRenewer
//  2) the authenticated user is token.ownerOrRenewer
//  3) the authenticated user has `DESCRIBE` permission on `Token` 
Resource
//  for non-owned tokens {code}
I think we can add some comments on the XXXDelegationToken method: how the 
server handles the 

[GitHub] [kafka] ijuma commented on pull request #11475: KAFKA-7077: Use default producer settings in Connect Worker

2022-03-22 Thread GitBox


ijuma commented on pull request #11475:
URL: https://github.com/apache/kafka/pull/11475#issuecomment-1075178283


   One other thing: the 
[IDEMPOTENT_WRITE](https://cwiki.apache.org/confluence/display/KAFKA/KIP-679%3A+Producer+will+enable+the+strongest+delivery+guarantee+by+default)
 ACL is required for the Connect worker principal if the Kafka cluster is older 
than 2.8.
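
   For reference, granting that ACL programmatically looks roughly like the 
sketch below; the principal name and bootstrap address are made up, 
`AclOperation.IDEMPOTENT_WRITE` on the cluster resource is the relevant piece:

   ```java
   import java.util.List;
   import java.util.Properties;
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AdminClientConfig;
   import org.apache.kafka.common.acl.AccessControlEntry;
   import org.apache.kafka.common.acl.AclBinding;
   import org.apache.kafka.common.acl.AclOperation;
   import org.apache.kafka.common.acl.AclPermissionType;
   import org.apache.kafka.common.resource.PatternType;
   import org.apache.kafka.common.resource.ResourcePattern;
   import org.apache.kafka.common.resource.ResourceType;

   public class GrantIdempotentWrite {
       public static void main(String[] args) throws Exception {
           Properties props = new Properties();
           props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

           // allow the (hypothetical) Connect worker principal to use idempotent writes
           AclBinding binding = new AclBinding(
               new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL),
               new AccessControlEntry("User:connect-worker", "*",
                   AclOperation.IDEMPOTENT_WRITE, AclPermissionType.ALLOW));

           try (Admin admin = Admin.create(props)) {
               admin.createAcls(List.of(binding)).all().get();
           }
       }
   }
   ```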


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma edited a comment on pull request #11475: KAFKA-7077: Use default producer settings in Connect Worker

2022-03-22 Thread GitBox


ijuma edited a comment on pull request #11475:
URL: https://github.com/apache/kafka/pull/11475#issuecomment-1075173105


   @kkonstantine A related question in this other PR 
https://github.com/confluentinc/ce-kafka/pull/5905#issuecomment-1075167825 that 
also applies to this one.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #11475: KAFKA-7077: Use default producer settings in Connect Worker

2022-03-22 Thread GitBox


ijuma commented on pull request #11475:
URL: https://github.com/apache/kafka/pull/11475#issuecomment-1075173105


   @kkonstantine A related question in this other PR 
https://github.com/confluentinc/ce-kafka/pull/5905#issuecomment-1075167825


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #11691: KAFKA-13598: enable idempotence producer by default and validate the configs

2022-03-22 Thread GitBox


showuon commented on pull request #11691:
URL: https://github.com/apache/kafka/pull/11691#issuecomment-1075154439


   @rondagostino , could I get the error logs? Thanks. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] RivenSun2 commented on pull request #11930: MINOR: Improve the readability of the DelegationTokenManager code

2022-03-22 Thread GitBox


RivenSun2 commented on pull request #11930:
URL: https://github.com/apache/kafka/pull/11930#issuecomment-1075146147


   Hi @guozhangwang @showuon 
   This is a small change to improve the readability of the source code.
   Please help to review the PR .
   Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] RivenSun2 opened a new pull request #11930: MINOR: Improve the readability of the DelegationTokenManager code

2022-03-22 Thread GitBox


RivenSun2 opened a new pull request #11930:
URL: https://github.com/apache/kafka/pull/11930


   Improve the readability of the DelegationTokenManager code


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac merged pull request #11924: MINOR: fix typos in TransactionManager.java

2022-03-22 Thread GitBox


dajac merged pull request #11924:
URL: https://github.com/apache/kafka/pull/11924


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on pull request #11691: KAFKA-13598: enable idempotence producer by default and validate the configs

2022-03-22 Thread GitBox


rondagostino commented on pull request #11691:
URL: https://github.com/apache/kafka/pull/11691#issuecomment-1075137398


   @showuon It still fails at that commit.
   `git checkout 7c280c1d5f6267acbbebd10d3e58ea4b8fe7a4ef`
   `HEAD is now at 7c280c1d5f KAFKA-13673: disable idempotence when config 
conflicts (#11788)`
   `./gradlew clean`
   `_DUCKTAPE_OPTIONS="--debug" 
TC_PATHS="tests/kafkatest/tests/tools/log4j_appender_test.py" bash 
tests/docker/run_tests.sh`
   **Test fails**
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13457) SocketChannel in Acceptor#accept is not closed upon IOException

2022-03-22 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17510472#comment-17510472
 ] 

Mickael Maison commented on KAFKA-13457:


Thanks!

> SocketChannel in Acceptor#accept is not closed upon IOException
> ---
>
> Key: KAFKA-13457
> URL: https://issues.apache.org/jira/browse/KAFKA-13457
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 2.8.0
>Reporter: Haoze Wu
>Priority: Major
> Fix For: 3.2.0
>
>
> When the kafka.network.Acceptor in SocketServer.scala accepts a new 
> connection in the `accept` function, it handles the 
> `TooManyConnectionsException` and `ConnectionThrottledException`. However, 
> the socketChannel operations (line 720 or 721 or 722) within the try block 
> may potentially throw an IOException as well, which is not handled.
>  
> {code:java}
> //core/src/main/scala/kafka/network/SocketServer.scala
> // Acceptor class
>   private def accept(key: SelectionKey): Option[SocketChannel] = {
>     val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
>     val socketChannel = serverSocketChannel.accept()     // line 717
>     try {
>       connectionQuotas.inc(endPoint.listenerName, 
> socketChannel.socket.getInetAddress, blockedPercentMeter)
>       socketChannel.configureBlocking(false)             // line 720
>       socketChannel.socket().setTcpNoDelay(true)         // line 721
>       socketChannel.socket().setKeepAlive(true)          // line 722
>       if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
>         socketChannel.socket().setSendBufferSize(sendBufferSize)
>       Some(socketChannel)
>     } catch {
>       case e: TooManyConnectionsException =>       
>         info(s"Rejected connection from ${e.ip}, address already has the 
> configured maximum of ${e.count} connections.")
>         close(endPoint.listenerName, socketChannel)
>         None
>       case e: ConnectionThrottledException => 
>         val ip = socketChannel.socket.getInetAddress
>         debug(s"Delaying closing of connection from $ip for 
> ${e.throttleTimeMs} ms")
>         val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
>         throttledSockets += DelayedCloseSocket(socketChannel, 
> endThrottleTimeMs)
>         None
>     }
>   }
> {code}
> This thrown IOException is caught in the caller `acceptNewConnections` in 
> line 706, which only prints an error message. The socketChannel that throws 
> this IOException is not closed.
>  
> {code:java}
> //core/src/main/scala/kafka/network/SocketServer.scala
>   private def acceptNewConnections(): Unit = {
>     val ready = nioSelector.select(500)
>     if (ready > 0) {
>       val keys = nioSelector.selectedKeys()
>       val iter = keys.iterator()
>       while (iter.hasNext && isRunning) {
>         try {
>           val key = iter.next
>           iter.remove()          if (key.isAcceptable) {
>             accept(key).foreach { socketChannel => 
>                 ...
>               } while (!assignNewConnection(socketChannel, processor, 
> retriesLeft == 0))
>             }
>           } else
>             throw new IllegalStateException("Unrecognized key state for 
> acceptor thread.")
>         } catch {
>           case e: Throwable => error("Error while accepting connection", e)   
> // line 706
>         }
>       }
>     }
>   }
> {code}
> We found during testing this would cause our Kafka clients to experience 
> errors (InvalidReplicationFactorException) for 40+ seconds when creating new 
> topics. After 40 seconds, the clients would be able to create new topics 
> successfully.
> We checked that after adding the socketChannel.close() upon IOException, the 
> symptoms disappear, so the clients do not need to wait for 40s to be 
> working again.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
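
The fix described above amounts to closing the accepted channel when one of the 
post-accept socket operations throws. A plain-Java NIO sketch of the pattern 
(the actual fix is in the Scala Acceptor in SocketServer.scala, merged via PR 
11504):
{code:java}
import java.io.IOException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class AcceptCloseSketch {
    // accept a connection and configure it; close the channel instead of
    // leaking it if any socket operation throws IOException
    static SocketChannel acceptConfigured(ServerSocketChannel server) throws IOException {
        SocketChannel channel = server.accept();
        if (channel == null)
            return null;   // non-blocking accept with nothing pending
        try {
            channel.configureBlocking(false);
            channel.socket().setTcpNoDelay(true);
            channel.socket().setKeepAlive(true);
            return channel;
        } catch (IOException e) {
            try {
                channel.close();   // previously this path leaked the channel
            } catch (IOException closeError) {
                e.addSuppressed(closeError);
            }
            throw e;
        }
    }
}
{code}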


[GitHub] [kafka] fvaleri edited a comment on pull request #11748: KAFKA-12635: Don't emit checkpoints for partitions without any offset…

2022-03-22 Thread GitBox


fvaleri edited a comment on pull request #11748:
URL: https://github.com/apache/kafka/pull/11748#issuecomment-1075130682


   Thanks for the confirmation @mimaison. Then, it's +1 for me.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] fvaleri commented on pull request #11748: KAFKA-12635: Don't emit checkpoints for partitions without any offset…

2022-03-22 Thread GitBox


fvaleri commented on pull request #11748:
URL: https://github.com/apache/kafka/pull/11748#issuecomment-1075130682


   Thanks for the confirmation @mimaison. 
   
   Then, it's +1 for me.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13457) SocketChannel in Acceptor#accept is not closed upon IOException

2022-03-22 Thread David Jacot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17510466#comment-17510466
 ] 

David Jacot commented on KAFKA-13457:
-

[~mimaison] Done.

> SocketChannel in Acceptor#accept is not closed upon IOException
> ---
>
> Key: KAFKA-13457
> URL: https://issues.apache.org/jira/browse/KAFKA-13457
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 2.8.0
>Reporter: Haoze Wu
>Priority: Major
> Fix For: 3.2.0
>
>
> When the kafka.network.Acceptor in SocketServer.scala accepts a new 
> connection in the `accept` function, it handles the 
> `TooManyConnectionsException` and `ConnectionThrottledException`. However, 
> the socketChannel operations (line 720 or 721 or 722) within the try block 
> may potentially throw an IOException as well, which is not handled.
>  
> {code:java}
> //core/src/main/scala/kafka/network/SocketServer.scala
> // Acceptor class
>   private def accept(key: SelectionKey): Option[SocketChannel] = {
>     val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
>     val socketChannel = serverSocketChannel.accept()     // line 717
>     try {
>       connectionQuotas.inc(endPoint.listenerName, 
> socketChannel.socket.getInetAddress, blockedPercentMeter)
>       socketChannel.configureBlocking(false)             // line 720
>       socketChannel.socket().setTcpNoDelay(true)         // line 721
>       socketChannel.socket().setKeepAlive(true)          // line 722
>       if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
>         socketChannel.socket().setSendBufferSize(sendBufferSize)
>       Some(socketChannel)
>     } catch {
>       case e: TooManyConnectionsException =>       
>         info(s"Rejected connection from ${e.ip}, address already has the 
> configured maximum of ${e.count} connections.")
>         close(endPoint.listenerName, socketChannel)
>         None
>       case e: ConnectionThrottledException => 
>         val ip = socketChannel.socket.getInetAddress
>         debug(s"Delaying closing of connection from $ip for 
> ${e.throttleTimeMs} ms")
>         val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
>         throttledSockets += DelayedCloseSocket(socketChannel, 
> endThrottleTimeMs)
>         None
>     }
>   }
> {code}
> This thrown IOException is caught in the caller `acceptNewConnections` in 
> line 706, which only prints an error message. The socketChannel that throws 
> this IOException is not closed.
>  
> {code:java}
> //core/src/main/scala/kafka/network/SocketServer.scala
>   private def acceptNewConnections(): Unit = {
>     val ready = nioSelector.select(500)
>     if (ready > 0) {
>       val keys = nioSelector.selectedKeys()
>       val iter = keys.iterator()
>       while (iter.hasNext && isRunning) {
>         try {
>           val key = iter.next
>           iter.remove()          if (key.isAcceptable) {
>             accept(key).foreach { socketChannel => 
>                 ...
>               } while (!assignNewConnection(socketChannel, processor, 
> retriesLeft == 0))
>             }
>           } else
>             throw new IllegalStateException("Unrecognized key state for 
> acceptor thread.")
>         } catch {
>           case e: Throwable => error("Error while accepting connection", e)   
> // line 706
>         }
>       }
>     }
>   }
> {code}
> We found during testing this would cause our Kafka clients to experience 
> errors (InvalidReplicationFactorException) for 40+ seconds when creating new 
> topics. After 40 seconds, the clients would be able to create new topics 
> successfully.
> We checked that after adding the socketChannel.close() upon IOException, the 
> symptoms disappear, so the clients do not need to wait for 40s to be 
> working again.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (KAFKA-13457) SocketChannel in Acceptor#accept is not closed upon IOException

2022-03-22 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot closed KAFKA-13457.
---

> SocketChannel in Acceptor#accept is not closed upon IOException
> ---
>
> Key: KAFKA-13457
> URL: https://issues.apache.org/jira/browse/KAFKA-13457
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 2.8.0
>Reporter: Haoze Wu
>Priority: Major
> Fix For: 3.2.0
>
>
> When the kafka.network.Acceptor in SocketServer.scala accepts a new 
> connection in the `accept` function, it handles the 
> `TooManyConnectionsException` and `ConnectionThrottledException`. However, 
> the socketChannel operations (line 720 or 721 or 722) within the try block 
> may potentially throw an IOException as well, which is not handled.
>  
> {code:java}
> //core/src/main/scala/kafka/network/SocketServer.scala
> // Acceptor class
>   private def accept(key: SelectionKey): Option[SocketChannel] = {
>     val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
>     val socketChannel = serverSocketChannel.accept()     // line 717
>     try {
>       connectionQuotas.inc(endPoint.listenerName, 
> socketChannel.socket.getInetAddress, blockedPercentMeter)
>       socketChannel.configureBlocking(false)             // line 720
>       socketChannel.socket().setTcpNoDelay(true)         // line 721
>       socketChannel.socket().setKeepAlive(true)          // line 722
>       if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
>         socketChannel.socket().setSendBufferSize(sendBufferSize)
>       Some(socketChannel)
>     } catch {
>       case e: TooManyConnectionsException =>       
>         info(s"Rejected connection from ${e.ip}, address already has the 
> configured maximum of ${e.count} connections.")
>         close(endPoint.listenerName, socketChannel)
>         None
>       case e: ConnectionThrottledException => 
>         val ip = socketChannel.socket.getInetAddress
>         debug(s"Delaying closing of connection from $ip for 
> ${e.throttleTimeMs} ms")
>         val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
>         throttledSockets += DelayedCloseSocket(socketChannel, 
> endThrottleTimeMs)
>         None
>     }
>   }
> {code}
> This thrown IOException is caught in the caller `acceptNewConnections` in 
> line 706, which only prints an error message. The socketChannel that throws 
> this IOException is not closed.
>  
> {code:java}
> //core/src/main/scala/kafka/network/SocketServer.scala
>   private def acceptNewConnections(): Unit = {
>     val ready = nioSelector.select(500)
>     if (ready > 0) {
>       val keys = nioSelector.selectedKeys()
>       val iter = keys.iterator()
>       while (iter.hasNext && isRunning) {
>         try {
>           val key = iter.next
>           iter.remove()          if (key.isAcceptable) {
>             accept(key).foreach { socketChannel => 
>                 ...
>               } while (!assignNewConnection(socketChannel, processor, 
> retriesLeft == 0))
>             }
>           } else
>             throw new IllegalStateException("Unrecognized key state for 
> acceptor thread.")
>         } catch {
>           case e: Throwable => error("Error while accepting connection", e)   
> // line 706
>         }
>       }
>     }
>   }
> {code}
> We found during testing this would cause our Kafka clients to experience 
> errors (InvalidReplicationFactorException) for 40+ seconds when creating new 
> topics. After 40 seconds, the clients would be able to create new topics 
> successfully.
> We checked that after adding the socketChannel.close() upon IOException, the 
> symptoms disappear, so the clients do not need to wait for 40s to be 
> working again.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)




[jira] [Commented] (KAFKA-13457) SocketChannel in Acceptor#accept is not closed upon IOException

2022-03-22 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17510436#comment-17510436
 ] 

Mickael Maison commented on KAFKA-13457:


[~dajac] I see you merged https://github.com/apache/kafka/pull/11504 a while 
back. Can we now close this issue? or is there more work to do?

> SocketChannel in Acceptor#accept is not closed upon IOException
> ---
>
> Key: KAFKA-13457
> URL: https://issues.apache.org/jira/browse/KAFKA-13457
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 2.8.0
>Reporter: Haoze Wu
>Priority: Major
> Fix For: 3.2.0
>
>
> When the kafka.network.Acceptor in SocketServer.scala accepts a new 
> connection in the `accept` function, it handles the 
> `TooManyConnectionsException` and `ConnectionThrottledException`. However, 
> the socketChannel operations (line 720 or 721 or 722) within the try block 
> may potentially throw an IOException as well, which is not handled.
>  
> {code:java}
> //core/src/main/scala/kafka/network/SocketServer.scala
> // Acceptor class
>   private def accept(key: SelectionKey): Option[SocketChannel] = {
>     val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
>     val socketChannel = serverSocketChannel.accept()     // line 717
>     try {
>       connectionQuotas.inc(endPoint.listenerName, 
> socketChannel.socket.getInetAddress, blockedPercentMeter)
>       socketChannel.configureBlocking(false)             // line 720
>       socketChannel.socket().setTcpNoDelay(true)         // line 721
>       socketChannel.socket().setKeepAlive(true)          // line 722
>       if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
>         socketChannel.socket().setSendBufferSize(sendBufferSize)
>       Some(socketChannel)
>     } catch {
>       case e: TooManyConnectionsException =>       
>         info(s"Rejected connection from ${e.ip}, address already has the 
> configured maximum of ${e.count} connections.")
>         close(endPoint.listenerName, socketChannel)
>         None
>       case e: ConnectionThrottledException => 
>         val ip = socketChannel.socket.getInetAddress
>         debug(s"Delaying closing of connection from $ip for 
> ${e.throttleTimeMs} ms")
>         val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
>         throttledSockets += DelayedCloseSocket(socketChannel, 
> endThrottleTimeMs)
>         None
>     }
>   }
> {code}
> This thrown IOException is caught in the caller `acceptNewConnections` in 
> line 706, which only prints an error message. The socketChannel that throws 
> this IOException is not closed.
>  
> {code:java}
> //core/src/main/scala/kafka/network/SocketServer.scala
>   private def acceptNewConnections(): Unit = {
>     val ready = nioSelector.select(500)
>     if (ready > 0) {
>       val keys = nioSelector.selectedKeys()
>       val iter = keys.iterator()
>       while (iter.hasNext && isRunning) {
>         try {
>           val key = iter.next
>           iter.remove()          if (key.isAcceptable) {
>             accept(key).foreach { socketChannel => 
>                 ...
>               } while (!assignNewConnection(socketChannel, processor, 
> retriesLeft == 0))
>             }
>           } else
>             throw new IllegalStateException("Unrecognized key state for 
> acceptor thread.")
>         } catch {
>           case e: Throwable => error("Error while accepting connection", e)   
> // line 706
>         }
>       }
>     }
>   }
> {code}
> We found during testing this would cause our Kafka clients to experience 
> errors (InvalidReplicationFactorException) for 40+ seconds when creating new 
> topics. After 40 seconds, the clients would be able to create new topics 
> successfully.
> We checked that after adding the socketChannel.close() upon IOException, the 
> symptoms disappear, so the clients do not need to wait for 40s to be 
> working again.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] mimaison commented on a change in pull request #11748: KAFKA-12635: Don't emit checkpoints for partitions without any offset…

2022-03-22 Thread GitBox


mimaison commented on a change in pull request #11748:
URL: https://github.com/apache/kafka/pull/11748#discussion_r832083031



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##
@@ -169,6 +172,7 @@ public String version() {
 return listConsumerGroupOffsets(group).entrySet().stream()
 .filter(x -> shouldCheckpointTopic(x.getKey().topic()))
 .map(x -> checkpoint(group, x.getKey(), x.getValue()))
+.flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty)) // do 
not emit checkpoints for partitions that don't have offset-syncs

Review comment:
   @viktorsomogyi mentioned you also looked at this issue. Apart from the 
weird "feature" described, how different is it from your fix? Is there another 
approach you'd favor or is this correctly addressing the issue?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #11748: KAFKA-12635: Don't emit checkpoints for partitions without any offset…

2022-03-22 Thread GitBox


mimaison commented on pull request #11748:
URL: https://github.com/apache/kafka/pull/11748#issuecomment-1075079569


   @fvaleri Yes, if no offsets are committed for a partition (because it can't 
be translated), it won't appear when describing consumer offsets in the target 
cluster. This is the expected behavior with this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on a change in pull request #11748: KAFKA-12635: Don't emit checkpoints for partitions without any offset…

2022-03-22 Thread GitBox


mimaison commented on a change in pull request #11748:
URL: https://github.com/apache/kafka/pull/11748#discussion_r832079771



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##
@@ -169,6 +172,7 @@ public String version() {
 return listConsumerGroupOffsets(group).entrySet().stream()
 .filter(x -> shouldCheckpointTopic(x.getKey().topic()))
 .map(x -> checkpoint(group, x.getKey(), x.getValue()))
+.flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty)) // do 
not emit checkpoints for partitions that don't have offset-syncs

Review comment:
   Thanks @urbandan and @viktorsomogyi. I think I vaguely understand what 
you mean, I plan to take a look at the code again this afternoon so hopefully 
that will become clearer. 
   
   I doubt this "feature" was by design (maybe @ryannedolan can shed some 
light) as it does not seem useful or it seems pretty broken. Considering we're 
late in the release process of 3.2, I'll see if I can find a heuristic to 
filter these topics and keep the current behavior.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on a change in pull request #11922: Update upgrade.html for 3.1.1

2022-03-22 Thread GitBox


mimaison commented on a change in pull request #11922:
URL: https://github.com/apache/kafka/pull/11922#discussion_r832075358



##
File path: docs/upgrade.html
##
@@ -19,7 +19,7 @@
 
 

[GitHub] [kafka] tombentley commented on pull request #11922: Update upgrade.html for 3.1.1

2022-03-22 Thread GitBox


tombentley commented on pull request #11922:
URL: https://github.com/apache/kafka/pull/11922#issuecomment-1075064891


   Ah, good point, now done.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #11899: Add s390x build stage

2022-03-22 Thread GitBox


mimaison commented on pull request #11899:
URL: https://github.com/apache/kafka/pull/11899#issuecomment-1075033923


   I opened https://github.com/apache/kafka/pull/11929


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison opened a new pull request #11929: MINOR: s390x Stage

2022-03-22 Thread GitBox


mimaison opened a new pull request #11929:
URL: https://github.com/apache/kafka/pull/11929


   Copy of https://github.com/apache/kafka/pull/11899
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #11899: Add s390x build stage

2022-03-22 Thread GitBox


mimaison commented on pull request #11899:
URL: https://github.com/apache/kafka/pull/11899#issuecomment-1075030302


   Thanks @Nayana-ibm for the PR. Changes to Jenkinsfile are only applied if 
they are from a committer. I'll open another PR with your changes and see if it 
works.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on a change in pull request #11908: KAFKA-13748: Do not include file stream connectors in Connect's CLASSPATH and plugin.path by default

2022-03-22 Thread GitBox


mimaison commented on a change in pull request #11908:
URL: https://github.com/apache/kafka/pull/11908#discussion_r832029320



##
File path: docs/connect.html
##
@@ -74,6 +74,7 @@ Running Kafka Connect
config.storage.topic (default 
connect-configs) - topic to use for storing connector and task 
configurations; note that this should be a single partition, highly replicated, 
compacted topic. You may need to manually create the topic to ensure the 
correct configuration as auto created topics may have multiple partitions or be 
automatically configured for deletion rather than compaction
 offset.storage.topic (default 
connect-offsets) - topic to use for storing offsets; this topic 
should have many partitions, be replicated, and be configured for 
compaction
 status.storage.topic (default 
connect-status) - topic to use for storing statuses; this topic 
can have multiple partitions, and should be replicated and configured for 
compaction
+plugin.path (default empty) - a list of 
paths that contain plugins (connectors, converters, transformations). For the 
purpose of the quick starts, users will have to add the path that contains the 
FileStreamSourceConnector and FileStreamSinkConnector packaged in 
connect-file-"version".jar, because these connectors are not 
included by default in the CLASSPATH or the 
plugin.path of the Connect worker

Review comment:
   Should we add the same to the Standalone section above?
   
   Also we should update `quickstart.html` too.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-10686) Pluggable standby tasks assignor for Kafka Streams

2022-03-22 Thread Levani Kokhreidze (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Levani Kokhreidze reassigned KAFKA-10686:
-

Assignee: (was: Levani Kokhreidze)

> Pluggable standby tasks assignor for Kafka Streams
> --
>
> Key: KAFKA-10686
> URL: https://issues.apache.org/jira/browse/KAFKA-10686
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Levani Kokhreidze
>Priority: Major
>  Labels: needs-kip
>
> In production, Kafka Streams instances often run across different clusters 
> and availability zones. In order to guarantee high availability of Kafka 
> Streams deployments, users need more granular control over which 
> instances standby tasks can be created on. 
> The idea of this ticket is to expose an interface for Kafka Streams which can 
> be implemented by users to control where standby tasks are created.
> Kafka Streams can ship rack-aware assignment as a default implementation that 
> takes into account the `rack.id` of the application and makes sure that 
> standby tasks are created on different racks. 
> The point of this ticket, though, is to give users more flexibility over standby 
> task creation, in cases where rack awareness alone is not enough. 
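
A purely hypothetical sketch of what such an interface could look like (no such 
interface exists in Kafka Streams today; every name below is invented for 
illustration):

{code:java}
import java.util.Map;
import java.util.Set;

// Hypothetical plug-in point: given the standby tasks to place and each
// client's rack tag, decide which client should host which standby task.
public interface StandbyTaskAssignor {
    Map<String, Set<String>> assign(Set<String> standbyTaskIds,
                                    Map<String, String> clientIdToRack);
}
{code}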



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13626) NullPointerException in Selector.pollSelectionKeys: channel is null

2022-03-22 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-13626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17510377#comment-17510377
 ] 

Daniel Häuser commented on KAFKA-13626:
---

Happened again today. This time I had access to the application log and the 
Kafka log. However, the logs did not reveal any more related information other 
than this error stack trace. Restarting the application solved the issue. This 
time it happened with Kafka Client version 2.7.2.

> NullPointerException in Selector.pollSelectionKeys: channel is null
> ---
>
> Key: KAFKA-13626
> URL: https://issues.apache.org/jira/browse/KAFKA-13626
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.7.1
>Reporter: Daniel Häuser
>Priority: Minor
>
> This NullPointerException occurred while we were having networking issues.
> Unfortunately I cannot provide much more information than this stack trace 
> because this is all I got from our operations team.
> {code:java}
> java.lang.IllegalStateException: This error handler cannot process 
> 'java.lang.NullPointerException's; no record information is available
> at 
> org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:200)
> at 
> org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112)
> at 
> org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1599)
> at 
> org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1210)
> at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> at java.base/java.lang.Thread.run(Thread.java:831)
> Caused by: java.lang.NullPointerException: Cannot invoke 
> "org.apache.kafka.common.network.KafkaChannel.id()" because "channel" is null
> at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:516)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:563)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:257)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:480)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1257)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1226)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
> at jdk.internal.reflect.GeneratedMethodAccessor128.invoke(Unknown 
> Source)
> at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:567)
> at 
> org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
> at 
> org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:208)
> at jdk.proxy2/jdk.proxy2.$Proxy137.poll(Unknown Source)
> at 
> org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1410)
> at 
> org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1249)
> at 
> org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161)
> ... 3 common frames omitted {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13739) Sliding window without grace not working

2022-03-22 Thread bounkong khamphousone (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bounkong khamphousone updated KAFKA-13739:
--
Flags: Patch

> Sliding window without grace not working
> 
>
> Key: KAFKA-13739
> URL: https://issues.apache.org/jira/browse/KAFKA-13739
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0
>Reporter: bounkong khamphousone
>Priority: Minor
>  Labels: beginner, newbie
>
> Hi everyone! I would like to understand why the Kafka Streams DSL offers the 
> ability to express a SlidingWindow with no grace period when it seems that this 
> doesn't work. [confluent's 
> site|https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#sliding-time-windows]
>  states that a grace period is required and that, with the deprecated method, it 
> defaults to 24 hours.
> Doing a basic sliding window with a count, if I set the grace period to 1 ms, the 
> expected output is produced. Based on the sliding window documentation, lower and 
> upper bounds are inclusive.
> If I set the grace period to 0 ms, I can see that the record is not skipped at 
> KStreamSlidingWindowAggregate (l.126), but when we try to create the window and 
> push the event in KStreamSlidingWindowAggregate#createWindows we call the 
> method updateWindowAndForward (l.417). This method (l.468) checks that 
> {{windowEnd > closeTime}}.
> closeTime is defined as {{observedStreamTime - window.gracePeriodMs}} (sliding 
> window configuration), and windowEnd is defined as {{inputRecordTimestamp}}.
>  
> For the first event, we can assume that observedStreamTime is equal to 
> inputRecordTimestamp.
>  
> Therefore, closeTime is {{inputRecordTimestamp - 0}} (gracePeriodMs), which 
> equals {{inputRecordTimestamp}}.
> If we go back to the check done in the {{updateWindowAndForward}} method, we 
> have inputRecordTimestamp > inputRecordTimestamp, which is always false. The 
> record is then skipped for the record's own window.
> Given that lower and upper bounds are inclusive, I would have expected the 
> event to be pushed into the store and forwarded. Hence, the check should be 
> {{windowEnd >= closeTime}}.
>  
> Is it a bug or is it intended?
> Thanks in advance for your explanations!
> Best regards!
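
The arithmetic above can be checked with a tiny standalone sketch (illustrative 
Java only, not the actual KStreamSlidingWindowAggregate code):

{code:java}
public class SlidingWindowGraceCheck {
    public static void main(String[] args) {
        long inputRecordTimestamp = 1_000L;
        long gracePeriodMs = 0L;
        // For the first record, observed stream time equals the record time.
        long observedStreamTime = inputRecordTimestamp;
        long closeTime = observedStreamTime - gracePeriodMs; // 1000
        long windowEnd = inputRecordTimestamp;               // 1000
        // Current check: the strict inequality drops the record's own window.
        System.out.println(windowEnd > closeTime);  // false -> record skipped
        // Check suggested in the report: the inclusive bound forwards it.
        System.out.println(windowEnd >= closeTime); // true  -> record forwarded
    }
}
{code}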



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] tiboun opened a new pull request #11928: KAFKA-13739: Sliding window with no grace period not working

2022-03-22 Thread GitBox


tiboun opened a new pull request #11928:
URL: https://github.com/apache/kafka/pull/11928


   Fix the upper bound for sliding windows, making them compatible with no grace 
period (KAFKA-13739).
   
   Added unit tests for the early sliding window and the "normal" sliding window, 
for events both within one window time difference (small input) and above the 
window time difference (large input).
   
   Fixing this window interval may slightly change stream behavior, but the 
probability of this happening is extremely low and it should not have a huge 
impact on the results.
   
   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [x] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac opened a new pull request #11927: MINOR: Renable SocketServerTest.closingChannelWithBufferedReceives and SocketServerTest.remoteCloseWithoutBufferedReceives

2022-03-22 Thread GitBox


dajac opened a new pull request #11927:
URL: https://github.com/apache/kafka/pull/11927


   I thought that we fixed those with 
https://github.com/apache/kafka/commit/b27000ec6af6edfe8a6958dfcc3c0745667e25f4 
which was merged a few days before they were disabled. Checking here...
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #11900: MINOR: Fix class comparison in `AlterConfigPolicy.RequestMetadata.equals()`

2022-03-22 Thread GitBox


dajac commented on pull request #11900:
URL: https://github.com/apache/kafka/pull/11900#issuecomment-1074896275


   Merged to trunk, 3.2 and 3.1.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac merged pull request #11900: MINOR: Fix class comparison in `AlterConfigPolicy.RequestMetadata.equals()`

2022-03-22 Thread GitBox


dajac merged pull request #11900:
URL: https://github.com/apache/kafka/pull/11900


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-13752) Using `equals` instead of `==` when Uuid compare in Java

2022-03-22 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-13752.
-
Fix Version/s: 3.2.0
   3.1.1
   3.3.0
   Resolution: Fixed

> Using `equals` instead of `==` when Uuid compare in Java
> 
>
> Key: KAFKA-13752
> URL: https://issues.apache.org/jira/browse/KAFKA-13752
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Xiaobing Fang
>Priority: Minor
> Fix For: 3.2.0, 3.1.1, 3.3.0
>
>
> {code:java}
> Uuid.ZERO_UUID == new Uuid(0L, 0L){code}
> is true in Scala, but false in Java.
>  
> So this test runs successfully. Is this the expected situation?
> {code:java}
> @Test
> public void testTopicIdAndNullTopicNameRequests() {
> // Construct invalid MetadataRequestTopics. We will build each one 
> separately and ensure the error is thrown.
> List<MetadataRequestData.MetadataRequestTopic> topics = Arrays.asList(
> new 
> MetadataRequestData.MetadataRequestTopic().setName("topic").setTopicId(new 
> Uuid(0L, 0L)));
> // if version is 10 or 11, the invalid topic metadata should return an 
> error
> List<Short> invalidVersions = Arrays.asList((short) 10, (short) 11);
> invalidVersions.forEach(version ->
> topics.forEach(topic -> {
> MetadataRequestData metadataRequestData = new 
> MetadataRequestData().setTopics(Collections.singletonList(topic));
> MetadataRequest.Builder builder = new 
> MetadataRequest.Builder(metadataRequestData);
> assertThrows(UnsupportedVersionException.class, () -> 
> builder.build(version));
> })
> );
> }{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13752) Using `equals` instead of `==` when Uuid compare in Java

2022-03-22 Thread David Jacot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17510330#comment-17510330
 ] 

David Jacot commented on KAFKA-13752:
-

There is no real impact because we always use Uuid.ZERO_UUID as the default 
value. In this case, comparing the references works.
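
A minimal sketch of the difference (Uuid and its ZERO_UUID constant live in 
org.apache.kafka.common):

{code:java}
import org.apache.kafka.common.Uuid;

public class UuidCompareDemo {
    public static void main(String[] args) {
        Uuid zero = new Uuid(0L, 0L);
        // Reference comparison: false, 'zero' is a distinct instance.
        System.out.println(Uuid.ZERO_UUID == zero);
        // Value comparison: true, Uuid overrides equals().
        System.out.println(Uuid.ZERO_UUID.equals(zero));
    }
}
{code}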

> Using `equals` instead of `==` when Uuid compare in Java
> 
>
> Key: KAFKA-13752
> URL: https://issues.apache.org/jira/browse/KAFKA-13752
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Xiaobing Fang
>Priority: Minor
>
> {code:java}
> Uuid.ZERO_UUID == new Uuid(0L, 0L){code}
> is true in Scala, but false in Java.
>  
> So this test runs successfully. Is this the expected situation?
> {code:java}
> @Test
> public void testTopicIdAndNullTopicNameRequests() {
> // Construct invalid MetadataRequestTopics. We will build each one 
> separately and ensure the error is thrown.
> List<MetadataRequestData.MetadataRequestTopic> topics = Arrays.asList(
> new 
> MetadataRequestData.MetadataRequestTopic().setName("topic").setTopicId(new 
> Uuid(0L, 0L)));
> // if version is 10 or 11, the invalid topic metadata should return an 
> error
> List<Short> invalidVersions = Arrays.asList((short) 10, (short) 11);
> invalidVersions.forEach(version ->
> topics.forEach(topic -> {
> MetadataRequestData metadataRequestData = new 
> MetadataRequestData().setTopics(Collections.singletonList(topic));
> MetadataRequest.Builder builder = new 
> MetadataRequest.Builder(metadataRequestData);
> assertThrows(UnsupportedVersionException.class, () -> 
> builder.build(version));
> })
> );
> }{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] dajac commented on pull request #11912: KAFKA-13752: Uuid compare using equals in java

2022-03-22 Thread GitBox


dajac commented on pull request #11912:
URL: https://github.com/apache/kafka/pull/11912#issuecomment-1074883379


   Merged to trunk, 3.2 and 3.1.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac merged pull request #11912: KAFKA-13752: Uuid compare using equals in java

2022-03-22 Thread GitBox


dajac merged pull request #11912:
URL: https://github.com/apache/kafka/pull/11912


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vamossagar12 commented on pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…

2022-03-22 Thread GitBox


vamossagar12 commented on pull request #11211:
URL: https://github.com/apache/kafka/pull/11211#issuecomment-1074869243


   > > the computation is done again on the cached data
   > 
   > What computation do you mean? In the end, we just iterate? And for both 
   > cases (also with cache) we need to have an iterator for the inner store, 
because the cache might not have all data -- thus, we need to "peek" into both 
iterators to see what the next key is to "merge" both together correctly -- if 
the key exists in the underlying store and the cache, we return the data from 
the cache, but we still need to skip over the key in the underlying store to 
ensure the merging is done correctly. (Not sure if your question was referring 
to this part of the "merge logic"?)
   
   @mjsax, thanks, yeah that's what I meant. Sorry, I didn't check the entire 
implementation before posting. Another question on this is regarding the bug in 
the caching store. I know I need to send out another PR for this, but just for my 
understanding: you mentioned that if a certain key is found in both the inner 
store and the cache, then we want to return it from the cache.
   
   But if a key has expired from the inner store (what this PR addresses for 
persistent stores) but is still present in the cache, do we keep it or remove it 
from the cache? And do we return that key? The reason I ask is that the cached 
store is governed by a different config - cache.max_bytes - and the expiry of 
items from the cache is decided by that config. So there may be cases where the 
inner store has expired a certain key but, because that config's threshold hasn't 
been met, the key isn't removed from the cache. To keep 
them in sync, we can remove the item from the cache.
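
   To make the merge logic above concrete, here is a rough sketch of merging a 
key-ordered cache with a key-ordered store, preferring the cache on equal keys 
(illustrative only; not the actual caching-store code):

{code:java}
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

public class MergeIteratorsDemo {
    public static void main(String[] args) {
        TreeMap<String, String> cache = new TreeMap<>();
        cache.put("b", "cached-b");
        cache.put("c", "cached-c");
        TreeMap<String, String> store = new TreeMap<>();
        store.put("a", "stored-a");
        store.put("b", "stored-b"); // also in cache: cache takes precedence

        Iterator<Map.Entry<String, String>> ci = cache.entrySet().iterator();
        Iterator<Map.Entry<String, String>> si = store.entrySet().iterator();
        Map.Entry<String, String> c = ci.hasNext() ? ci.next() : null;
        Map.Entry<String, String> s = si.hasNext() ? si.next() : null;
        while (c != null || s != null) {
            int cmp = (c == null) ? 1 : (s == null) ? -1
                    : c.getKey().compareTo(s.getKey());
            if (cmp < 0) {            // key only in the cache
                System.out.println(c.getValue());
                c = ci.hasNext() ? ci.next() : null;
            } else if (cmp > 0) {     // key only in the store
                System.out.println(s.getValue());
                s = si.hasNext() ? si.next() : null;
            } else {                  // key in both: emit cache, skip store
                System.out.println(c.getValue());
                c = ci.hasNext() ? ci.next() : null;
                s = si.hasNext() ? si.next() : null;
            }
        }
        // Prints: stored-a, cached-b, cached-c
    }
}
{code}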


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #11691: KAFKA-13598: enable idempotence producer by default and validate the configs

2022-03-22 Thread GitBox


showuon commented on pull request #11691:
URL: https://github.com/apache/kafka/pull/11691#issuecomment-1074819113


   @rondagostino , yes, we are aware of it, and that's why we have 
https://github.com/apache/kafka/pull/11788 to disable idempotent producer when 
there are config conflicts. Could you please run again with the build including 
https://github.com/apache/kafka/pull/11788 change? Thanks. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lkokhreidze commented on pull request #11923: KAFKA-6718 / Documentation

2022-03-22 Thread GitBox


lkokhreidze commented on pull request #11923:
URL: https://github.com/apache/kafka/pull/11923#issuecomment-1074801863


   Thanks @showuon 
   I've addressed your comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lkokhreidze commented on a change in pull request #11923: KAFKA-6718 / Documentation

2022-03-22 Thread GitBox


lkokhreidze commented on a change in pull request #11923:
URL: https://github.com/apache/kafka/pull/11923#discussion_r831823225



##
File path: docs/streams/architecture.html
##
@@ -161,6 +161,12 @@ 

[GitHub] [kafka] LiamClarkeNZ commented on pull request #11475: KAFKA-7077: Use default producer settings in Connect Worker

2022-03-22 Thread GitBox


LiamClarkeNZ commented on pull request #11475:
URL: https://github.com/apache/kafka/pull/11475#issuecomment-1074795068


   Cheers Luke, will do :)
   
   On Tue, 22 Mar 2022 at 19:43, Luke Chen ***@***.***> wrote:
   
   > Thanks @LiamClarkeNZ  !
   >
   > The wording overall LGTM! Please submit a PR when available. We can
   > comment on the wording there.
   >
   > Actually, do we need to add similar documentation to the upgrade
   > documentation for 3.0.1, 3.1.1 also?
   >
   > Yes, please add them to 3.0.1 and 3.1.1, too.
   > Thanks.
   >
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #11475: KAFKA-7077: Use default producer settings in Connect Worker

2022-03-22 Thread GitBox


showuon commented on pull request #11475:
URL: https://github.com/apache/kafka/pull/11475#issuecomment-1074794074


   Thanks @LiamClarkeNZ !
   
   The wording overall LGTM! Please submit a PR when available. We can comment 
on the wording there.
   
   > Actually, do we need to add similar documentation to the upgrade
   documentation for 3.0.1, 3.1.1 also?
   
   Yes, please add them to 3.0.1 and 3.1.1, too. 
   Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] LiamClarkeNZ commented on pull request #11475: KAFKA-7077: Use default producer settings in Connect Worker

2022-03-22 Thread GitBox


LiamClarkeNZ commented on pull request #11475:
URL: https://github.com/apache/kafka/pull/11475#issuecomment-1074783688


   Hi Luke, Ismael et al,
   
   No worries on adding a comment to that effect in the upgrade docs - but I'm
   not sure how to word it, as the idempotent producer was never explicitly
   disabled by default in the Kafka Connect Worker. I realise, based on Luke's
   recent work to fix KAFKA-13598, that the intended enabling of idempotence
   by default with Kafka 3.0 didn't happen either.
   
   So while I just removed the setting of max in-flight requests to 1, allowing
   the default of 5 to be used (I also removed the explicit setting of
   acks=all for the same reason), I guess this will be the first version where
   Kafka Connect uses idempotent producers by default. So I was thinking of
   wording it like this:
   
   
   *Kafka Connect now uses [idempotent producers](http://link.to.docs.here
   ) by default, and now defaults to a maximum of 5
   in-flight requests (five is the uppermost limit supported by idempotent
   producers). You can override the producer settings controlling this behaviour
   using the properties producer.enable.idempotence and
   producer.max.in.flight.requests.per.connection*
   
   Actually, do we need to add similar documentation to the upgrade
   documentation for 3.0.1 and 3.1.1 also? KC workers based on those versions
   will now default to idempotence enabled. The workers in those versions
   will still have max.in.flight.requests.per.connection set to 1, but it could
   be set to 5 by producer.* overrides; I could add commentary to that effect.
   
   Thanks,
   
   Liam
   
   On Tue, 22 Mar 2022 at 19:07, Luke Chen ***@***.***> wrote:
   
   > @kkonstantine  , thanks for the comment.
   > I agree we add comments in the code to say the idempotent producer is
   > enabled by default.
   > cc @LiamClarkeNZ 
   >
   > Thank you.
   >
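
   A hedged sketch of what such worker-level overrides could look like (the
   "producer." prefix is how Connect passes overrides through to its internal
   producer; the values shown are illustrative, not a recommendation):

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

public class WorkerOverrideDemo {
    public static void main(String[] args) {
        // Restore the pre-change defaults via producer.* overrides.
        Properties worker = new Properties();
        worker.put("producer." + ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,
                "false");
        worker.put("producer."
                + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
        worker.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
{code}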
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13689) AbstractConfig log print information is incorrect

2022-03-22 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17510244#comment-17510244
 ] 

RivenSun commented on KAFKA-13689:
--

Hi [~guozhang] 
1. We all agree that users can get all their custom configurations in their own 
plugin modules by passing the `configs` variable through the 
`ChannelBuilders#channelBuilderConfigs` method.
In fact, the initialization methods of all KafkaClient implementations (such as 
KafkaConsumer) that take AbstractConfig parameters {*}are not public{*}, so users 
cannot directly access the corresponding AbstractConfig type and thus will not 
directly call the methods in AbstractConfig.
2. The `configs` variable mentioned above is actually of type 
AbstractConfig.RecordingMap. If users call configs.get(key) to retrieve their 
own configuration, the RecordingMap.get() method will eventually call the 
*AbstractConfig.ignore(key)* method, so `used` will contain user-defined 
unknown configurations.
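
A rough sketch of the recording behaviour described above (illustrative only; 
the real RecordingMap is a private inner class of AbstractConfig):

{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;

// Illustrative stand-in for AbstractConfig.RecordingMap: every get() marks
// the key as used, so even unknown user-supplied keys end up in `used`.
public class RecordingMap<V> extends HashMap<String, V> {
    private final Set<String> used = new HashSet<>();

    @Override
    public V get(Object key) {
        if (key instanceof String) {
            used.add((String) key);
        }
        return super.get(key);
    }

    public Set<String> used() {
        return used;
    }
}
{code}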

> AbstractConfig log print information is incorrect
> -
>
> Key: KAFKA-13689
> URL: https://issues.apache.org/jira/browse/KAFKA-13689
> Project: Kafka
>  Issue Type: Bug
>  Components: config
>Affects Versions: 3.0.0
>Reporter: RivenSun
>Assignee: RivenSun
>Priority: Major
> Fix For: 3.2.0
>
>
> h1. 1. Example
> KafkaClient version is 3.1.0, KafkaProducer init properties:
>  
> {code:java}
> Properties props = new Properties();
> props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
> props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60003);{code}
>  
>  
> Partial log of KafkaProducer initialization:
> {code:java}
>     ssl.truststore.location = C:\Personal 
> File\documents\KafkaSSL\client.truststore.jks
>     ssl.truststore.password = [hidden]
>     ssl.truststore.type = JKS
>     transaction.timeout.ms = 60003
>     transactional.id = null
>     value.serializer = class 
> org.apache.kafka.common.serialization.StringSerializer
> [main] INFO org.apache.kafka.common.security.authenticator.AbstractLogin - 
> Successfully logged in.
> [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The 
> configuration 'transaction.timeout.ms' was supplied but isn't a known config.
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.1.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 37edeed0777bacb3
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1645602332999 {code}
> From the above log, you can see that KafkaProducer has applied the user's 
> configuration, {*}transaction.timeout.ms=60003{*}; the default value of this 
> configuration is 60000.
> But we can see another line of log:
> [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The 
> configuration *'transaction.timeout.ms'* was supplied but isn't a 
> *known* config.
>  
> h1. 2. Root Cause:
> 1) ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG is set to {*}false{*}.
> So the configurations related to the KafkaProducer transaction will not be 
> requested.
> See the source code: KafkaProducer#configureTransactionState(...) .
> 2) AbstractConfig#logUnused() -> AbstractConfig#unused()
> {code:java}
> public Set<String> unused() {
> Set<String> keys = new HashSet<>(originals.keySet());
> keys.removeAll(used);
> return keys;
> } {code}
> If a configuration has not been requested, it will not be put 
> into the `used` variable. See the source code below:
> AbstractConfig#get(String key)
>  
> {code:java}
> protected Object get(String key) {
> if (!values.containsKey(key))
> throw new ConfigException(String.format("Unknown configuration '%s'", 
> key));
> used.add(key);
> return values.get(key);
> } {code}
> h1. Solution:
> 1. AbstractConfig#logUnused() method
> Modify the log printing information of this method, and the unused 
> configuration log print level can be changed to {*}INFO{*}. What do you think?
> {code:java}
> /**
>  * Log info for any unused configurations
>  */
> public void logUnused() {
> for (String key : unused())
> log.info("The configuration '{}' was supplied but isn't a used 
> config.", key);
> }{code}
>  
>  
> 2. AbstractConfig provides two new methods: logUnknown() and unknown()
> {code:java}
> /**
>  * Log warnings for any unknown configurations
>  */
> public void logUnknown() {
> for (String key : unknown())
> log.warn("The configuration '{}' was supplied but isn't a known 
> config.", key);
> } {code}
>  
> {code:java}
> public Set<String> unknown() {
> Set<String> keys = new HashSet<>(originals.keySet());
> keys.removeAll(values.keySet());
> return keys;
> } {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] showuon commented on pull request #11475: KAFKA-7077: Use default producer settings in Connect Worker

2022-03-22 Thread GitBox


showuon commented on pull request #11475:
URL: https://github.com/apache/kafka/pull/11475#issuecomment-1074774576


   @kkonstantine , thanks for the comment. I agree we should add comments in the 
code to say the idempotent producer is enabled by default.
   cc @LiamClarkeNZ 
   
   Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org