[GitHub] [kafka] mjsax merged pull request #13175: KAFKA-14660: Fix divide-by-zero vulnerability

2023-01-30 Thread via GitHub


mjsax merged PR #13175:
URL: https://github.com/apache/kafka/pull/13175


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14665) my custom SMT that converts Int to String does not work for primary keys

2023-01-30 Thread Fuxin Hao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fuxin Hao updated KAFKA-14665:
--
Description: 
I'm using {{io.debezium.connector.postgresql.PostgresConnector}} and 
{{io.confluent.connect.jdbc.JdbcSinkConnector}} to sync data between two 
PostgreSQL databases, and I set {{time.precision.mode=adaptive}} in the 
[Debezium 
config|https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-temporal-types],
 which serializes PostgreSQL time data types to {{Integer}} or {{Long}}. That is 
incompatible with {{JdbcSinkConnector}}, so I wrote an SMT to transform these 
fields from numeric types to strings.
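
For context, a minimal sketch of what such an SMT can look like (hypothetical 
class name; the author's actual converter is linked further below):
{code:java}
import java.time.Instant;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;

public class MicrosToStringTransform<R extends ConnectRecord<R>> implements Transformation<R> {

    private static final String MICRO_TIMESTAMP = "io.debezium.time.MicroTimestamp";

    @Override
    public R apply(R record) {
        if (!(record.value() instanceof Struct)) {
            return record;
        }
        Struct value = (Struct) record.value();
        // Rebuild the value schema, mapping MicroTimestamp fields to STRING.
        SchemaBuilder builder = SchemaBuilder.struct().name(value.schema().name());
        for (Field field : value.schema().fields()) {
            if (MICRO_TIMESTAMP.equals(field.schema().name())) {
                builder.field(field.name(), Schema.OPTIONAL_STRING_SCHEMA);
            } else {
                builder.field(field.name(), field.schema());
            }
        }
        Schema newSchema = builder.build();
        Struct newValue = new Struct(newSchema);
        for (Field field : value.schema().fields()) {
            Object old = value.get(field.name());
            if (MICRO_TIMESTAMP.equals(field.schema().name()) && old != null) {
                // Epoch microseconds -> ISO-8601, e.g. 2022-11-25T05:39:11.764130Z
                long micros = (Long) old;
                newValue.put(field.name(),
                    Instant.ofEpochSecond(micros / 1_000_000, (micros % 1_000_000) * 1_000).toString());
            } else {
                newValue.put(field.name(), old);
            }
        }
        // Only the record value is rewritten; the key keeps its original
        // schema, which is exactly the gap this ticket describes.
        return record.newRecord(record.topic(), record.kafkaPartition(),
            record.keySchema(), record.key(), newSchema, newValue, record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}
{code}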

 

Say I have the following table:
{code:java}
CREATE TABLE pk_created_at (
created_at timestamp without time zone DEFAULT current_timestamp not null,
PRIMARY KEY (created_at)
);
insert into pk_created_at values(current_timestamp); {code}
 

My source connector configuration:
{code:java}
{
"name": "test-connector",
"config": {
"snapshot.mode": "always",
"plugin.name": "pgoutput",
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "source",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "test",
"database.server.name": "test",
"slot.name" : "test",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enabled": true,
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enabled": true,
"decimal.handling.mode": "string",
"time.precision.mode": "adaptive",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
}
} {code}
 

The messages in the Kafka topic {{test.public.pk_created_at}} would be:
{code:java}
# bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test.public.pk_created_at --from-beginning
{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "int64", "optional": false, "name": "io.debezium.time.MicroTimestamp", "version": 1, "field": "created_at" }
    ],
    "optional": false,
    "name": "test.public.pk_created_at.Value"
  },
  "payload": { "created_at": 1669354751764130 }
}
{code}
 

After applying my SMT, the messages would be like this:
{code:java}
# bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic 
test.public.pk_created_at --from-beginning { "schema":{ "type":"struct", 
"fields":[ { "type":"string", "optional":true, "field":"created_at" } ], 
"optional":false, "name":"test.public.pk_created_at.Value" }, "payload":{ 
"created_at":"2022-11-25T05:39:11.764130Z" } }{code}

It [worked 
great|https://github.com/FX-HAO/kafka-connect-debezium-tranforms/blob/master/src/main/java/com/github/haofuxin/kafka/connect/DebeziumTimestampConverter.java]
 if {{created_at}} is not part of the primary key; no error occurred. But the 
primary keys on some of my tables are composed of {{id}} and {{created_at}}, 
like this: {{PRIMARY KEY (id, created_at)}}. Then {{JdbcSinkConnector}} raised 
the exception below:
{code:java}
2022-11-25 06:57:01,450 INFO || Attempting to open connection #1 to PostgreSql [io.confluent.connect.jdbc.util.CachedConnectionProvider]
2022-11-25 06:57:01,459 INFO || Maximum table name length for database is 63 bytes [io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect]
2022-11-25 06:57:01,459 INFO || JdbcDbWriter Connected [io.confluent.connect.jdbc.sink.JdbcDbWriter]
2022-11-25 06:57:01,472 INFO || Checking PostgreSql dialect for existence of TABLE "pk_created_at" [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect]
2022-11-25 06:57:01,484 INFO || Using PostgreSql dialect TABLE "pk_created_at" present [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect]
2022-11-25 06:57:01,505 INFO || Checking PostgreSql dialect for type of TABLE "pk_created_at" [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect]
2022-11-25 06:57:01,508 INFO || Setting metadata for table "pk_created_at" to Table{name='"pk_created_at"', type=TABLE columns=[Column{'created_at', isPrimaryKey=true, allowsNull=false, sqlType=timestamp}]} [io.confluent.connect.jdbc.util.TableDefinitions]
2022-11-25 06:57:01,510 WARN || Write of 2 records failed, remainingRetries=0 [io.confluent.connect.jdbc.sink.JdbcSinkTask]
java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "pk_created_at" ("created_at") VALUES (1669359291990398) ON CONFLICT ("created_at") DO NOTHING was aborted: ERROR: column "created_at" is of type timestamp without time zone but expression is of type bigint Hint: You will need to rewrite or cast the expression. Position: 52 Call getNextException to see other errors in the

[jira] [Assigned] (KAFKA-14650) IQv2 can throw ConcurrentModificationException when accessing Tasks

2023-01-30 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-14650:
---

Assignee: Guozhang Wang

> IQv2 can throw ConcurrentModificationException when accessing Tasks 
> 
>
> Key: KAFKA-14650
> URL: https://issues.apache.org/jira/browse/KAFKA-14650
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.0
>Reporter: A. Sophie Blee-Goldman
>Assignee: Guozhang Wang
>Priority: Major
>
> From failure in *[PositionRestartIntegrationTest.verifyStore[cache=false, 
> log=true, supplier=IN_MEMORY_WINDOW, 
> kind=PAPI]|https://ci-builds.apache.org/job/Kafka/job/kafka/job/3.4/63/testReport/junit/org.apache.kafka.streams.integration/PositionRestartIntegrationTest/Build___JDK_11_and_Scala_2_13___verifyStore_cache_false__log_true__supplier_IN_MEMORY_WINDOW__kind_PAPI_/]*
> java.util.ConcurrentModificationException
>   at 
> java.base/java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1208)
>   at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1244)
>   at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1239)
>   at java.base/java.util.HashMap.putMapEntries(HashMap.java:508)
>   at java.base/java.util.HashMap.putAll(HashMap.java:781)
>   at 
> org.apache.kafka.streams.processor.internals.Tasks.allTasksPerId(Tasks.java:361)
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1537)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.allTasks(StreamThread.java:1278)
>   at org.apache.kafka.streams.KafkaStreams.query(KafkaStreams.java:1921)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.iqv2WaitForResult(IntegrationTestUtils.java:168)
>   at 
> org.apache.kafka.streams.integration.PositionRestartIntegrationTest.shouldReachExpectedPosition(PositionRestartIntegrationTest.java:438)
>   at 
> org.apache.kafka.streams.integration.PositionRestartIntegrationTest.verifyStore(PositionRestartIntegrationTest.java:423)
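
The fail-fast behavior in the trace can be reproduced single-threaded (a 
generic Java illustration of TreeMap's fail-fast iterator, not Kafka code):
{code:java}
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

public class CmeDemo {
    public static void main(String[] args) {
        Map<Integer, String> tasks = new TreeMap<>();
        tasks.put(1, "task-1");
        tasks.put(2, "task-2");

        // HashMap.putAll iterates the source map; in the trace another thread
        // mutates the TreeMap concurrently. Here we mutate from the same
        // thread so the failure is deterministic.
        Iterator<Map.Entry<Integer, String>> it = tasks.entrySet().iterator();
        it.next();
        tasks.put(3, "task-3"); // structural modification while iterating
        it.next();              // throws ConcurrentModificationException
    }
}
{code}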



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14665) my custom SMT that converts Int to String does not work for primary keys

2023-01-30 Thread Fuxin Hao (Jira)
Fuxin Hao created KAFKA-14665:
-

 Summary: my custom SMT that converts Int to String does not work 
for primary keys
 Key: KAFKA-14665
 URL: https://issues.apache.org/jira/browse/KAFKA-14665
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 3.1.0
Reporter: Fuxin Hao


I'm using {{io.debezium.connector.postgresql.PostgresConnector}} and 
{{io.confluent.connect.jdbc.JdbcSinkConnector}} to sync data between two 
PostgreSQL databases, and I set {{time.precision.mode=adaptive}} in the 
[Debezium 
config|https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-temporal-types],
 which serializes PostgreSQL time data types to {{Integer}} or {{Long}}. That is 
incompatible with {{JdbcSinkConnector}}, so I wrote an SMT to transform these 
fields from numeric types to strings.

 

Say I have the following table:
{code:java}
CREATE TABLE pk_created_at (
created_at timestamp without time zone DEFAULT current_timestamp not null,
PRIMARY KEY (created_at)
);
insert into pk_created_at values(current_timestamp); {code}

My source connector configuration:
{code:java}
{
"name": "test-connector",
"config": {
"snapshot.mode": "always",
"plugin.name": "pgoutput",
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "source",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "test",
"database.server.name": "test",
"slot.name" : "test",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enabled": true,
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enabled": true,
"decimal.handling.mode": "string",
"time.precision.mode": "adaptive",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
}
} {code}

 

The messages in the Kafka topic {{test.public.pk_created_at}} would be:
{code:java}
# bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test.public.pk_created_at --from-beginning
{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "int64", "optional": false, "name": "io.debezium.time.MicroTimestamp", "version": 1, "field": "created_at" }
    ],
    "optional": false,
    "name": "test.public.pk_created_at.Value"
  },
  "payload": { "created_at": 1669354751764130 }
}
{code}
 


After applying my SMT, the messages would be like this:

{code:java}
# bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test.public.pk_created_at --from-beginning
{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "string", "optional": true, "field": "created_at" }
    ],
    "optional": false,
    "name": "test.public.pk_created_at.Value"
  },
  "payload": { "created_at": "2022-11-25T05:39:11.764130Z" }
}
{code}

It [worked 
great|https://github.com/FX-HAO/kafka-connect-debezium-tranforms/blob/master/src/main/java/com/github/haofuxin/kafka/connect/DebeziumTimestampConverter.java]
 if {{created_at}} is not part of the primary key; no error occurred. But the 
primary keys on some of my tables are composed of {{id}} and {{created_at}}, 
like this: {{PRIMARY KEY (id, created_at)}}. Then {{JdbcSinkConnector}} raised 
the exception below:

{code:java}
2022-11-25 06:57:01,450 INFO || Attempting to open connection #1 to PostgreSql [io.confluent.connect.jdbc.util.CachedConnectionProvider]
2022-11-25 06:57:01,459 INFO || Maximum table name length for database is 63 bytes [io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect]
2022-11-25 06:57:01,459 INFO || JdbcDbWriter Connected [io.confluent.connect.jdbc.sink.JdbcDbWriter]
2022-11-25 06:57:01,472 INFO || Checking PostgreSql dialect for existence of TABLE "pk_created_at" [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect]
2022-11-25 06:57:01,484 INFO || Using PostgreSql dialect TABLE "pk_created_at" present [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect]
2022-11-25 06:57:01,505 INFO || Checking PostgreSql dialect for type of TABLE "pk_created_at" [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect]
2022-11-25 06:57:01,508 INFO || Setting metadata for table "pk_created_at" to Table{name='"pk_created_at"', type=TABLE columns=[Column{'created_at', isPrimaryKey=true, allowsNull=false, sqlType=timestamp}]} [io.confluent.connect.jdbc.util.TableDefinitions]
2022-11-25 06:57:01,510 WARN || Write of 2 records failed, remainingRetries=0 [io.confluent.connect.jdbc.sink.JdbcSinkTask]
java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "pk_created_at" ("created_at") VALUES (1669359291990398) ON CONFLICT

[jira] [Assigned] (KAFKA-14487) Move LogManager to storage module

2023-01-30 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-14487:
-

Assignee: Sagar Rao

> Move LogManager to storage module
> -
>
> Key: KAFKA-14487
> URL: https://issues.apache.org/jira/browse/KAFKA-14487
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Sagar Rao
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14585) Move StorageTool to tools

2023-01-30 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-14585:
-

Assignee: Sagar Rao

> Move StorageTool to tools
> -
>
> Key: KAFKA-14585
> URL: https://issues.apache.org/jira/browse/KAFKA-14585
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Sagar Rao
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] Hangleton commented on pull request #13169: KAFKA-14658: Do not open broker ports until we are ready to accept traffic

2023-01-30 Thread via GitHub


Hangleton commented on PR #13169:
URL: https://github.com/apache/kafka/pull/13169#issuecomment-1409850803

   Taking a look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe merged pull request #13159: KAFKA-14656 Send UMR first during ZK migration

2023-01-30 Thread via GitHub


cmccabe merged PR #13159:
URL: https://github.com/apache/kafka/pull/13159


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14646) SubscriptionWrapper is of an incompatible version (Kafka Streams 3.2.3 -> 3.3.2)

2023-01-30 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17682401#comment-17682401
 ] 

Matthias J. Sax commented on KAFKA-14646:
-

I was thinking about this issue, and I think the only way to upgrade is to 
"drain" your topology. I.e., you would need to stop your upstream producers and 
not send any new input data. Afterwards, you let Kafka Streams finish processing 
all input data (including all data from the internal topics, i.e., the 
repartition, fk-subscription, and fk-response topics) to really "drain" the 
topology completely. Next, do a two-round rolling bounce using `upgrade.from`, 
and finally resume your upstream producers.

Would you be willing to try this out (and report back)?
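
For reference, a sketch of the two rolling bounces with the Streams 
{{upgrade.from}} config (assuming the application comes from 3.2; illustrative, 
not from the thread):
{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class UpgradeFromExample {
    public static void main(String[] args) {
        // Round 1: restart every instance, one at a time, with upgrade.from
        // set to the version you are upgrading from.
        Properties props = new Properties();
        props.put(StreamsConfig.UPGRADE_FROM_CONFIG, "3.2");

        // Round 2: after all instances run the new bytes, remove
        // upgrade.from and do a second rolling restart.
        props.remove(StreamsConfig.UPGRADE_FROM_CONFIG);
    }
}
{code}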

> SubscriptionWrapper is of an incompatible version (Kafka Streams 3.2.3 -> 
> 3.3.2)
> 
>
> Key: KAFKA-14646
> URL: https://issues.apache.org/jira/browse/KAFKA-14646
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.0, 3.3.1, 3.3.2
> Environment: Kafka Streams 3.2.3 (before update)
> Kafka Streams 3.3.2 (after update)
> Java 17 (Eclipse Temurin 17.0.5), Linux x86_64
>Reporter: Jochen Schalanda
>Priority: Major
> Fix For: 3.4.0, 3.3.3
>
>
> Hey folks,
>  
> we've just updated an application from *_Kafka Streams 3.2.3 to 3.3.2_* and 
> started getting the following exceptions:
> {code:java}
> org.apache.kafka.common.errors.UnsupportedVersionException: 
> SubscriptionWrapper is of an incompatible version. {code}
> After swiftly looking through the code, this exception is potentially thrown 
> in two places:
>  * 
> [https://github.com/apache/kafka/blob/3.3.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java#L73-L78]
>  ** Here the check was changed in Kafka 3.3.x: 
> [https://github.com/apache/kafka/commit/9dd25ecd9ce17e608c6aba98e0422b26ed133c12]
>  * 
> [https://github.com/apache/kafka/blob/3.3.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java#L94-L99]
>  ** Here the check wasn't changed.
>  
> Is it possible that the second check in 
> {{SubscriptionStoreReceiveProcessorSupplier}} was forgotten?
>  
> Any hints how to resolve this issue without a downgrade?
> Since this only affects 2 of 15 topologies in the application, I'm hesitant 
> to just downgrade to Kafka 3.2.3 again since the internal topics might 
> already have been updated to use the "new" version of 
> {{{}SubscriptionWrapper{}}}.
>  
> Related discussion in the Confluent Community Slack: 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1674497054507119]
> h2. Stack trace
> {code:java}
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=1_8, 
> processor=XXX-joined-changed-fk-subscription-registration-source, 
> topic=topic.rev7-XXX-joined-changed-fk-subscription-registration-topic, 
> partition=8, offset=12297976, 
> stacktrace=org.apache.kafka.common.errors.UnsupportedVersionException: 
> SubscriptionWrapper is of an incompatible version.
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:750)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1182)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:770)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:588)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:550)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] Nayana-ibm commented on pull request #11929: MINOR: s390x Stage

2023-01-30 Thread via GitHub


Nayana-ibm commented on PR #11929:
URL: https://github.com/apache/kafka/pull/11929#issuecomment-1409780907

   Any update on this please?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14664) Raft idle ratio is inaccurate

2023-01-30 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-14664:
---

 Summary: Raft idle ratio is inaccurate
 Key: KAFKA-14664
 URL: https://issues.apache.org/jira/browse/KAFKA-14664
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson


The `poll-idle-ratio-avg` metric is intended to track how idle the raft IO 
thread is. When completely idle, it should measure 1. When saturated, it should 
measure 0. The problem with the current measurements is that each sample is 
weighted equally, regardless of how much time it covers. For example, say we 
poll twice with the following durations:

Poll 1: 2s

Poll 2: 0s

Assume that the busy time is negligible, so 2s passes overall.

In the first measurement, 2s is spent waiting, so we compute and record a ratio 
of 1.0. In the second measurement, no time passes, and we record 0.0. The idle 
ratio is then computed as the average of these two values ((1.0 + 0.0) / 2 = 
0.5), which wrongly suggests that the process was busy for 1s. 

Instead, we should sum up the time waiting over the full interval. 2s passes 
total here and 2s is idle, so we should compute 1.0.
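
A minimal sketch of the arithmetic, in plain Java with hypothetical sample 
arrays (not the actual raft metrics code):
{code:java}
public class IdleRatioExample {
    public static void main(String[] args) {
        long[] idleNs  = {2_000_000_000L, 0L}; // time spent waiting in each poll
        long[] totalNs = {2_000_000_000L, 0L}; // wall-clock duration of each poll

        // Current behavior: every sample weighs the same, regardless of duration.
        double equalWeight = 0.0;
        for (int i = 0; i < idleNs.length; i++) {
            equalWeight += totalNs[i] == 0 ? 0.0 : (double) idleNs[i] / totalNs[i];
        }
        equalWeight /= idleNs.length;
        System.out.println(equalWeight); // 0.5 -- misleading

        // Proposed behavior: weight each sample by the time it covers.
        long idleSum = 0;
        long totalSum = 0;
        for (int i = 0; i < idleNs.length; i++) {
            idleSum += idleNs[i];
            totalSum += totalNs[i];
        }
        System.out.println((double) idleSum / totalSum); // 1.0 -- correct
    }
}
{code}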



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

2023-01-30 Thread via GitHub


beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1091360138


##
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##
@@ -476,14 +480,34 @@ public <T> List<T> getConfiguredInstances(List<String> 
classNames, Class<T> t, M
 return objects;
 Map<String, Object> configPairs = originals();
 configPairs.putAll(configOverrides);
-for (Object klass : classNames) {
-Object o = getConfiguredInstance(klass, t, configPairs);
-objects.add(t.cast(o));
+
+try {
+for (Object klass : classNames) {
+Object o = getConfiguredInstance(klass, t, configPairs);
+objects.add(t.cast(o));
+}
+} catch (Exception e) {
+for (Object object : objects) {
+if (object instanceof AutoCloseable) {
+try {
+((AutoCloseable) object).close();
+} catch (Exception ex) {
+log.error(String.format("Close exception on %s", 
object.getClass().getName()), ex);
+}
+} else if (object instanceof Closeable) {
+try {
+((Closeable) object).close();
+} catch (Exception ex) {
+log.error(String.format("Close exception on %s", 
object.getClass().getName()), ex);
+}

Review Comment:
   @C0urante  I'm glad to hear that you are feeling better.  
   Yes, given the KIP avoidance constraint, I really did not want to change 
getConfiguredInstances, but  it seemed the only way to address this was to 
change it very carefully.   However,  I believe I've implemented each of your 
comments and thanks for the excellent feedback. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-14663) High throughput topics can starve low-throughput MM2 offset syncs

2023-01-30 Thread Greg Harris (Jira)
Greg Harris created KAFKA-14663:
---

 Summary: High throughput topics can starve low-throughput MM2 
offset syncs
 Key: KAFKA-14663
 URL: https://issues.apache.org/jira/browse/KAFKA-14663
 Project: Kafka
  Issue Type: Bug
  Components: mirrormaker
Affects Versions: 3.3.0, 3.0.0, 3.1.0, 3.4.0, 3.5.0
Reporter: Greg Harris
Assignee: Greg Harris


In MM2, a semaphore is used to throttle the number of offset syncs written to 
the offset-syncs topic. If too many offset writes are requested (for example, 
from high-throughput topics) then some are silently dropped and never retried. 
This is acceptable for a single topic-partition, where a later record may 
re-trigger the offset-sync and write the sync successfully.

However, if there is a large variance between throughput in the topics emitted 
by an MM2 instance, it is possible for high-throughput topics to trigger many 
offset syncs, and cause the offset-syncs for a co-located low-throughput topic 
to be unfairly dropped.

This can cause the downstream offsets for the starved topic to lag behind 
significantly, or be prevented completely.

Instead, we should have some sort of fairness mechanism where low-throughput 
topics are given similar priority to high-throughput topics in producing offset 
syncs, and excess sync messages from high-throughput topics are dropped 
instead.
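
A rough sketch of the mechanism described above, with one possible shape for 
the fairness fix (hypothetical names, not the actual MirrorMaker code):
{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

class OffsetSyncThrottle {
    private final Semaphore outstandingSyncs = new Semaphore(10);
    // Possible fairness fix: remember the latest un-sent sync per partition
    // and retry it later, instead of dropping it outright.
    private final Map<String, Long> pendingByPartition = new ConcurrentHashMap<>();

    void maybeSync(String topicPartition, long upstreamOffset) {
        if (outstandingSyncs.tryAcquire()) {
            send(topicPartition, upstreamOffset);
        } else {
            // Current behavior: the sync is silently dropped here, so a
            // low-throughput partition may never get another chance.
            pendingByPartition.put(topicPartition, upstreamOffset);
        }
    }

    void onSendComplete() {
        outstandingSyncs.release();
        // Drain one starved partition, giving quiet partitions a turn.
        pendingByPartition.keySet().stream().findFirst().ifPresent(tp -> {
            Long offset = pendingByPartition.remove(tp);
            if (offset != null) {
                maybeSync(tp, offset);
            }
        });
    }

    private void send(String topicPartition, long upstreamOffset) {
        // producer.send(...) to the offset-syncs topic in real MM2; omitted.
    }
}
{code}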



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cmccabe commented on a diff in pull request #13116: KAFKA-14351: Controller Mutation Quota for KRaft

2023-01-30 Thread via GitHub


cmccabe commented on code in PR #13116:
URL: https://github.com/apache/kafka/pull/13116#discussion_r1091311692


##
core/src/main/scala/kafka/server/ControllerApis.scala:
##
@@ -322,15 +325,37 @@ class ControllerApis(val requestChannel: RequestChannel,
   }
 }
 // Finally, the idToName map contains all the topics that we are 
authorized to delete.
-// Perform the deletion and create responses for each one.
-controller.deleteTopics(context, idToName.keySet).thenApply { 
idToError =>
-  idToError.forEach { (id, error) =>
-appendResponse(idToName.get(id), id, error)
+// First check the controller mutation quota if necessary, and then

Review Comment:
   This is a lot more logic (and expense) than what I wanted, and unfortunately 
it also has a TOCTOU, because partitions may be added in between checking here 
and doing the deletion.
   
   I guess I simply didn't foresee this, sorry.
   
   I wonder if we could convert the throttle to be lockless so that we could 
use it in the QuorumController thread itself. Like I said earlier, the big 
concern is that blocking the controller thread is really bad and not something 
we want to do under really any condition. It seems like the throttle should be 
simple... just a number or something right? Or can we somehow guarantee that 
only the controller thread itself is taking that lock under ordinary conditions?
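   
   One shape the lockless idea could take (illustrative names, not from this 
PR): keep the quota as a single atomic token count updated with 
compare-and-set, so the controller thread never blocks.
   ```java
import java.util.concurrent.atomic.AtomicLong;

final class LocklessMutationQuota {
    private final AtomicLong tokens;
    private final long maxTokens;

    LocklessMutationQuota(long maxTokens) {
        this.maxTokens = maxTokens;
        this.tokens = new AtomicLong(maxTokens);
    }

    /** Try to spend n tokens; never blocks, never takes a lock. */
    boolean tryAcquire(long n) {
        while (true) {
            long cur = tokens.get();
            if (cur < n) {
                return false; // over quota: reject the mutation outright
            }
            if (tokens.compareAndSet(cur, cur - n)) {
                return true;
            }
        }
    }

    /** Refill periodically, e.g. from a timer. */
    void refill(long n) {
        tokens.updateAndGet(cur -> Math.min(maxTokens, cur + n));
    }
}
   ```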



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] anatasiavela commented on pull request #13078: KAFKA-13999: Add ProducerCount metrics (KIP-847)

2023-01-30 Thread via GitHub


anatasiavela commented on PR #13078:
URL: https://github.com/apache/kafka/pull/13078#issuecomment-1409535758

   @divijvaidya Since this PR is strictly focused on the metric, I think it's 
reasonable for that change to be done in a separate PR. What do you think?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14660) Divide by zero security vulnerability (sonatype-2019-0422)

2023-01-30 Thread Andy Coates (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17682304#comment-17682304
 ] 

Andy Coates commented on KAFKA-14660:
-

As per the description, I think the best 'fix' might be to re-open the PR and 
merge it, as it looks to me that the vulnerability report is tied to the PR.

If you know how I can tag a new PR to resolve the vulnerability, then I'm all 
ears.

> Divide by zero security vulnerability (sonatype-2019-0422)
> --
>
> Key: KAFKA-14660
> URL: https://issues.apache.org/jira/browse/KAFKA-14660
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.2
>Reporter: Andy Coates
>Assignee: Matthias J. Sax
>Priority: Minor
>
> Looks like Sonatype has picked up a "Divide by Zero" issue reported in a PR 
> and, because the PR was never merged, is now reporting it as a security 
> vulnerability in the latest Kafka Streams library.
>  
> See:
>  * [Vulnerability: 
> sonatype-2019-0422]([https://ossindex.sonatype.org/vulnerability/sonatype-2019-0422?component-type=maven=org.apache.kafka%2Fkafka-streams_source=ossindex-client_medium=integration_content=1.7.0)]
>  * [Original PR]([https://github.com/apache/kafka/pull/7414])
>  
> While it looks from the comments made by [~mjsax] and [~bbejeck] that the 
> divide-by-zero is not really an issue, the fact that it's now being reported 
> as a vulnerability is a problem, especially with regulators.
> PITA, but we should consider either getting this vulnerability removed 
> (Google wasn't very helpful in providing info on how to do this), or fixed 
> (Again, not sure how to tag the fix as fixing this issue).  One option may 
> just be to reopen the PR and merge (and then fix forward by switching it to 
> throw an exception).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cmccabe commented on pull request #13169: KAFKA-14658: Do not open broker ports until we are ready to accept traffic

2023-01-30 Thread via GitHub


cmccabe commented on PR #13169:
URL: https://github.com/apache/kafka/pull/13169#issuecomment-1409506162

   retest this please


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe opened a new pull request, #13176: MINOR: some ZK migration code cleanups.

2023-01-30 Thread via GitHub


cmccabe opened a new pull request, #13176:
URL: https://github.com/apache/kafka/pull/13176

   Some minor improvements to the JavaDoc for ZkMigrationState.
   
   Rename MigrationState to MigrationDriverState to avoid confusion with 
ZkMigrationState.
   
   Remove ClusterImage#zkBrokers. This costs O(num_brokers) time to calculate, 
but is only ever used when in migration state. It should just be calculated in 
the migration code. (Additionally, the function ClusterImage.zkBrokers() 
returns something other than ClusterImage#zkBrokers, which is confusing.)
   
   Also remove ClusterDelta#liveZkBrokerIdChanges. This is only used in one 
place, and it's easy to calculate it there. In general we should avoid 
providing expensive accessors unless absolutely necessary. Expensive code 
should look expensive: if people want to iterate over all brokers, they can 
write a loop to do that rather than hiding it inside an accessor.
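   
   As a sketch of that principle (assuming `ClusterImage#brokers()` returning a 
map of `BrokerRegistration` by id and a `BrokerRegistration#isMigratingZkBroker()` 
accessor, as used by the migration code), the caller-side loop would look like:
   ```java
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.image.ClusterImage;
import org.apache.kafka.metadata.BrokerRegistration;

final class ZkBrokerScan {
    // The O(num_brokers) cost is visible at the call site instead of
    // being hidden behind a cheap-looking accessor.
    static List<Integer> zkBrokerIds(ClusterImage clusterImage) {
        List<Integer> ids = new ArrayList<>();
        for (BrokerRegistration broker : clusterImage.brokers().values()) {
            if (broker.isMigratingZkBroker()) {
                ids.add(broker.id());
            }
        }
        return ids;
    }
}
   ```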


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #13169: KAFKA-14658: Do not open broker ports until we are ready to accept traffic

2023-01-30 Thread via GitHub


cmccabe commented on PR #13169:
URL: https://github.com/apache/kafka/pull/13169#issuecomment-1409444203

   Looks like Jenkins is having issues.
   ```
   [2023-01-30T20:15:48.665Z] + ./retry_zinc ./gradlew -PscalaVersion=2.13 
clean compileJava compileScala compileTestJava compileTestScala 
spotlessScalaCheck checkstyleMain checkstyleTest spotbugsMain rat --profile 
--continue -PxmlSpotBugsReport=true -PkeepAliveMode=session
   
   [2023-01-30T20:15:49.622Z] To honour the JVM settings for this build a 
single-use Daemon process will be forked. See 
https://docs.gradle.org/7.6/userguide/gradle_daemon.html#sec:disabling_the_daemon.
   
   [2023-01-30T20:15:51.411Z] Daemon will be stopped at the end of the build 
   
   [2023-01-30T20:16:59.582Z] 
   
   [2023-01-30T20:16:59.582Z] FAILURE: Build failed with an exception.
   
   [2023-01-30T20:16:59.582Z] 
   
   [2023-01-30T20:16:59.582Z] * What went wrong:
   
   [2023-01-30T20:16:59.582Z] Gradle could not start your build.
   
   [2023-01-30T20:16:59.582Z] > Cannot create service of type 
BuildSessionActionExecutor using method 
LauncherServices$ToolingBuildSessionScopeServices.createActionExecutor() as 
there is a problem with parameter #21 of type FileSystemWatchingInformation.
   
   [2023-01-30T20:16:59.582Z]> Cannot create service of type 
BuildLifecycleAwareVirtualFileSystem using method 
VirtualFileSystemServices$GradleUserHomeServices.createVirtualFileSystem() as 
there is a problem with parameter #7 of type GlobalCacheLocations.
   
   [2023-01-30T20:16:59.582Z]   > Cannot create service of type 
GlobalCacheLocations using method 
GradleUserHomeScopeServices.createGlobalCacheLocations() as there is a problem 
with parameter #1 of type List.
   
   [2023-01-30T20:16:59.582Z]  > Could not create service of type 
FileAccessTimeJournal using 
GradleUserHomeScopeServices.createFileAccessTimeJournal().
   
   [2023-01-30T20:16:59.582Z] > Timeout waiting to lock journal 
cache (/home/jenkins/.gradle/caches/journal-1). It is currently in use by 
another Gradle instance.
   
   [2023-01-30T20:16:59.582Z]   Owner PID: 13197
   
   [2023-01-30T20:16:59.582Z]   Our PID: 16325
   
   [2023-01-30T20:16:59.582Z]   Owner Operation: 
   
   [2023-01-30T20:16:59.582Z]   Our operation: 
   
   [2023-01-30T20:16:59.582Z]   Lock file: 
/home/jenkins/.gradle/caches/journal-1/journal-1.lock
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #13161: Kafka 14128

2023-01-30 Thread via GitHub


mjsax commented on code in PR #13161:
URL: https://github.com/apache/kafka/pull/13161#discussion_r1091223587


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##
@@ -538,6 +544,8 @@ protected Map<String, Integer> getNumPartitions(final 
Set<String> topics,
 tempUnknownTopics.add(topicName);
 log.debug("The leader of topic {} is not available.\n" +
 "Error message was: {}", topicName, cause.toString());
+} else if (cause instanceof TimeoutException) {
+throw new RuntimeException();

Review Comment:
   We should not throw an exception here, but do what we do inside the existing 
`catch TimeoutException` block (that we can remove).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #13161: Kafka 14128

2023-01-30 Thread via GitHub


mjsax commented on code in PR #13161:
URL: https://github.com/apache/kafka/pull/13161#discussion_r1091222822


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##
@@ -466,7 +469,10 @@ public Set<String> makeReady(final Map<String, InternalTopicConfig> topics) {
 topicName)
 );
 }
-} else {
+} else if (cause instanceof  TimeoutException) 
{

Review Comment:
   I actually believe we can remove the `catch TimeoutException` block below, 
because `get()` should never throw a timeout.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14660) Divide by zero security vulnerability (sonatype-2019-0422)

2023-01-30 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17682291#comment-17682291
 ] 

Matthias J. Sax commented on KAFKA-14660:
-

Hey Andy – thanks for the ticket. We would have accepted a PR, too. :D

> Divide by zero security vulnerability (sonatype-2019-0422)
> --
>
> Key: KAFKA-14660
> URL: https://issues.apache.org/jira/browse/KAFKA-14660
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.2
>Reporter: Andy Coates
>Assignee: Matthias J. Sax
>Priority: Minor
>
> Looks like Sonatype has picked up a "Divide by Zero" issue reported in a PR 
> and, because the PR was never merged, is now reporting it as a security 
> vulnerability in the latest Kafka Streams library.
>  
> See:
>  * [Vulnerability: 
> sonatype-2019-0422]([https://ossindex.sonatype.org/vulnerability/sonatype-2019-0422?component-type=maven=org.apache.kafka%2Fkafka-streams_source=ossindex-client_medium=integration_content=1.7.0)]
>  * [Original PR]([https://github.com/apache/kafka/pull/7414])
>  
> While it looks from the comments made by [~mjsax] and [~bbejeck] that the 
> divide-by-zero is not really an issue, the fact that it's now being reported 
> as a vulnerability is a problem, especially with regulators.
> PITA, but we should consider either getting this vulnerability removed 
> (Google wasn't very helpful in providing info on how to do this), or fixed 
> (Again, not sure how to tag the fix as fixing this issue).  One option may 
> just be to reopen the PR and merge (and then fix forward by switching it to 
> throw an exception).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

2023-01-30 Thread via GitHub


C0urante commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1091084063


##
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##
@@ -476,14 +480,34 @@ public <T> List<T> getConfiguredInstances(List<String> 
classNames, Class<T> t, M
 return objects;
 Map<String, Object> configPairs = originals();
 configPairs.putAll(configOverrides);
-for (Object klass : classNames) {
-Object o = getConfiguredInstance(klass, t, configPairs);
-objects.add(t.cast(o));
+
+try {
+for (Object klass : classNames) {
+Object o = getConfiguredInstance(klass, t, configPairs);
+objects.add(t.cast(o));
+}
+} catch (Exception e) {
+for (Object object : objects) {
+if (object instanceof AutoCloseable) {
+try {
+((AutoCloseable) object).close();
+} catch (Exception ex) {
+log.error(String.format("Close exception on %s", 
object.getClass().getName()), ex);
+}
+} else if (object instanceof Closeable) {
+try {
+((Closeable) object).close();
+} catch (Exception ex) {
+log.error(String.format("Close exception on %s", 
object.getClass().getName()), ex);
+}

Review Comment:
   This can be simplified a bit:
   
   ```suggestion
   Utils.closeQuietly((AutoCloseable) object, 
"AutoCloseable object constructed and configured during failed call to 
getConfiguredInstances");
   
   ```
   
   1. `Utils::closeQuietly` handles failures for us
   2. `Closeable` is a subinterface of `AutoCloseable`, so we only need to 
check for the latter
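   
   Put differently, the whole cleanup loop could collapse to something like 
this sketch:
   ```java
// One instanceof check covers both interfaces, and Utils.closeQuietly
// logs (rather than propagates) any failure from close().
for (Object object : objects) {
    if (object instanceof AutoCloseable) {
        Utils.closeQuietly((AutoCloseable) object,
            "configured instance " + object.getClass().getName());
    }
}
   ```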



##
clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java:
##
@@ -55,6 +58,11 @@ public void configure(Map<String, ?> configs) {
 Object clientIdValue = configs.get(ConsumerConfig.CLIENT_ID_CONFIG);
 if (clientIdValue == null)
 throw new ConfigException("Mock consumer interceptor expects 
configuration " + ProducerConfig.CLIENT_ID_CONFIG);
+
+CONFIG_COUNT.incrementAndGet();
+if (CONFIG_COUNT.get() == THROW_CONFIG_EXCEPTION_THRESHOLD.get()) {
+throw new ConfigException("Kafka producer creation failed. Failure 
may not have cleaned up listener thread resource.");

Review Comment:
   It seems like the failure message here is hinting that we try to create a 
Kafka producer in this interceptor, but there isn't much else in the class to 
go along with that.
   
   Could we use a more generic message like "Failed to instantiate interceptor 
(reached throw-on-config threshold)"?



##
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##
@@ -503,6 +505,30 @@ public void testInterceptorConstructorClose() {
 }
 }
 
+@Test
+public void 
testInterceptorConstructorConfigurationWithExceptionShouldCloseRemainingInstances()
 {

Review Comment:
   I like this test. Could we also get a matching one for producer 
interceptors, and something analogous for the `AbstractConfig` class?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14660) Divide by zero security vulnerability (sonatype-2019-0422)

2023-01-30 Thread Andy Coates (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17682265#comment-17682265
 ] 

Andy Coates commented on KAFKA-14660:
-

Hey Matthias ;)

> Divide by zero security vulnerability (sonatype-2019-0422)
> --
>
> Key: KAFKA-14660
> URL: https://issues.apache.org/jira/browse/KAFKA-14660
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.2
>Reporter: Andy Coates
>Assignee: Matthias J. Sax
>Priority: Minor
>
> Looks like Sonatype has picked up a "Divide by Zero" issue reported in a PR 
> and, because the PR was never merged, is now reporting it as a security 
> vulnerability in the latest Kafka Streams library.
>  
> See:
>  * [Vulnerability: 
> sonatype-2019-0422]([https://ossindex.sonatype.org/vulnerability/sonatype-2019-0422?component-type=maven=org.apache.kafka%2Fkafka-streams_source=ossindex-client_medium=integration_content=1.7.0)]
>  * [Original PR]([https://github.com/apache/kafka/pull/7414])
>  
> While it looks from the comments made by [~mjsax] and [~bbejeck] that the 
> divide-by-zero is not really an issue, the fact that it's now being reported 
> as a vulnerability is a problem, especially with regulators.
> PITA, but we should consider either getting this vulnerability removed 
> (Google wasn't very helpful in providing info on how to do this), or fixed 
> (Again, not sure how to tag the fix as fixing this issue).  One option may 
> just be to reopen the PR and merge (and then fix forward by switching it to 
> throw an exception).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cmccabe commented on a diff in pull request #13169: KAFKA-14658: Do not open broker ports until we are ready to accept traffic

2023-01-30 Thread via GitHub


cmccabe commented on code in PR #13169:
URL: https://github.com/apache/kafka/pull/13169#discussion_r1091087602


##
clients/src/main/java/org/apache/kafka/common/utils/Time.java:
##
@@ -86,4 +89,30 @@ default Timer timer(Duration timeout) {
 return timer(timeout.toMillis());
 }
 
+/**
+ * Wait for a future to complete, or time out.
+ *
+ * @param future      The future to wait for.
+ * @param deadlineNs  The time in the future, in monotonic nanoseconds, 
to time out.
+ * @return            The result of the future.
+ * @param <T>         The type of the future.
+ */
+default <T> T waitForFuture(
+CompletableFuture<T> future,
+long deadlineNs
+) throws TimeoutException, InterruptedException, ExecutionException  {
+TimeoutException timeoutException = null;
+while (true) {
+long nowNs = nanoseconds();
+if (deadlineNs <= nowNs) {
+throw (timeoutException == null) ? new TimeoutException() : 
timeoutException;
+}
+long deltaNs = deadlineNs - nowNs;
+try {
+return future.get(deltaNs, TimeUnit.NANOSECONDS);

Review Comment:
   Thanks for reviewing. I should have been clearer that this wasn't the main 
point of the PR :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Hangleton commented on a diff in pull request #13169: KAFKA-14658: Do not open broker ports until we are ready to accept traffic

2023-01-30 Thread via GitHub


Hangleton commented on code in PR #13169:
URL: https://github.com/apache/kafka/pull/13169#discussion_r1091084191


##
clients/src/main/java/org/apache/kafka/common/utils/Time.java:
##
@@ -86,4 +89,30 @@ default Timer timer(Duration timeout) {
 return timer(timeout.toMillis());
 }
 
+/**
+ * Wait for a future to complete, or time out.
+ *
+ * @param future      The future to wait for.
+ * @param deadlineNs  The time in the future, in monotonic nanoseconds, 
to time out.
+ * @return            The result of the future.
+ * @param <T>         The type of the future.
+ */
+default <T> T waitForFuture(
+CompletableFuture<T> future,
+long deadlineNs
+) throws TimeoutException, InterruptedException, ExecutionException  {
+TimeoutException timeoutException = null;
+while (true) {
+long nowNs = nanoseconds();
+if (deadlineNs <= nowNs) {
+throw (timeoutException == null) ? new TimeoutException() : 
timeoutException;
+}
+long deltaNs = deadlineNs - nowNs;
+try {
+return future.get(deltaNs, TimeUnit.NANOSECONDS);

Review Comment:
   Got it. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vladimirdyuzhev commented on a diff in pull request #13081: Re-using callbackHandler for refreshing Kerberos TGT when keytab is not used

2023-01-30 Thread via GitHub


vladimirdyuzhev commented on code in PR #13081:
URL: https://github.com/apache/kafka/pull/13081#discussion_r1091084183


##
clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java:
##
@@ -90,6 +91,7 @@ public void configure(Map configs, String 
contextName, Configuration
 this.minTimeBeforeRelogin = (Long) 
configs.get(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN);
 this.kinitCmd = (String) 
configs.get(SaslConfigs.SASL_KERBEROS_KINIT_CMD);
 this.serviceName = getServiceName(configs, contextName, configuration);
+this.callbackHandler = callbackHandler;

Review Comment:
   I have moved the loginHandler to AbstractLogin and added a test.
   
   I've sent an email to priv...@kafka.apache.org to create a JIRA account for 
me to open a security issue.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #12654: KAFKA-10575: Add onRestoreSuspended to StateRestoreListener

2023-01-30 Thread via GitHub


mjsax commented on PR #12654:
URL: https://github.com/apache/kafka/pull/12654#issuecomment-1409251099

   Why does this PR have 392 commits?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #12654: KAFKA-10575: Add onRestoreSuspended to StateRestoreListener

2023-01-30 Thread via GitHub


mjsax commented on code in PR #12654:
URL: https://github.com/apache/kafka/pull/12654#discussion_r1091058442


##
streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java:
##
@@ -37,6 +37,9 @@
  * These two interfaces serve different restoration purposes and users should 
not try to implement both of them in a single
  * class during state store registration.
  *
+ * Also note that standby tasks restoration process are not monitored via this 
interface, since a standby task keep

Review Comment:
   nit: missing `<p>` between the paragraphs. -- Seems they are also missing in 
other places in this file -- can we fix all of them?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##
@@ -988,6 +988,18 @@ public void unregister(final Collection 
revokedChangelogs) {
 if (changelogMetadata != null) {
 if 
(!changelogMetadata.state().equals(ChangelogState.REGISTERED)) {
 revokedInitializedChangelogs.add(partition);
+
+// if the changelog is still in REGISTERED, it means it 
has not initialized and started

Review Comment:
   > if the changelog is still in REGISTERED
   
   We check `!changelogMetadata.state().equals(ChangelogState.REGISTERED)` 
above, so it seems we _are_ not in REGISTERED state here. Should the comment go 
somewhere else, or not use `if...` but say `because...`?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##
@@ -988,6 +988,18 @@ public void unregister(final Collection 
revokedChangelogs) {
 if (changelogMetadata != null) {
 if 
(!changelogMetadata.state().equals(ChangelogState.REGISTERED)) {
 revokedInitializedChangelogs.add(partition);
+
+// if the changelog is still in REGISTERED, it means it 
has not initialized and started
+// restoring yet, and hence the corresponding 
onRestoreStart was not called; in this case
+// we should not call onRestorePaused either

Review Comment:
   nit: `onRestorePaused` -> `onRestoreSuspended`



##
streams/src/test/java/org/apache/kafka/test/MockStateRestoreListener.java:
##
@@ -37,6 +37,7 @@ public class MockStateRestoreListener implements 
StateRestoreListener {
 public static final String RESTORE_START = "restore_start";
 public static final String RESTORE_BATCH = "restore_batch";
 public static final String RESTORE_END = "restore_end";
+public static final String RESTORE_PAUSED = "restore_paused";

Review Comment:
   nit: rename -> `RESTORE_SUSPENDED`



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##
@@ -197,6 +198,49 @@ public void shouldNotRegisterStoreWithoutMetadata() {
 () -> changelogReader.register(new 
TopicPartition("ChangelogWithoutStoreMetadata", 0), stateManager));
 }
 
+@Test
+public void shouldSupportUnregisterChangelogBeforeCompletion() {

Review Comment:
   We should only call this if we are in `RESTORING` state, right? If we are in 
`COMPLETED` state, we should not call `suspend` because we already called `end`? 
So the test name seems misleading?



##
streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java:
##
@@ -37,6 +37,9 @@
  * These two interfaces serve different restoration purposes and users should 
not try to implement both of them in a single
  * class during state store registration.
  *
+ * Also note that standby tasks restoration process are not monitored via this 
interface, since a standby task keep

Review Comment:
   > standby tasks restoration process
   
   Is "restoration" the best wording?



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##
@@ -197,6 +198,49 @@ public void shouldNotRegisterStoreWithoutMetadata() {
 () -> changelogReader.register(new 
TopicPartition("ChangelogWithoutStoreMetadata", 0), stateManager));
 }
 
+@Test
+public void shouldSupportUnregisterChangelogBeforeCompletion() {

Review Comment:
   Should we add a case for a "completed" partition, verifying that we don't 
call `suspended` during `unregister` in that case?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13163: KAFKA-14653: MirrorMakerConfig using raw properties instead of post-r…

2023-01-30 Thread via GitHub


C0urante commented on code in PR #13163:
URL: https://github.com/apache/kafka/pull/13163#discussion_r1091016715


##
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##
@@ -246,13 +246,22 @@ public Map<String, Object> originals(Map<String, Object> 
configOverrides) {
  */
 public Map<String, String> originalsStrings() {
 Map<String, String> copy = new RecordingMap<>();
+copyAsStrings(originals, copy);
+return copy;
+}
+
+/**
+ * Ensures that all values of a map are strings, and copies them to 
another map.
+ * @param originals The map to validate.
+ * @param copy The target to copy to.
+ */
+protected static void copyAsStrings(Map<String, ?> originals, Map<String, String> copy) {

Review Comment:
   This counts as a change to public interface since subclasses of 
`AbstractConfig` would be able to access this new method. And since this wasn't 
mentioned in the KIP, we probably shouldn't do that here.



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java:
##
@@ -82,10 +82,16 @@ public class MirrorMakerConfig extends AbstractConfig {
 static final String TARGET_PREFIX = "target.";
 
 private final Plugins plugins;
-   
+
+private final Map<String, String> rawProperties;
+
+@SuppressWarnings("unchecked")
 public MirrorMakerConfig(Map<?, ?> props) {
 super(CONFIG_DEF, props, true);
 plugins = new Plugins(originalsStrings());
+
+rawProperties = new HashMap<>();
+copyAsStrings((Map) props, rawProperties);

Review Comment:
   It looks like this is intended to get around the fact that the constructor 
accepts a `Map<?, ?>` but we really want to work with a `Map<String, String>`. 
Could we change the constructor to accept a `Map<String, String>` and store 
that as our `rawProperties` field, instead of introducing the `copyAsStrings` 
method?
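   
   A rough sketch of that suggestion (hypothetical; names follow the existing 
class, but this is not the merged code):
   
   ```
   public MirrorMakerConfig(Map<String, String> props) {
       super(CONFIG_DEF, props, true);
       plugins = new Plugins(originalsStrings());
       // props is already a Map<String, String>, so a plain copy suffices --
       // no unchecked cast and no copyAsStrings helper needed
       rawProperties = new HashMap<>(props);
   }
   ```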



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java:
##
@@ -130,15 +136,14 @@ public List clusterPairs() {
   */
 public MirrorClientConfig clientConfig(String cluster) {
 Map<String, String> props = new HashMap<>();
-props.putAll(originalsStrings());
+props.putAll(rawProperties);

Review Comment:
   Why change this part? Aren't we transforming the configs later at line 141 
anyways?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13169: KAFKA-14658: Do not open broker ports until we are ready to accept traffic

2023-01-30 Thread via GitHub


cmccabe commented on code in PR #13169:
URL: https://github.com/apache/kafka/pull/13169#discussion_r1091061226


##
clients/src/main/java/org/apache/kafka/common/utils/Time.java:
##
@@ -86,4 +89,30 @@ default Timer timer(Duration timeout) {
 return timer(timeout.toMillis());
 }
 
+/**
+ * Wait for a future to complete, or time out.
+ *
+ * @param future     The future to wait for.
+ * @param deadlineNs The time in the future, in monotonic nanoseconds, to 
time out.
+ * @return           The result of the future.
+ * @param <T>        The type of the future.
+ */
+default <T> T waitForFuture(
+CompletableFuture<T> future,
+long deadlineNs
+) throws TimeoutException, InterruptedException, ExecutionException  {
+TimeoutException timeoutException = null;
+while (true) {
+long nowNs = nanoseconds();
+if (deadlineNs <= nowNs) {
+throw (timeoutException == null) ? new TimeoutException() : 
timeoutException;
+}
+long deltaNs = deadlineNs - nowNs;
+try {
+return future.get(deltaNs, TimeUnit.NANOSECONDS);

Review Comment:
   The JavaDoc for CompletableFuture doesn't say what it does in case of 
negative timeouts. Therefore, it's a bad idea to rely on this behavior. Also, 
the subtraction you propose may overflow.
   
   In any case this code was part of a different PR, #13153. I only included it 
here because I needed something from FutureUtils. Sorry for the confusion.
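   
   To make the overflow concern concrete, a minimal sketch (values are 
contrived; `System.nanoTime()` may legitimately be negative):
   
   ```
   long deadlineNs = Long.MAX_VALUE - 10; // a far-future monotonic deadline
   long nowNs = -100;                     // nanoTime() can return negative values
   long deltaNs = deadlineNs - nowNs;     // overflows and wraps to a large negative number
   future.get(deltaNs, TimeUnit.NANOSECONDS); // would receive a negative timeout
   ```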



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13161: Kafka 14128

2023-01-30 Thread via GitHub


cmccabe commented on code in PR #13161:
URL: https://github.com/apache/kafka/pull/13161#discussion_r1091058054


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##
@@ -521,7 +524,7 @@ protected Map<String, Integer> getNumPartitions(final 
Set<String> topics,
 for (final Map.Entry<String, KafkaFuture<TopicDescription>> 
topicFuture : futures.entrySet()) {
 final String topicName = topicFuture.getKey();
 try {
-final TopicDescription topicDescription = 
topicFuture.getValue().get();
+final TopicDescription topicDescription = 
topicFuture.getValue().get(Long.parseLong(DEFAULT_API_TIMEOUT_MS_CONFIG), 
TimeUnit.MILLISECONDS);

Review Comment:
   Do not put timeout parameters on Future.get. This will not abort the 
operation. If you want timeouts, they should be done by adjusting your admin 
client configuration parameters.
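   
   For example, a minimal sketch of configuring the timeout on the admin client 
instead (assuming the standard `AdminClientConfig` keys; values are 
illustrative):
   
   ```
   Properties props = new Properties();
   props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
   props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 15_000);     // per-request timeout
   props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 60_000); // overall budget per API call
   try (Admin admin = Admin.create(props)) {
       // the call itself fails with a TimeoutException once the budget is
       // exhausted, so no timeout argument is needed on the Future
       admin.describeTopics(Collections.singleton("my-topic")).allTopicNames().get();
   }
   ```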



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe merged pull request #13153: MINOR: startup timeouts for KRaft integration tests

2023-01-30 Thread via GitHub


cmccabe merged PR #13153:
URL: https://github.com/apache/kafka/pull/13153


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes

2023-01-30 Thread via GitHub


ijuma commented on PR #13131:
URL: https://github.com/apache/kafka/pull/13131#issuecomment-1409209835

   Got it, so it's useful for `main` methods too - not just CLI tools. Then 
it's fine for it to be in `server-common`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Hangleton commented on a diff in pull request #13162: fix: replace an inefficient loop in kafka internals

2023-01-30 Thread via GitHub


Hangleton commented on code in PR #13162:
URL: https://github.com/apache/kafka/pull/13162#discussion_r1091027869


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -1225,13 +1226,11 @@ public static long tryWriteTo(TransferableChannel 
destChannel,
  * @param length The number of bytes to write
  * @throws IOException For any errors writing to the output
  */
-public static void writeTo(DataOutput out, ByteBuffer buffer, int length) 
throws IOException {
+public static void writeTo(DataOutputStream out, ByteBuffer buffer, int 
length) throws IOException {
 if (buffer.hasArray()) {
 out.write(buffer.array(), buffer.position() + 
buffer.arrayOffset(), length);
 } else {
-int pos = buffer.position();
-for (int i = pos; i < length + pos; i++)
-out.writeByte(buffer.get(i));
+Channels.newChannel(out).write(buffer);

Review Comment:
   Got it. Thanks. 
   
   Could you share the performance gains at a high level, without revealing 
details of the application?
   
   Publicly available data showing the expected gains would back up the PR 
further. Has this been discussed on the dev mailing list already?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Hangleton commented on a diff in pull request #13161: Kafka 14128

2023-01-30 Thread via GitHub


Hangleton commented on code in PR #13161:
URL: https://github.com/apache/kafka/pull/13161#discussion_r1090718859


##
clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java:
##
@@ -160,7 +160,7 @@ private void maybeThrowCancellationException(Throwable 
cause) {
  * Waits if necessary for this future to complete, and then returns its 
result.
  */
 @Override
-public T get() throws InterruptedException, ExecutionException {
+public abstract T get(long timeout, TimeUnit unit) throws 
InterruptedException, ExecutionException, TimeoutException {

Review Comment:
   Thanks for the follow-up. 
   
   Trying to understand your use case. 
   
   The `MetadataRequest` used to describe topics timed out. All retries were 
exhausted (4 lines) and a `TimeoutException`  (FQN 
`org.apache.kafka.common.errors.TimeoutException`) was propagated to the future 
(that is, the future was completed exceptionally), and then propagated to the 
caller resulting in the behaviour observed.
   
   ```
   2022-07-29 13:39:37.854  INFO 25843 --- [348aefeff-admin] 
org.apache.kafka.clients.NetworkClient   : [AdminClient 
clientId=L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-admin] Disconnecting from 
node 3 due to request timeout.
   2022-07-29 13:39:37.854  INFO 25843 --- [348aefeff-admin] 
org.apache.kafka.clients.NetworkClient   : [AdminClient 
clientId=L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-admin] Cancelled 
in-flight METADATA request with correlation id 985 due to node 3 being 
disconnected (elapsed time since creation: 60023ms, elapsed time since send: 
60023ms, request timeout: 3ms)
   2022-07-29 13:39:37.867 ERROR 25843 --- [-StreamThread-1] 
o.a.k.s.p.i.InternalTopicManager : stream-thread [main] Unexpected 
error during topic description for 
L.DII.A-COGROUPKSTREAM-AGGREGATE-STATE-STORE-03-changelog.
   Error message was: org.apache.kafka.common.errors.TimeoutException: 
Call(callName=describeTopics, deadlineMs=1659101977830, tries=1, 
nextAllowedTryMs=1659101977955) timed out at 1659101977855 after 1 attempt(s)
   2022-07-29 13:39:37.869  INFO 25843 --- [-StreamThread-1] 
o.a.k.s.p.internals.StreamThread : stream-thread 
[L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-StreamThread-1] State transition 
from RUNNING to PENDING_SHUTDOWN
   ```
   
   Therefore, it seems that:
   
   - The failure of the "describe-topics" invocation was already correctly 
propagated via the future.
   - This failure results from the timeout of the underlying Metadata request 
**and** the exhaustion of retries.
   
   If the broker(s) were only temporarily unavailable, it seems increasing the 
number of retries may have helped?
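   
   For reference, a sketch of the retry-related client settings alluded to here 
(assuming the standard `AdminClientConfig` keys; values are illustrative):
   
   ```
   Map<String, Object> conf = new HashMap<>();
   conf.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
   conf.put(AdminClientConfig.RETRIES_CONFIG, 20);             // more attempts before the call fails
   conf.put(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, 1_000); // pause between attempts
   ```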



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes

2023-01-30 Thread via GitHub


fvaleri commented on PR #13131:
URL: https://github.com/apache/kafka/pull/13131#issuecomment-1409100823

   > Thanks for doing this. One thing I didn't understand is why the shared 
classes are in `server-common` instead of `tools`. Is this because `core` also 
uses the class?
   
   Good question. CommandLineUtils is mostly used by tools, but Kafka.scala has 
a dependency on it. In order to move these 2 shared classes to the tools 
module, we would need to add a tools dependency to core. Is this correct?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-14661) Upgrade Zookeeper to 3.8.1

2023-01-30 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17682204#comment-17682204
 ] 

Ismael Juma edited comment on KAFKA-14661 at 1/30/23 5:39 PM:
--

We should focus on getting this done for Apache Kafka 3.5.0. The patch release 
discussion should happen after 3.5.0 has happened and we have strong evidence 
that it's safe to do it as part of patch releases.


was (Author: ijuma):
We should focus on getting this done for 3.5.0. The patch release discussion 
should happen after 3.5.0 has happened and we have strong evidence that it's 
safe to do it as part of patch releases.

> Upgrade Zookeeper to 3.8.1 
> ---
>
> Key: KAFKA-14661
> URL: https://issues.apache.org/jira/browse/KAFKA-14661
> Project: Kafka
>  Issue Type: Improvement
>  Components: packaging
>Reporter: Divij Vaidya
>Assignee: Christo Lolov
>Priority: Blocker
> Fix For: 3.5.0, 3.4.1, 3.3.3
>
>
> Current Zk version (3.6.x) supported by Apache Kafka has been EOL since 
> December 2022 [1]
> Users of Kafka are facing regulatory hurdles because of using a dependency 
> which is EOL, hence, I would suggest to upgrade this in all upcoming releases 
> (including patch releases of 3.3.x and 3.4.x versions).
> Some things to consider while upgrading (as pointed by [~ijuma] at [2]):
>  # If we upgrade the zk server to 3.8.1, what is the impact on the zk 
> clients. That is, what's the earliest zk client version that is supported by 
> the 3.8.x server?
>  # We need to ensure there are no regressions (particularly on the stability 
> front) when it comes to this upgrade. It would be good for someone to stress 
> test the system a bit with the new version and check if all works well.
> [1] [https://zookeeper.apache.org/releases.html] 
>  [2][https://github.com/apache/kafka/pull/12620#issuecomment-1409028650] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mumrah commented on a diff in pull request #13159: KAFKA-14656 Send UMR first during ZK migration

2023-01-30 Thread via GitHub


mumrah commented on code in PR #13159:
URL: https://github.com/apache/kafka/pull/13159#discussion_r1090944813


##
core/src/main/scala/kafka/migration/MigrationPropagator.scala:
##
@@ -118,9 +124,8 @@ class MigrationPropagator(
   }
 }
 
-// If there are new brokers (including KRaft brokers) or if there are 
changes in topic
-// metadata, let's send UMR about the changes to the old Zk brokers.
-if (brokersChanged || !delta.topicsDelta().deletedTopicIds().isEmpty || 
!delta.topicsDelta().changedTopics().isEmpty) {
+// If there are changes in topic metadata, let's send UMR about the 
changes to the old Zk brokers.
+if (!delta.topicsDelta().deletedTopicIds().isEmpty || 
!delta.topicsDelta().changedTopics().isEmpty) {

Review Comment:
   Yea, it should be fine since the request batch is using a set.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14661) Upgrade Zk to 3.8.1

2023-01-30 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-14661:
-
Component/s: packaging
 (was: zkclient)

> Upgrade Zk to 3.8.1 
> 
>
> Key: KAFKA-14661
> URL: https://issues.apache.org/jira/browse/KAFKA-14661
> Project: Kafka
>  Issue Type: Bug
>  Components: packaging
>Reporter: Divij Vaidya
>Assignee: Christo Lolov
>Priority: Blocker
> Fix For: 3.5.0, 3.4.1, 3.3.3
>
>
> Current Zk version (3.6.x) supported by Apache Kafka has been EOL since 
> December 2022 [1]
> Users of Kafka are facing regulatory hurdles because of using a dependency 
> which is EOL, hence, I would suggest to upgrade this in all upcoming releases 
> (including patch releases of 3.3.x and 3.4.x versions).
> Some things to consider while upgrading (as pointed by [~ijuma] at [2]):
>  # If we upgrade the zk server to 3.8.1, what is the impact on the zk 
> clients. That is, what's the earliest zk client version that is supported by 
> the 3.8.x server?
>  # We need to ensure there are no regressions (particularly on the stability 
> front) when it comes to this upgrade. It would be good for someone to stress 
> test the system a bit with the new version and check if all works well.
> [1] [https://zookeeper.apache.org/releases.html] 
>  [2][https://github.com/apache/kafka/pull/12620#issuecomment-1409028650] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14661) Upgrade Zk to 3.8.1

2023-01-30 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17682204#comment-17682204
 ] 

Ismael Juma commented on KAFKA-14661:
-

We should focus on getting this done for 3.5.0. The patch release discussion 
should happen after 3.5.0 has happened and we have strong evidence that it's 
safe to do it as part of patch releases.

> Upgrade Zk to 3.8.1 
> 
>
> Key: KAFKA-14661
> URL: https://issues.apache.org/jira/browse/KAFKA-14661
> Project: Kafka
>  Issue Type: Improvement
>  Components: packaging
>Reporter: Divij Vaidya
>Assignee: Christo Lolov
>Priority: Blocker
> Fix For: 3.5.0, 3.4.1, 3.3.3
>
>
> Current Zk version (3.6.x) supported by Apache Kafka has been EOL since 
> December 2022 [1]
> Users of Kafka are facing regulatory hurdles because of using a dependency 
> which is EOL, hence, I would suggest to upgrade this in all upcoming releases 
> (including patch releases of 3.3.x and 3.4.x versions).
> Some things to consider while upgrading (as pointed by [~ijuma] at [2]):
>  # If we upgrade the zk server to 3.8.1, what is the impact on the zk 
> clients. That is, what's the earliest zk client version that is supported by 
> the 3.8.x server?
>  # We need to ensure there are no regressions (particularly on the stability 
> front) when it comes to this upgrade. It would be good for someone to stress 
> test the system a bit with the new version and check if all works well.
> [1] [https://zookeeper.apache.org/releases.html] 
>  [2][https://github.com/apache/kafka/pull/12620#issuecomment-1409028650] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14661) Upgrade Zookeeper to 3.8.1

2023-01-30 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-14661:
-
Summary: Upgrade Zookeeper to 3.8.1   (was: Upgrade Zk to 3.8.1 )

> Upgrade Zookeeper to 3.8.1 
> ---
>
> Key: KAFKA-14661
> URL: https://issues.apache.org/jira/browse/KAFKA-14661
> Project: Kafka
>  Issue Type: Improvement
>  Components: packaging
>Reporter: Divij Vaidya
>Assignee: Christo Lolov
>Priority: Blocker
> Fix For: 3.5.0, 3.4.1, 3.3.3
>
>
> Current Zk version (3.6.x) supported by Apache Kafka has been EOL since 
> December 2022 [1]
> Users of Kafka are facing regulatory hurdles because of using a dependency 
> which is EOL, hence, I would suggest to upgrade this in all upcoming releases 
> (including patch releases of 3.3.x and 3.4.x versions).
> Some things to consider while upgrading (as pointed by [~ijuma] at [2]):
>  # If we upgrade the zk server to 3.8.1, what is the impact on the zk 
> clients. That is, what's the earliest zk client version that is supported by 
> the 3.8.x server?
>  # We need to ensure there are no regressions (particularly on the stability 
> front) when it comes to this upgrade. It would be good for someone to stress 
> test the system a bit with the new version and check if all works well.
> [1] [https://zookeeper.apache.org/releases.html] 
>  [2][https://github.com/apache/kafka/pull/12620#issuecomment-1409028650] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14661) Upgrade Zk to 3.8.1

2023-01-30 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-14661:
-
Issue Type: Improvement  (was: Bug)

> Upgrade Zk to 3.8.1 
> 
>
> Key: KAFKA-14661
> URL: https://issues.apache.org/jira/browse/KAFKA-14661
> Project: Kafka
>  Issue Type: Improvement
>  Components: packaging
>Reporter: Divij Vaidya
>Assignee: Christo Lolov
>Priority: Blocker
> Fix For: 3.5.0, 3.4.1, 3.3.3
>
>
> Current Zk version (3.6.x) supported by Apache Kafka has been EOL since 
> December 2022 [1]
> Users of Kafka are facing regulatory hurdles because of using a dependency 
> which is EOL, hence, I would suggest to upgrade this in all upcoming releases 
> (including patch releases of 3.3.x and 3.4.x versions).
> Some things to consider while upgrading (as pointed by [~ijuma] at [2]):
>  # If we upgrade the zk server to 3.8.1, what is the impact on the zk 
> clients. That is, what's the earliest zk client version that is supported by 
> the 3.8.x server?
>  # We need to ensure there are no regressions (particularly on the stability 
> front) when it comes to this upgrade. It would be good for someone to stress 
> test the system a bit with the new version and check if all works well.
> [1] [https://zookeeper.apache.org/releases.html] 
>  [2][https://github.com/apache/kafka/pull/12620#issuecomment-1409028650] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mumrah commented on a diff in pull request #13159: KAFKA-14656 Send UMR first during ZK migration

2023-01-30 Thread via GitHub


mumrah commented on code in PR #13159:
URL: https://github.com/apache/kafka/pull/13159#discussion_r1090943050


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -403,7 +404,7 @@ public void run() throws Exception {
 AtomicInteger count = new AtomicInteger(0);
 zkMigrationClient.readAllMetadata(batch -> {
 try {
-log.info("Migrating {} records from ZK: {}", 
batch.size(), batch);
+log.info("Migrating {} records from ZK", batch.size());

Review Comment:
   I wondered about that. So far, we are avoiding logging raw records in most 
places, but it could be very useful for debugging during a migration. I'll add 
it back as TRACE.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2

2023-01-30 Thread via GitHub


C0urante commented on code in PR #13137:
URL: https://github.com/apache/kafka/pull/13137#discussion_r1090943031


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
##
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.integration;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.mirror.MirrorMaker;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+
+@Tag("integration")
+public class DedicatedMirrorIntegrationTest {
+
+private static final Logger log = 
LoggerFactory.getLogger(DedicatedMirrorIntegrationTest.class);
+
+private static final int TOPIC_CREATION_TIMEOUT_MS = 30_000;
+private static final int TOPIC_REPLICATION_TIMEOUT_MS = 30_000;
+
+private Map<String, EmbeddedKafkaCluster> kafkaClusters;
+private Map<String, MirrorMaker> mirrorMakers;
+
+@BeforeEach
+public void setup() {
+kafkaClusters = new HashMap<>();
+mirrorMakers = new HashMap<>();
+}
+
+@AfterEach
+public void teardown() throws Throwable {
+AtomicReference<Throwable> shutdownFailure = new AtomicReference<>();
+mirrorMakers.forEach((name, mirrorMaker) ->
+Utils.closeQuietly(mirrorMaker::stop, "MirrorMaker worker '" + 
name + "'", shutdownFailure)
+);
+kafkaClusters.forEach((name, kafkaCluster) ->
+Utils.closeQuietly(kafkaCluster::stop, "Embedded Kafka cluster '" 
+ name + "'", shutdownFailure)
+);
+if (shutdownFailure.get() != null) {
+throw shutdownFailure.get();
+}
+}
+
+private EmbeddedKafkaCluster startKafkaCluster(String name, int 
numBrokers, Properties brokerProperties) {
+if (kafkaClusters.containsKey(name))
+throw new IllegalStateException("Cannot register multiple Kafka 
clusters with the same name");
+
+EmbeddedKafkaCluster result = new EmbeddedKafkaCluster(numBrokers, 
brokerProperties);
+kafkaClusters.put(name, result);
+
+result.start();
+
+return result;
+}
+
+private MirrorMaker startMirrorMaker(String name, Map<String, String> 
mmProps) {
+if (mirrorMakers.containsKey(name))
+throw new IllegalStateException("Cannot register multiple 
MirrorMaker nodes with the same name");
+
+MirrorMaker result = new MirrorMaker(mmProps);
+mirrorMakers.put(name, result);
+
+result.start();
+
+return result;
+}
+
+/**
+ * Test that a multi-node dedicated cluster is able to dynamically detect 
new topics at runtime

Review Comment:
   We don't have a guarantee that task configs are relayed from one worker to 
another, that's true. However, because we enable exactly-once support, we do 
get a guarantee that intra-cluster communication takes place, since no source 
tasks can start on a follower node without first issuing a REST request to the 
cluster's leader.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14662) ACL listings in documentation are out of date

2023-01-30 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-14662:
--

 Summary: ACL listings in documentation are out of date
 Key: KAFKA-14662
 URL: https://issues.apache.org/jira/browse/KAFKA-14662
 Project: Kafka
  Issue Type: Bug
  Components: core, docs
Reporter: Mickael Maison


ACLs listed in 
https://kafka.apache.org/documentation/#operations_resources_and_protocols are 
out of date. They only cover API keys up to 47 (OffsetDelete) and don't include 
DescribeClientQuotas, AlterClientQuotas, DescribeUserScramCredentials, 
AlterUserScramCredentials, DescribeQuorum, AlterPartition, UpdateFeatures, 
DescribeCluster, DescribeProducers, UnregisterBroker, DescribeTransactions, 
ListTransactions, AllocateProducerIds.

This is hard to keep up to date so we should consider whether this could be 
generated automatically.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cmccabe commented on a diff in pull request #13159: KAFKA-14656 Send UMR first during ZK migration

2023-01-30 Thread via GitHub


cmccabe commented on code in PR #13159:
URL: https://github.com/apache/kafka/pull/13159#discussion_r1090939081


##
core/src/main/scala/kafka/migration/MigrationPropagator.scala:
##
@@ -118,9 +124,8 @@ class MigrationPropagator(
   }
 }
 
-// If there are new brokers (including KRaft brokers) or if there are 
changes in topic
-// metadata, let's send UMR about the changes to the old Zk brokers.
-if (brokersChanged || !delta.topicsDelta().deletedTopicIds().isEmpty || 
!delta.topicsDelta().changedTopics().isEmpty) {
+// If there are changes in topic metadata, let's send UMR about the 
changes to the old Zk brokers.
+if (!delta.topicsDelta().deletedTopicIds().isEmpty || 
!delta.topicsDelta().changedTopics().isEmpty) {

Review Comment:
   Sorry, this might be a silly question, but is it OK for 
`requestBatch.addUpdateMetadataRequestForBrokers` to be called twice in some 
cases?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13159: KAFKA-14656 Send UMR first during ZK migration

2023-01-30 Thread via GitHub


cmccabe commented on code in PR #13159:
URL: https://github.com/apache/kafka/pull/13159#discussion_r1090937902


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -403,7 +404,7 @@ public void run() throws Exception {
 AtomicInteger count = new AtomicInteger(0);
 zkMigrationClient.readAllMetadata(batch -> {
 try {
-log.info("Migrating {} records from ZK: {}", 
batch.size(), batch);
+log.info("Migrating {} records from ZK", batch.size());

Review Comment:
   do we want to log the batch itself when TRACE is active?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13159: KAFKA-14656 Send UMR first during ZK migration

2023-01-30 Thread via GitHub


cmccabe commented on code in PR #13159:
URL: https://github.com/apache/kafka/pull/13159#discussion_r1090937161


##
metadata/src/main/java/org/apache/kafka/image/ConfigurationsImage.java:
##
@@ -61,6 +61,15 @@ public Properties configProperties(ConfigResource 
configResource) {
 }
 }
 
+public Map<String, String> configMap(ConfigResource configResource) {

Review Comment:
   It would be good to add JavaDoc here stating that this doesn't handle 
configuration overrides. Also, how about `configMapForResource` as a name?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2

2023-01-30 Thread via GitHub


C0urante commented on code in PR #13137:
URL: https://github.com/apache/kafka/pull/13137#discussion_r1090936585


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java:
##
@@ -119,7 +126,16 @@ public class MirrorMaker {
 public MirrorMaker(MirrorMakerConfig config, List<String> clusters, Time 
time) {
 log.debug("Kafka MirrorMaker instance created");
 this.time = time;
-this.advertisedBaseUrl = "NOTUSED";
+if (config.enableInternalRest()) {
+this.restClient = new RestClient(config);
+internalServer = new MirrorRestServer(config.originals(), 
restClient);
+internalServer.initializeServer();
+this.advertisedUrl = internalServer.advertisedUrl().toString();
+} else {
+internalServer = null;
+restClient = null;
+this.advertisedUrl = "NOTUSED";

Review Comment:
   It ends up being used in the metadata workers send to the group coordinator 
during rebalances, and the schema we use to (de)serialize that data requires a 
non-null string for the worker URL. I stuck with the `"NOTUSED"` placeholder 
instead of an empty string since that's what we're using currently, and it 
doesn't seem worth the risk to try to change it now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14661) Upgrade Zk to 3.8.1

2023-01-30 Thread Divij Vaidya (Jira)
Divij Vaidya created KAFKA-14661:


 Summary: Upgrade Zk to 3.8.1 
 Key: KAFKA-14661
 URL: https://issues.apache.org/jira/browse/KAFKA-14661
 Project: Kafka
  Issue Type: Bug
  Components: zkclient
Reporter: Divij Vaidya
Assignee: Christo Lolov
 Fix For: 3.5.0, 3.4.1, 3.3.3


Current Zk version (3.6.x) supported by Apache Kafka has been EOL since 
December 2022 [1]

Users of Kafka are facing regulatory hurdles because of using a dependency 
which is EOL, hence, I would suggest to upgrade this in all upcoming releases 
(including patch releases of 3.3.x and 3.4.x versions).

Some things to consider while upgrading (as pointed by [~ijuma] at [2]):
 # If we upgrade the zk server to 3.8.1, what is the impact on the zk clients. 
That is, what's the earliest zk client version that is supported by the 3.8.x 
server?
 # We need to ensure there are no regressions (particularly on the stability 
front) when it comes to this upgrade. It would be good for someone to stress 
test the system a bit with the new version and check if all works well.





[1] [https://zookeeper.apache.org/releases.html] 

 [2][https://github.com/apache/kafka/pull/12620#issuecomment-1409028650] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante commented on a diff in pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2

2023-01-30 Thread via GitHub


C0urante commented on code in PR #13137:
URL: https://github.com/apache/kafka/pull/13137#discussion_r1090935837


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java:
##
@@ -255,13 +287,26 @@ private void addHerder(SourceAndTarget sourceAndTarget) {
 // Pass the shared admin to the distributed herder as an additional 
AutoCloseable object that should be closed when the
 // herder is stopped. MirrorMaker has multiple herders, and having the 
herder own the close responsibility is much easier than
 // tracking the various shared admin objects in this class.
-// Do not provide a restClient to the DistributedHerder to indicate 
that request forwarding is disabled
 Herder herder = new DistributedHerder(distributedConfig, time, worker,
 kafkaClusterId, statusBackingStore, configBackingStore,
-advertisedUrl, null, CLIENT_CONFIG_OVERRIDE_POLICY, 
sharedAdmin);
+advertisedUrl, restClient, CLIENT_CONFIG_OVERRIDE_POLICY,
+restNamespace, sharedAdmin);
 herders.put(sourceAndTarget, herder);
 }
 
+private static String encodePath(String rawPath) throws 
UnsupportedEncodingException {
+return URLEncoder.encode(rawPath, StandardCharsets.UTF_8.name())
+// Java's out-of-the-box URL encoder encodes spaces (' ') as 
pluses ('+'),
+// and pluses as '%2B'
+// But Jetty doesn't decode pluses at all and leaves them 
as-are in decoded
+// URLs
+// So to get around that, we replace pluses in the encoded URL 
here with '%20',
+// which is the encoding that Jetty expects for spaces
+// Jetty will reverse this transformation when evaluating the 
path parameters
+// and will return decoded strings with all special characters 
as they were.

Review Comment:
   It's the result of fitting a square peg (Java's `URLEncoder` class, which, 
despite the name, is designed for HTML form encoding instead of URL path 
encoding) into a round hole (URL path encoding). I couldn't find a better 
alternative than this, and considering the fairly low risk (this is all 
intended to cover a pretty niche edge case) and integration test coverage, 
figured it'd be good enough for now.
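   
   A small sketch of the workaround (assuming UTF-8):
   
   ```
   String raw = "a b+c";
   String formEncoded = URLEncoder.encode(raw, StandardCharsets.UTF_8.name()); // "a+b%2Bc"
   String pathEncoded = formEncoded.replace("+", "%20");                       // "a%20b%2Bc"
   // Jetty decodes "%20" back to a space and "%2B" back to a plus
   ```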



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #12620: KAFKA-14206: upgrade zookeeper version to 3.7.1

2023-01-30 Thread via GitHub


ijuma commented on PR #12620:
URL: https://github.com/apache/kafka/pull/12620#issuecomment-1409028650

   A couple of things to consider:
   1. If we upgrade the zk server to 3.8.1, what is the impact on the zk 
clients. That is, what's the earliest zk client version that is supported by 
the 3.8.x server?
   2. We need to ensure there are no regressions (particularly on the stability 
front) when it comes to this upgrade. It would be good for someone to stress 
test the system a bit with the new version and check if all works well.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #12620: KAFKA-14206: upgrade zookeeper version to 3.7.1

2023-01-30 Thread via GitHub


ijuma commented on PR #12620:
URL: https://github.com/apache/kafka/pull/12620#issuecomment-1409023494

   @divijvaidya Yes, I think that would make sense. That should tide us over 
until the KRaft transition happens.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #12620: KAFKA-14206: upgrade zookeeper version to 3.7.1

2023-01-30 Thread via GitHub


divijvaidya commented on PR #12620:
URL: https://github.com/apache/kafka/pull/12620#issuecomment-1409015865

   > It may make sense to wait a bit and go straight to 3.8.1 (once that's 
released)
   
   Note that Zk 3.8.1 was released in January 2023 [1]. @ijuma, do you think it 
is the right time for us to move to 3.8.1 since the existing version in Kafka 
is EOL?
   
   [1] https://zookeeper.apache.org/doc/r3.8.1/releasenotes.html 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax opened a new pull request, #13175: KAFAK-14660: Fix divide-by-zero vulnerability

2023-01-30 Thread via GitHub


mjsax opened a new pull request, #13175:
URL: https://github.com/apache/kafka/pull/13175

   This PR adds a safeguard for divide-by-zero. While `totalCapacity` can 
never be zero, an explicit error message is desirable.
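   
   For illustration, a minimal sketch of such a guard (names are illustrative, 
not the actual Streams code):
   
   ```
   if (totalCapacity == 0) {
       throw new IllegalStateException("Total capacity must not be zero.");
   }
   final double ratio = (double) assignedLoad / totalCapacity; // safe: guarded above
   ```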


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14660) Divide by zero security vulnerability (sonatype-2019-0422)

2023-01-30 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-14660:
---

Assignee: Matthias J. Sax

> Divide by zero security vulnerability (sonatype-2019-0422)
> --
>
> Key: KAFKA-14660
> URL: https://issues.apache.org/jira/browse/KAFKA-14660
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.2
>Reporter: Andy Coates
>Assignee: Matthias J. Sax
>Priority: Minor
>
> Looks like SonaType has picked up a "Divide by Zero" issue reported in a PR 
> and, because the PR was never merged, is now reporting it as a security 
> vulnerability in the latest Kafka Streams library.
>  
> See:
>  * [Vulnerability: 
> sonatype-2019-0422]([https://ossindex.sonatype.org/vulnerability/sonatype-2019-0422?component-type=maven=org.apache.kafka%2Fkafka-streams_source=ossindex-client_medium=integration_content=1.7.0)]
>  * [Original PR]([https://github.com/apache/kafka/pull/7414])
>  
> While it looks from the comments made by [~mjsax] and [~bbejeck] that the 
> divide-by-zero is not really an issue, the fact that its now being reported 
> as a vulnerability is, especially with regulators.
> PITA, but we should consider either getting this vulnerability removed 
> (Google wasn't very helpful in providing info on how to do this), or fixed 
> (Again, not sure how to tag the fix as fixing this issue).  One option may 
> just be to reopen the PR and merge (and then fix forward by switching it to 
> throw an exception).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14623) OAuth's HttpAccessTokenRetriever potentially leaks secrets in logging

2023-01-30 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-14623:

Fix Version/s: 3.4.0
   (was: 3.4.1)

> OAuth's HttpAccessTokenRetriever potentially leaks secrets in logging  
> ---
>
> Key: KAFKA-14623
> URL: https://issues.apache.org/jira/browse/KAFKA-14623
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, security
>Affects Versions: 3.3.1
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
> Fix For: 3.4.0, 3.3.3
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The OAuth code that communicates via HTTP with the IdP 
> (HttpAccessTokenRetriever.java) includes logging that outputs the request and 
> response payloads. Among them are:
>  * 
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java#L265]
>  * 
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java#L274]
>  * 
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java#L320]
> It should be determined if there are other places sensitive information might 
> be inadvertently exposed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante commented on pull request #13148: KAFKA-14645: Use plugin classloader when retrieving connector plugin config definitions

2023-01-30 Thread via GitHub


C0urante commented on PR #13148:
URL: https://github.com/apache/kafka/pull/13148#issuecomment-1409001897

   Thanks Mickael!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes

2023-01-30 Thread via GitHub


ijuma commented on PR #13131:
URL: https://github.com/apache/kafka/pull/13131#issuecomment-1408995921

   Thanks for doing this. One thing I didn't understand is why the shared 
classes are in `server-common` instead of `tools`. Is this because `core` also 
uses the class?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison merged pull request #13148: KAFKA-14645: Use plugin classloader when retrieving connector plugin config definitions

2023-01-30 Thread via GitHub


mimaison merged PR #13148:
URL: https://github.com/apache/kafka/pull/13148


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-9975) KIP-611: Improved Handling of Abandoned Connectors and Tasks

2023-01-30 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-9975.
--
Resolution: Abandoned

> KIP-611: Improved Handling of Abandoned Connectors and Tasks
> 
>
> Key: KAFKA-9975
> URL: https://issues.apache.org/jira/browse/KAFKA-9975
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Priority: Major
>  Labels: needs-kip
>
> (To be fleshed out once 
> [KIP-611|https://cwiki.apache.org/confluence/display/KAFKA/KIP-611%3A+Improved+Handling+of+Abandoned+Connectors+and+Tasks]
>  is finalized)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] fvaleri commented on a diff in pull request #13171: KAFKA-14584: Move StateChangeLogMerger tool

2023-01-30 Thread via GitHub


fvaleri commented on code in PR #13171:
URL: https://github.com/apache/kafka/pull/13171#discussion_r1090852796


##
server-common/src/main/java/org/apache/kafka/server/util/ToolsUtils.java:
##
@@ -100,4 +104,17 @@ public static void prettyPrintTable(
 printRow(columnLengths, headers, out);
 rows.forEach(row -> printRow(columnLengths, row, out));
 }
+
+/**
+ * Returns a set of duplicated items.
+ */
+public static <T> Set<T> findDuplicates(Collection<T> collection) {

Review Comment:
   We can create a KAFKA-14525 subtask where we list all pending 
cleanups/refactorings. You could even reference it in a comment on your PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #13148: KAFKA-14645: Use plugin classloader when retrieving connector plugin config definitions

2023-01-30 Thread via GitHub


mimaison commented on PR #13148:
URL: https://github.com/apache/kafka/pull/13148#issuecomment-1408944140

   Thanks @C0urante, I just wanted to clarify this was indeed the intent. Like 
you, I don't think we necessarily need system tests for this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] electrical commented on pull request #12358: KAFKA-13988:Fix mm2 auto.offset.reset=latest not working

2023-01-30 Thread via GitHub


electrical commented on PR #12358:
URL: https://github.com/apache/kafka/pull/12358#issuecomment-1408885972

   We are hitting the same issue while trying to switch from MM1 to MM2.
   It would be great to see this fixed so we can switch over.
   
   Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] OmniaGM commented on a diff in pull request #13171: KAFKA-14584: Move StateChangeLogMerger tool

2023-01-30 Thread via GitHub


OmniaGM commented on code in PR #13171:
URL: https://github.com/apache/kafka/pull/13171#discussion_r1090808339


##
server-common/src/main/java/org/apache/kafka/server/util/ToolsUtils.java:
##
@@ -100,4 +104,17 @@ public static void prettyPrintTable(
 printRow(columnLengths, headers, out);
 rows.forEach(row -> printRow(columnLengths, row, out));
 }
+
+/**
+ * Returns a set of duplicated items.
+ */
+public static <T> Set<T> findDuplicates(Collection<T> collection) {

Review Comment:
   Sounds like a plan. I am working on other CLI commands that use 
`CoreUtils.duplicates`, so I'll keep a note to remove it with the last command.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on a diff in pull request #13171: KAFKA-14584: Move StateChangeLogMerger tool

2023-01-30 Thread via GitHub


fvaleri commented on code in PR #13171:
URL: https://github.com/apache/kafka/pull/13171#discussion_r1090717453


##
server-common/src/main/java/org/apache/kafka/server/util/ToolsUtils.java:
##
@@ -100,4 +104,17 @@ public static void prettyPrintTable(
 printRow(columnLengths, headers, out);
 rows.forEach(row -> printRow(columnLengths, row, out));
 }
+
+/**
+ * Returns a set of duplicated items.
+ */
+public static <T> Set<T> findDuplicates(Collection<T> collection) {

Review Comment:
   That was my idea. I think we can drop CoreUtils.duplicates once the last 
command using it is migrated. I renamed ToolsUtils.findDuplicates to 
ToolsUtils.duplicates, so the intention is clearer. WDYT?
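   
   For reference, a minimal sketch of what a generic duplicates helper along 
these lines could look like (illustrative only, not the actual ToolsUtils 
code):
   
   ```
   import java.util.Collection;
   import java.util.HashSet;
   import java.util.Set;
   
   public final class DuplicatesSketch {
       // Returns the items that occur more than once in the given collection.
       public static <T> Set<T> duplicates(Collection<T> collection) {
           Set<T> seen = new HashSet<>();
           Set<T> result = new HashSet<>();
           for (T item : collection) {
               // Set.add returns false when the element was already present.
               if (!seen.add(item)) {
                   result.add(item);
               }
           }
           return result;
       }
   }
   ```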



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #13148: KAFKA-14645: Use plugin classloader when retrieving connector plugin config definitions

2023-01-30 Thread via GitHub


C0urante commented on PR #13148:
URL: https://github.com/apache/kafka/pull/13148#issuecomment-1408822680

   @mimaison Yeah, that's correct. I'm hoping that the `verify` calls for 
`plugins::withClassLoader` will be sufficient for now. I've also added a small 
comment explaining why we perform that check, which hopefully makes this part a 
little clearer and harder to break with future changes.
   
   We could file a follow-up ticket to add true integration or system tests for 
isolated classpath loading that set up plugins off of the system classpath and 
test various APIs that interact with them (such as the `GET 
/connector-plugins//config` endpoint), but I'm on the fence as to whether 
that'd be worth the bloat in CI build time and testing logic. LMKWYT


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] pprovenzano commented on pull request #13114: KAFKA-14084: SCRAM support in KRaft.

2023-01-30 Thread via GitHub


pprovenzano commented on PR #13114:
URL: https://github.com/apache/kafka/pull/13114#issuecomment-1408812047

   > 
   
   The changes to `BrokerMetadataSnapshotterTest.scala` are needed and small. 
The SCRAM records are larger than 1024 bytes, so I increased the max allocation 
size in the mock snapshotter instantiated in the test to 4096 bytes. I also 
added a timeout to these tests, because the failure mode when the records are 
too large is for the test to hang indefinitely; I had a very hard time figuring 
out which test was hanging without it.
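   
   For illustration, one way to add that kind of test timeout is JUnit 5's 
`@Timeout` (a sketch, not the exact change in this PR):
   
   ```
   import java.util.concurrent.TimeUnit;
   import org.junit.jupiter.api.Test;
   import org.junit.jupiter.api.Timeout;
   
   class SnapshotterTimeoutSketch {
       @Test
       @Timeout(value = 30, unit = TimeUnit.SECONDS)
       void testSnapshotCompletes() {
           // Test body: with the timeout, oversized records make the test
           // fail fast instead of hanging the build indefinitely.
       }
   }
   ```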


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dpcollins-google commented on a diff in pull request #13162: fix: replace an inefficient loop in kafka internals

2023-01-30 Thread via GitHub


dpcollins-google commented on code in PR #13162:
URL: https://github.com/apache/kafka/pull/13162#discussion_r1090733865


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -1225,13 +1226,11 @@ public static long tryWriteTo(TransferableChannel 
destChannel,
  * @param length The number of bytes to write
  * @throws IOException For any errors writing to the output
  */
-public static void writeTo(DataOutput out, ByteBuffer buffer, int length) 
throws IOException {
+public static void writeTo(DataOutputStream out, ByteBuffer buffer, int 
length) throws IOException {
 if (buffer.hasArray()) {
 out.write(buffer.array(), buffer.position() + 
buffer.arrayOffset(), length);
 } else {
-int pos = buffer.position();
-for (int i = pos; i < length + pos; i++)
-out.writeByte(buffer.get(i));
+Channels.newChannel(out).write(buffer);

Review Comment:
   Per 1): this parameter is always `buffer.remaining()`, so I've cleaned up the 
call sites and removed it.
   
   Per 2): yes, it's substantial. The reason is that WritableByteChannelImpl 
writes in 8k chunks when feasible, instead of 1-byte chunks: 
https://github.com/AdoptOpenJDK/openjdk-jdk8u/blob/2544d2a351eca1a3d62276f969dd2d95e4a4d2b6/jdk/src/share/classes/java/nio/channels/Channels.java#L442
   
   Unfortunately I can't share benchmarks to demonstrate this, as they're from a 
production application and were collected using internal tooling.
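   
   For context, a self-contained sketch of the rewritten method (assuming, per 
the above, that the `length` parameter is gone and callers always write 
`buffer.remaining()` bytes):
   
   ```
   import java.io.DataOutputStream;
   import java.io.IOException;
   import java.nio.ByteBuffer;
   import java.nio.channels.Channels;
   
   public final class WriteToSketch {
       public static void writeTo(DataOutputStream out, ByteBuffer buffer)
               throws IOException {
           if (buffer.hasArray()) {
               // Heap buffer: write the backing array in one call.
               out.write(buffer.array(),
                         buffer.position() + buffer.arrayOffset(),
                         buffer.remaining());
           } else {
               // Direct buffer: the channel copies in large chunks (8k in the
               // JDK implementation) rather than one byte at a time.
               Channels.newChannel(out).write(buffer);
           }
       }
   }
   ```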



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on pull request #13171: KAFKA-14584: Move StateChangeLogMerger tool

2023-01-30 Thread via GitHub


fvaleri commented on PR #13171:
URL: https://github.com/apache/kafka/pull/13171#issuecomment-1408759302

   Hi Omnia, thanks for the review.
   
   I've addressed some of your comments and answered the others.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on a diff in pull request #13171: KAFKA-14584: Move StateChangeLogMerger tool

2023-01-30 Thread via GitHub


fvaleri commented on code in PR #13171:
URL: https://github.com/apache/kafka/pull/13171#discussion_r1090717453


##
server-common/src/main/java/org/apache/kafka/server/util/ToolsUtils.java:
##
@@ -100,4 +104,17 @@ public static void prettyPrintTable(
 printRow(columnLengths, headers, out);
 rows.forEach(row -> printRow(columnLengths, row, out));
 }
+
+/**
+ * Returns a set of duplicated items.
+ */
+public static <T> Set<T> findDuplicates(Collection<T> collection) {

Review Comment:
   That was my idea. I think we can drop CoreUtils.duplicates once the last 
command using it is migrated. I renamed findDuplicates to duplicates, so the 
intention is clearer. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Hangleton commented on a diff in pull request #13161: Kafka 14128

2023-01-30 Thread via GitHub


Hangleton commented on code in PR #13161:
URL: https://github.com/apache/kafka/pull/13161#discussion_r1090718859


##
clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java:
##
@@ -160,7 +160,7 @@ private void maybeThrowCancellationException(Throwable 
cause) {
  * Waits if necessary for this future to complete, and then returns its 
result.
  */
 @Override
-public T get() throws InterruptedException, ExecutionException {
+public abstract T get(long timeout, TimeUnit unit) throws 
InterruptedException, ExecutionException, TimeoutException {

Review Comment:
   Thanks for the follow-up. 
   
   Trying to understand your use case. 
   
   The `MetadataRequest` used to describe topics timed out. All retries were 
exhausted (see the log lines below) and a `TimeoutException` (FQN 
`org.apache.kafka.common.errors.TimeoutException`) was propagated to the future 
(that is, the future was completed exceptionally) and then to the caller, 
resulting in the behaviour observed.
   
   ```
   2022-07-29 13:39:37.854  INFO 25843 --- [348aefeff-admin] 
org.apache.kafka.clients.NetworkClient   : [AdminClient 
clientId=L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-admin] Disconnecting from 
node 3 due to request timeout.
   2022-07-29 13:39:37.854  INFO 25843 --- [348aefeff-admin] 
org.apache.kafka.clients.NetworkClient   : [AdminClient 
clientId=L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-admin] Cancelled 
in-flight METADATA request with correlation id 985 due to node 3 being 
disconnected (elapsed time since creation: 60023ms, elapsed time since send: 
60023ms, request timeout: 3ms)
   2022-07-29 13:39:37.867 ERROR 25843 --- [-StreamThread-1] 
o.a.k.s.p.i.InternalTopicManager : stream-thread [main] Unexpected 
error during topic description for 
L.DII.A-COGROUPKSTREAM-AGGREGATE-STATE-STORE-03-changelog.
   Error message was: org.apache.kafka.common.errors.TimeoutException: 
Call(callName=describeTopics, deadlineMs=1659101977830, tries=1, 
nextAllowedTryMs=1659101977955) timed out at 1659101977855 after 1 attempt(s)
   2022-07-29 13:39:37.869  INFO 25843 --- [-StreamThread-1] 
o.a.k.s.p.internals.StreamThread : stream-thread 
[L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-StreamThread-1] State transition 
from RUNNING to PENDING_SHUTDOWN
   ```
   
   Therefore, it seems that:
   
   - The failure of the "describe-topics" invocation was already correctly 
propagated via the future.
   - This failure results from the timeout of the underlying Metadata request 
**and** the exhaustion of retries.
   
   If the broker(s) were only temporarily unavailable, it seems increasing the 
number of retries may have helped?
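   
   As a hypothetical illustration of that last point (the broker address, topic 
name and settings below are assumptions), the retry and timeout headroom can be 
raised via the admin client config:
   
   ```
   import java.util.Collections;
   import java.util.Properties;
   import java.util.concurrent.ExecutionException;
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AdminClientConfig;
   
   public final class DescribeTopicsRetriesSketch {
       public static void main(String[] args) throws InterruptedException {
           Properties props = new Properties();
           props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           // Give the underlying MetadataRequest more headroom before the
           // future is completed exceptionally with a TimeoutException.
           props.put(AdminClientConfig.RETRIES_CONFIG, 5);
           props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 120_000);
           try (Admin admin = Admin.create(props)) {
               admin.describeTopics(Collections.singleton("my-topic"))
                    .allTopicNames()
                    .get();
           } catch (ExecutionException e) {
               // org.apache.kafka.common.errors.TimeoutException lands here
               // once all retries are exhausted.
               System.err.println("Describe failed: " + e.getCause());
           }
       }
   }
   ```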



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on a diff in pull request #13171: KAFKA-14584: Move StateChangeLogMerger tool

2023-01-30 Thread via GitHub


fvaleri commented on code in PR #13171:
URL: https://github.com/apache/kafka/pull/13171#discussion_r1090717453


##
server-common/src/main/java/org/apache/kafka/server/util/ToolsUtils.java:
##
@@ -100,4 +104,17 @@ public static void prettyPrintTable(
 printRow(columnLengths, headers, out);
 rows.forEach(row -> printRow(columnLengths, row, out));
 }
+
+/**
+ * Returns a set of duplicated items.
+ */
+public static <T> Set<T> findDuplicates(Collection<T> collection) {

Review Comment:
   That was my idea. I think we can drop CoreUtils.duplicates once the last 
command using it is migrated. WDYT?



##
tools/src/main/java/org/apache/kafka/tools/StateChangeLogMerger.java:
##
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.ToolsUtils;
+
+import java.io.File;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.Arrays;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Scanner;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * A utility that merges the state change logs (possibly obtained from 
different brokers and over multiple days).
+ *
+ * This utility expects at least one of the following two arguments -
+ * 1. A list of state change log files
+ * 2. A regex to specify state change log file names.
+ *
+ * This utility optionally also accepts the following arguments -
+ * 1. The topic whose state change logs should be merged
+ * 2. A list of partitions whose state change logs should be merged (can be 
specified only when the topic argument
+ * is explicitly specified)
+ * 3. Start time from when the logs should be merged
+ * 4. End time until when the logs should be merged
+ */
+public class StateChangeLogMerger {
+private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss,SSS";
+private static final Pattern DATE_PATTERN = 
Pattern.compile("([0-9]{4}-[0-9]{2}-[0-9]{2} 
[0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}){1}");
+private static final Pattern TOPIC_PART_PATTERN = Pattern.compile("\\[(" + 
Topic.LEGAL_CHARS + "+),([0-9]+)\\]");
+
+private static List<String> files;
+private static String topic;
+private static List<Integer> partitions;
+private static Date startDate;
+private static Date endDate;
+
+public static void main(String[] args) {

Review Comment:
   Yeah, I thought about that, but there are shared util methods that call 
Exit.exit() directly, so we should change that first. I think we can address 
this in a separate PR once most commands are migrated. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on a diff in pull request #13174: MINOR: Various cleanups in common utils

2023-01-30 Thread via GitHub


mimaison commented on code in PR #13174:
URL: https://github.com/apache/kafka/pull/13174#discussion_r1090688740


##
clients/src/main/java/org/apache/kafka/common/utils/MappedIterator.java:
##
@@ -32,12 +32,12 @@ public MappedIterator(Iterator<F> 
underlyingIterator, Function<F, T> m
 }
 
 @Override
-public final boolean hasNext() {
+public boolean hasNext() {

Review Comment:
   The class is `final`
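   
   (A trivial illustration, not project code: when the class itself is `final`, 
a `final` modifier on its methods is redundant, since no subclass can exist to 
override them.)
   
   ```
   final class Example {
       // Marking this method final would be redundant: Example cannot be
       // subclassed, so hasNext() can never be overridden anyway.
       boolean hasNext() {
           return false;
       }
   }
   ```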



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison opened a new pull request, #13174: MINOR: Various cleanups in common utils

2023-01-30 Thread via GitHub


mimaison opened a new pull request, #13174:
URL: https://github.com/apache/kafka/pull/13174

   - Remove unused methods
   - Cleanup syntax
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Hangleton commented on a diff in pull request #13162: fix: replace an inefficient loop in kafka internals

2023-01-30 Thread via GitHub


Hangleton commented on code in PR #13162:
URL: https://github.com/apache/kafka/pull/13162#discussion_r1090683591


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -1225,13 +1226,11 @@ public static long tryWriteTo(TransferableChannel 
destChannel,
  * @param length The number of bytes to write
  * @throws IOException For any errors writing to the output
  */
-public static void writeTo(DataOutput out, ByteBuffer buffer, int length) 
throws IOException {
+public static void writeTo(DataOutputStream out, ByteBuffer buffer, int 
length) throws IOException {
 if (buffer.hasArray()) {
 out.write(buffer.array(), buffer.position() + 
buffer.arrayOffset(), length);
 } else {
-int pos = buffer.position();
-for (int i = pos; i < length + pos; i++)
-out.writeByte(buffer.get(i));
+Channels.newChannel(out).write(buffer);

Review Comment:
   Thanks for the follow-up. 
   
   1. This new implementation ignores the `length` argument provided to the 
method if the buffer is not backed by an array. What if `length` does not equal 
the number of remaining bytes on the buffer?
   
   2. Is there an actual optimization offered by calling `write`? The 
implementation of direct buffers uses a similar linear iteration. Do you have 
data showing performance improvements with this implementation?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Hangleton commented on a diff in pull request #13162: fix: replace an inefficient loop in kafka internals

2023-01-30 Thread via GitHub


Hangleton commented on code in PR #13162:
URL: https://github.com/apache/kafka/pull/13162#discussion_r1090683591


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -1225,13 +1226,11 @@ public static long tryWriteTo(TransferableChannel 
destChannel,
  * @param length The number of bytes to write
  * @throws IOException For any errors writing to the output
  */
-public static void writeTo(DataOutput out, ByteBuffer buffer, int length) 
throws IOException {
+public static void writeTo(DataOutputStream out, ByteBuffer buffer, int 
length) throws IOException {
 if (buffer.hasArray()) {
 out.write(buffer.array(), buffer.position() + 
buffer.arrayOffset(), length);
 } else {
-int pos = buffer.position();
-for (int i = pos; i < length + pos; i++)
-out.writeByte(buffer.get(i));
+Channels.newChannel(out).write(buffer);

Review Comment:
   Thanks for the follow-up. 
   
   1. This new implementation ignores the `length` argument provided to the 
method if the buffer is not backed by an array. What if `length` does not equal 
the number of remaining bytes on the buffer?
   
   2. Is there an actual optimization offered by calling `write`? The 
implementation of direct buffers uses a similar index-based iteration.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on a diff in pull request #13171: KAFKA-14584: Move StateChangeLogMerger tool

2023-01-30 Thread via GitHub


fvaleri commented on code in PR #13171:
URL: https://github.com/apache/kafka/pull/13171#discussion_r1090657703


##
tools/src/main/java/org/apache/kafka/tools/StateChangeLogMerger.java:
##
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.ToolsUtils;
+
+import java.io.File;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.Arrays;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Scanner;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * A utility that merges the state change logs (possibly obtained from 
different brokers and over multiple days).
+ *
+ * This utility expects at least one of the following two arguments -
+ * 1. A list of state change log files
+ * 2. A regex to specify state change log file names.
+ *
+ * This utility optionally also accepts the following arguments -
+ * 1. The topic whose state change logs should be merged
+ * 2. A list of partitions whose state change logs should be merged (can be 
specified only when the topic argument
+ * is explicitly specified)
+ * 3. Start time from when the logs should be merged
+ * 4. End time until when the logs should be merged
+ */
+public class StateChangeLogMerger {
+private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss,SSS";
+private static final Pattern DATE_PATTERN = 
Pattern.compile("([0-9]{4}-[0-9]{2}-[0-9]{2} 
[0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}){1}");
+private static final Pattern TOPIC_PART_PATTERN = Pattern.compile("\\[(" + 
Topic.LEGAL_CHARS + "+),([0-9]+)\\]");
+
+private static List<String> files;
+private static String topic;
+private static List<Integer> partitions;
+private static Date startDate;
+private static Date endDate;
+
+public static void main(String[] args) {

Review Comment:
   Yeah, I thought about that, but there are shared util methods that call 
Exit.exit() directly, so we should change that first. I think we can address 
this in a separate PR once most commands are migrated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] tledkov opened a new pull request, #13173: Demo of leaks file descriptors of deleted logs on KafkaEmbedded#stopAsync

2023-01-30 Thread via GitHub


tledkov opened a new pull request, #13173:
URL: https://github.com/apache/kafka/pull/13173

   Demonstrates the behavior of an integration test that uses the 
`EmbeddedKafkaCluster`.
   If a test suite contains a lot of tests that create/delete lots of 
topics, the process finishes with the error "Too many open files".
   
   The fix:
   - a `LogManager#forceDeleteLogs` method
   - called from `KafkaEmbedded#stopAsync`
   is ugly and does not claim to be correct. Just for demo.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Hangleton commented on a diff in pull request #13169: KAFKA-14658: Do not open broker ports until we are ready to accept traffic

2023-01-30 Thread via GitHub


Hangleton commented on code in PR #13169:
URL: https://github.com/apache/kafka/pull/13169#discussion_r1090646744


##
clients/src/main/java/org/apache/kafka/common/utils/Time.java:
##
@@ -86,4 +89,30 @@ default Timer timer(Duration timeout) {
 return timer(timeout.toMillis());
 }
 
+/**
+ * Wait for a future to complete, or time out.
+ *
+ * @param future     The future to wait for.
+ * @param deadlineNs The time in the future, in monotonic nanoseconds, 
to time out.
+ * @return           The result of the future.
+ * @param <T>        The type of the future.
+ */
+default <T> T waitForFuture(
+CompletableFuture<T> future,
+long deadlineNs
+) throws TimeoutException, InterruptedException, ExecutionException  {
+TimeoutException timeoutException = null;
+while (true) {
+long nowNs = nanoseconds();
+if (deadlineNs <= nowNs) {
+throw (timeoutException == null) ? new TimeoutException() : 
timeoutException;
+}
+long deltaNs = deadlineNs - nowNs;
+try {
+return future.get(deltaNs, TimeUnit.NANOSECONDS);

Review Comment:
   Thanks for following up. However, I double-checked the implementation of 
`CompletableFuture`, and it throws a `TimeoutException` if `deltaNs` is <= 0. Am 
I missing something?
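   
   A tiny self-contained check of that behaviour (illustrative only):
   
   ```
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;
   
   public final class ZeroTimeoutCheck {
       public static void main(String[] args) throws Exception {
           CompletableFuture<String> future = new CompletableFuture<>();
           try {
               // Incomplete future + non-positive timeout: get() throws
               // TimeoutException immediately.
               future.get(0, TimeUnit.NANOSECONDS);
           } catch (TimeoutException e) {
               System.out.println("TimeoutException, as expected");
           }
       }
   }
   ```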



##
server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java:
##
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.math.BigInteger;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.function.BiConsumer;
+
+
+public class FutureUtils {
+/**
+ * Based on the current time and a delay, computes a monotonic deadline in 
the future.
+ *
+ * @param nowNs The current time in monotonic nanoseconds.
+ * @param delayMs   The delay in milliseconds.
+ * @return  The monotonic deadline in the future. This value is 
capped at
+ *  Long.MAX_VALUE.
+ */
+public static long getDeadlineNsFromDelayMs(
+long nowNs,
+long delayMs
+) {
+if (delayMs < 0) {
+throw new RuntimeException("Negative delays are not allowed.");
+}
+BigInteger delayNs = 
BigInteger.valueOf(delayMs).multiply(BigInteger.valueOf(1_000_000));

Review Comment:
   Got it. On point 2 that means we could have `delayMs` close enough to 
`Long.MAX_VALUE`. Thanks.
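   
   For comparison, a sketch of the same saturating computation using 
`Math.*Exact` instead of `BigInteger` (an alternative, not the PR's code):
   
   ```
   public final class DeadlineSketch {
       // Computes nowNs + delayMs (in nanoseconds), capping at Long.MAX_VALUE
       // on overflow instead of wrapping around.
       public static long deadlineNsFromDelayMs(long nowNs, long delayMs) {
           if (delayMs < 0) {
               throw new IllegalArgumentException("Negative delays are not allowed.");
           }
           try {
               return Math.addExact(nowNs, Math.multiplyExact(delayMs, 1_000_000L));
           } catch (ArithmeticException e) {
               return Long.MAX_VALUE;
           }
       }
   }
   ```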



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14139) Replaced disk can lead to loss of committed data even with non-empty ISR

2023-01-30 Thread Alexandre Dupriez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexandre Dupriez updated KAFKA-14139:
--
Description: 
We have been thinking about disk failure cases recently. Suppose that a disk 
has failed and the user needs to restart the disk from an empty state. The 
concern is whether this can lead to the unnecessary loss of committed data.

For normal topic partitions, removal from the ISR during controlled shutdown 
buys us some protection. After the replica is restarted, it must prove its 
state to the leader before it can be added back to the ISR. And it cannot 
become a leader until it does so.

An obvious exception to this is when the replica is the last member in the ISR. 
In this case, the disk failure itself has compromised the committed data, so 
some amount of loss must be expected.

We have been considering other scenarios in which the loss of one disk can lead 
to data loss even when there are replicas remaining which have all of the 
committed entries. One such scenario is this:

Suppose we have a partition with two replicas: A and B. Initially A is the 
leader and it is the only member of the ISR.
 # Broker B catches up to A, so A attempts to send an AlterPartition request to 
the controller to add B into the ISR.
 # Before the AlterPartition request is received, replica B has a hard failure.
 # The current controller successfully fences broker B. It takes no action on 
this partition since B is already out of the ISR.
 # Before the controller receives the AlterPartition request to add B, it also 
fails.
 # While the new controller is initializing, suppose that replica B finishes 
startup, but the disk has been replaced (all of the previous state has been 
lost).
 # The new controller sees the registration from broker B first.
 # Finally, the AlterPartition from A arrives which adds B back into the ISR 
even though it has an empty log.

(Credit for coming up with this scenario goes to [~junrao] .)

I tested this in KRaft and confirmed that this sequence is possible (even if 
perhaps unlikely). There are a few ways we could have potentially detected the 
issue. First, perhaps the leader should have bumped the leader epoch on all 
partitions when B was fenced. Then the inflight AlterPartition would be doomed 
no matter when it arrived.

Alternatively, we could have relied on the broker epoch to distinguish the dead 
broker's state from that of the restarted broker. This could be done by 
including the broker epoch in both the `Fetch` request and in `AlterPartition`.

Finally, perhaps even normal kafka replication should be using a unique 
identifier for each disk so that we can reliably detect when it has changed. 
For example, something like what was proposed for the metadata quorum here: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Voter+Changes].

  was:
We have been thinking about disk failure cases recently. Suppose that a disk 
has failed and the user needs to restart the disk from an empty state. The 
concern is whether this can lead to the unnecessary loss of committed data.

For normal topic partitions, removal from the ISR during controlled shutdown 
buys us some protection. After the replica is restarted, it must prove its 
state to the leader before it can be added back to the ISR. And it cannot 
become a leader until it does so.

An obvious exception to this is when the replica is the last member in the ISR. 
In this case, the disk failure itself has compromised the committed data, so 
some amount of loss must be expected.

We have been considering other scenarios in which the loss of one disk can lead 
to data loss even when there are replicas remaining which have all of the 
committed entries. One such scenario is this:

Suppose we have a partition with two replicas: A and B. Initially A is the 
leader and it is the only member of the ISR.
 # Broker B catches up to A, so A attempts to send an AlterPartition request to 
the controller to add B into the ISR.
 # Before the AlterPartition request is received, replica B has a hard failure.
 # The current controller successfully fences broker B. It takes no action on 
this partition since B is already out of the ISR.
 # Before the controller receives the AlterPartition request to add B, it also 
fails.
 # While the new controller is initializing, suppose that replica B finishes 
startup, but the disk has been replaced (all of the previous state has been 
lost).
 # The new controller sees the registration from broker B first.
 # Finally, the AlterPartition from A arrives which adds B back into the ISR 
even though it has an empty log.

(Credit for coming up with this scenario goes to [~junrao] .)

I tested this in KRaft and confirmed that this sequence is possible (even if 
perhaps unlikely). There are a few ways we could have potentially detected the 
issue. First, perhaps the leader should have 

[jira] [Commented] (KAFKA-14139) Replaced disk can lead to loss of committed data even with non-empty ISR

2023-01-30 Thread Alexandre Dupriez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17682063#comment-17682063
 ] 

Alexandre Dupriez commented on KAFKA-14139:
---

Could reproduce without ZK controller change.

> Replaced disk can lead to loss of committed data even with non-empty ISR
> 
>
> Key: KAFKA-14139
> URL: https://issues.apache.org/jira/browse/KAFKA-14139
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Alexandre Dupriez
>Priority: Major
> Fix For: 3.5.0
>
>
> We have been thinking about disk failure cases recently. Suppose that a disk 
> has failed and the user needs to restart the disk from an empty state. The 
> concern is whether this can lead to the unnecessary loss of committed data.
> For normal topic partitions, removal from the ISR during controlled shutdown 
> buys us some protection. After the replica is restarted, it must prove its 
> state to the leader before it can be added back to the ISR. And it cannot 
> become a leader until it does so.
> An obvious exception to this is when the replica is the last member in the 
> ISR. In this case, the disk failure itself has compromised the committed 
> data, so some amount of loss must be expected.
> We have been considering other scenarios in which the loss of one disk can 
> lead to data loss even when there are replicas remaining which have all of 
> the committed entries. One such scenario is this:
> Suppose we have a partition with two replicas: A and B. Initially A is the 
> leader and it is the only member of the ISR.
>  # Broker B catches up to A, so A attempts to send an AlterPartition request 
> to the controller to add B into the ISR.
>  # Before the AlterPartition request is received, replica B has a hard 
> failure.
>  # The current controller successfully fences broker B. It takes no action on 
> this partition since B is already out of the ISR.
>  # Before the controller receives the AlterPartition request to add B, it 
> also fails.
>  # While the new controller is initializing, suppose that replica B finishes 
> startup, but the disk has been replaced (all of the previous state has been 
> lost).
>  # The new controller sees the registration from broker B first.
>  # Finally, the AlterPartition from A arrives which adds B back into the ISR 
> even though it has an empty log.
> (Credit for coming up with this scenario goes to [~junrao] .)
> I tested this in KRaft and confirmed that this sequence is possible (even if 
> perhaps unlikely). There are a few ways we could have potentially detected 
> the issue. First, perhaps the leader should have bumped the leader epoch on 
> all partitions when B was fenced. Then the inflight AlterPartition would be 
> doomed no matter when it arrived.
> Alternatively, we could have relied on the broker epoch to distinguish the 
> dead broker's state from that of the restarted broker. This could be done by 
> including the broker epoch in both the `Fetch` request and in 
> `AlterPartition`.
> Finally, perhaps even normal kafka replication should be using a unique 
> identifier for each disk so that we can reliably detect when it has changed. 
> For example, something like what was proposed for the metadata quorum here: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Voter+Changes.]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-8217) MockConsumer.poll executes pollTasks before checking wakeup flag

2023-01-30 Thread Karsten Spang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17682057#comment-17682057
 ] 

Karsten Spang commented on KAFKA-8217:
--

I think this is as it should be. I would want to call wakeup() from a poll 
task, so I knew when it got delivered.
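
A small sketch of that interaction (illustrative only): with the current 
MockConsumer behaviour, wakeup() called from a poll task is delivered within 
the same poll() invocation.

```
import java.time.Duration;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.errors.WakeupException;

public final class MockConsumerWakeupSketch {
    public static void main(String[] args) {
        MockConsumer<String, String> consumer =
            new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        // Poll tasks run before the wakeup flag is checked, so this wakeup
        // is delivered within the same poll() call.
        consumer.schedulePollTask(consumer::wakeup);
        try {
            consumer.poll(Duration.ofMillis(100));
        } catch (WakeupException e) {
            System.out.println("woken up during poll");
        }
    }
}
```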

> MockConsumer.poll executes pollTasks before checking wakeup flag
> 
>
> Key: KAFKA-8217
> URL: https://issues.apache.org/jira/browse/KAFKA-8217
> Project: Kafka
>  Issue Type: Bug
>  Components: unit tests
>Affects Versions: 2.0.1
>Reporter: Kevin
>Priority: Minor
>
> The tasks scheduled in MockConsumer.schedulePollTask seem to be for 
> simulating interactions between the Consumer and the brokers, but 
> MockConsumer.poll runs scheduledPollTasks before it checks the wakeup flag. 
> Given that the KafkaConsumer.poll method checks the wakeup flag before doing 
> any other logic, it seems like the MockConsumer should rather check wakeup 
> before running scheduledPollTasks.
> This makes it difficult to control exactly how many Consumer.poll invocations 
> occur in a unit test when using the wakeup() pattern described in 
> "Multithreaded Processing" in the KafkaConsumer docs, as gating each poll so 
> that it returns only when instructed in the test requires submitting a 
> scheduledPollTask that blocks until the test unblocks it. The trouble occurs 
> when trying to shut down the consumer, as the poll() task needs to be 
> unblocked in order to receive the WakeupException but 
> https://issues.apache.org/jira/browse/KAFKA-8216 means that Consumer.poll() 
> can be called many times in the race condition interval after poll has been 
> unblocked but before the test has called Consumer.wakeup().



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14660) Divide by zero security vulnerability (sonatype-2019-0422)

2023-01-30 Thread Andy Coates (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Coates updated KAFKA-14660:

Description: 
Looks like SonaType has picked up a "Divide by Zero" issue reported in a PR 
and, because the PR was never merged, is now reporting it as a security 
vulnerability in the latest Kafka Streams library.

 

See:
 * [Vulnerability: 
sonatype-2019-0422]([https://ossindex.sonatype.org/vulnerability/sonatype-2019-0422?component-type=maven=org.apache.kafka%2Fkafka-streams_source=ossindex-client_medium=integration_content=1.7.0)]

 * [Original PR]([https://github.com/apache/kafka/pull/7414])

 

While it looks from the comments made by [~mjsax] and [~bbejeck] that the 
divide-by-zero is not really an issue, the fact that it's now being reported as 
a vulnerability is, especially with regulators.

PITA, but we should consider either getting this vulnerability removed (Google 
wasn't very helpful in providing info on how to do this), or fixed (Again, not 
sure how to tag the fix as fixing this issue).  One option may just be to 
reopen the PR and merge (and then fix forward by switching it to throw an 
exception).

  was:
Looks like SonaType has picked up a "Divide by Zero" issue reported in a PR 
and, because the PR was never merged, is now reporting a it as a security 
vulnerability in the latest Kafka Streams library.

 

See:
 * [Vulnerability: 
sonatype-2019-0422]([https://ossindex.sonatype.org/vulnerability/sonatype-2019-0422?component-type=maven=org.apache.kafka%2Fkafka-streams_source=ossindex-client_medium=integration_content=1.7.0)]

 * [Original PR](https://github.com/apache/kafka/pull/7414)

 

While it looks from the comments made by [~mjsax] and [~bbejeck] that the 
divide-by-zero is not really an issue, the fact that it's now being reported as 
a vulnerability is, especially with regulators.

PITA, but we should consider either getting this vulnerability removed (Google 
wasn't very helpful in providing info on how to do this), or fixed (Again, not 
sure how to tag the fix as fixing this issue).  One option may just be to 
reopen the PR and merge (and then fix forward by switching it to throw an 
exception).


> Divide by zero security vulnerability (sonatype-2019-0422)
> --
>
> Key: KAFKA-14660
> URL: https://issues.apache.org/jira/browse/KAFKA-14660
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.2
>Reporter: Andy Coates
>Priority: Minor
>
> Looks like SonaType has picked up a "Divide by Zero" issue reported in a PR 
> and, because the PR was never merged, is now reporting it as a security 
> vulnerability in the latest Kafka Streams library.
>  
> See:
>  * [Vulnerability: 
> sonatype-2019-0422]([https://ossindex.sonatype.org/vulnerability/sonatype-2019-0422?component-type=maven=org.apache.kafka%2Fkafka-streams_source=ossindex-client_medium=integration_content=1.7.0)]
>  * [Original PR]([https://github.com/apache/kafka/pull/7414])
>  
> While it looks from the comments made by [~mjsax] and [~bbejeck] that the 
> divide-by-zero is not really an issue, the fact that it's now being reported 
> as a vulnerability is, especially with regulators.
> PITA, but we should consider either getting this vulnerability removed 
> (Google wasn't very helpful in providing info on how to do this), or fixed 
> (Again, not sure how to tag the fix as fixing this issue).  One option may 
> just be to reopen the PR and merge (and then fix forward by switching it to 
> throw an exception).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14660) Divide by zero security vulnerability

2023-01-30 Thread Andy Coates (Jira)
Andy Coates created KAFKA-14660:
---

 Summary: Divide by zero security vulnerability
 Key: KAFKA-14660
 URL: https://issues.apache.org/jira/browse/KAFKA-14660
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.3.2
Reporter: Andy Coates


Looks like SonaType has picked up a "Divide by Zero" issue reported in a PR 
and, because the PR was never merged, is now reporting a it as a security 
vulnerability in the latest Kafka Streams library.

 

See:
 * [Vulnerability: 
sonatype-2019-0422]([https://ossindex.sonatype.org/vulnerability/sonatype-2019-0422?component-type=maven=org.apache.kafka%2Fkafka-streams_source=ossindex-client_medium=integration_content=1.7.0)]

 * [Original PR](https://github.com/apache/kafka/pull/7414)

 

While it looks from the comments made by [~mjsax] and [~bbejeck] that the 
divide-by-zero is not really an issue, the fact that it's now being reported as 
a vulnerability is, especially with regulators.

PITA, but we should consider either getting this vulnerability removed (Google 
wasn't very helpful in providing info on how to do this), or fixed (Again, not 
sure how to tag the fix as fixing this issue).  One option may just be to 
reopen the PR and merge (and then fix forward by switching it to throw an 
exception).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14660) Divide by zero security vulnerability (sonatype-2019-0422)

2023-01-30 Thread Andy Coates (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Coates updated KAFKA-14660:

Summary: Divide by zero security vulnerability (sonatype-2019-0422)  (was: 
Divide by zero security vulnerability)

> Divide by zero security vulnerability (sonatype-2019-0422)
> --
>
> Key: KAFKA-14660
> URL: https://issues.apache.org/jira/browse/KAFKA-14660
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.2
>Reporter: Andy Coates
>Priority: Minor
>
> Looks like SonaType has picked up a "Divide by Zero" issue reported in a PR 
> and, because the PR was never merged, is now reporting a it as a security 
> vulnerability in the latest Kafka Streams library.
>  
> See:
>  * [Vulnerability: 
> sonatype-2019-0422]([https://ossindex.sonatype.org/vulnerability/sonatype-2019-0422?component-type=maven=org.apache.kafka%2Fkafka-streams_source=ossindex-client_medium=integration_content=1.7.0)]
>  * [Original PR](https://github.com/apache/kafka/pull/7414)
>  
> While it looks from the comments made by [~mjsax] and [~bbejeck] that the 
> divide-by-zero is not really an issue, the fact that it's now being reported 
> as a vulnerability is, especially with regulators.
> PITA, but we should consider either getting this vulnerability removed 
> (Google wasn't very helpful in providing info on how to do this), or fixed 
> (Again, not sure how to tag the fix as fixing this issue).  One option may 
> just be to reopen the PR and merge (and then fix forward by switching it to 
> throw an exception).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] vamossagar12 commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

2023-01-30 Thread via GitHub


vamossagar12 commented on PR #13095:
URL: https://github.com/apache/kafka/pull/13095#issuecomment-1408469183

   > I simply meant that it looks like some changes in the system tests are 
required too.
   
   Got it. Thanks for the confirmation. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14592) Move FeatureCommand to tools

2023-01-30 Thread Gantigmaa Selenge (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gantigmaa Selenge reassigned KAFKA-14592:
-

Assignee: Gantigmaa Selenge

> Move FeatureCommand to tools
> 
>
> Key: KAFKA-14592
> URL: https://issues.apache.org/jira/browse/KAFKA-14592
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Gantigmaa Selenge
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] OmniaGM commented on a diff in pull request #13171: KAFKA-14584: Move StateChangeLogMerger tool

2023-01-30 Thread via GitHub


OmniaGM commented on code in PR #13171:
URL: https://github.com/apache/kafka/pull/13171#discussion_r1090471930


##
tools/src/main/java/org/apache/kafka/tools/StateChangeLogMerger.java:
##
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.ToolsUtils;
+
+import java.io.File;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.Arrays;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Scanner;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * A utility that merges the state change logs (possibly obtained from 
different brokers and over multiple days).
+ *
+ * This utility expects at least one of the following two arguments -
+ * 1. A list of state change log files
+ * 2. A regex to specify state change log file names.
+ *
+ * This utility optionally also accepts the following arguments -
+ * 1. The topic whose state change logs should be merged
+ * 2. A list of partitions whose state change logs should be merged (can be 
specified only when the topic argument
+ * is explicitly specified)
+ * 3. Start time from when the logs should be merged
+ * 4. End time until when the logs should be merged
+ */
+public class StateChangeLogMerger {
+private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss,SSS";
+private static final Pattern DATE_PATTERN = 
Pattern.compile("([0-9]{4}-[0-9]{2}-[0-9]{2} 
[0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}){1}");
+private static final Pattern TOPIC_PART_PATTERN = Pattern.compile("\\[(" + 
Topic.LEGAL_CHARS + "+),([0-9]+)\\]");
+
+private static List<String> files;
+private static String topic;
+private static List<Integer> partitions;
+private static Date startDate;
+private static Date endDate;
+
+public static void main(String[] args) {

Review Comment:
   It may be better to unify the CLI tool classes to use the same pattern as 
`ClusterTools` and `MetadataQuorumCommand`, i.e. to have `main`, `mainNoExit`, 
and `execute`. An example is here: 
https://github.com/apache/kafka/blob/72cfc994f5675be349d4494ece3528efed290651/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java#L49
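   
   For reference, a rough sketch of that pattern (names follow the linked 
example; the body is illustrative only):
   
   ```
   import org.apache.kafka.common.utils.Exit;
   
   public final class SomeToolCommand {
       public static void main(String... args) {
           Exit.exit(mainNoExit(args)); // only the real main ever exits
       }
   
       static int mainNoExit(String... args) {
           try {
               execute(args);
               return 0;
           } catch (Throwable e) {
               System.err.println(e.getMessage());
               return 1; // tests call mainNoExit and assert on the exit code
           }
       }
   
       static void execute(String... args) throws Exception {
           // actual tool logic goes here
       }
   }
   ```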



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] OmniaGM commented on a diff in pull request #13171: KAFKA-14584: Move StateChangeLogMerger tool

2023-01-30 Thread via GitHub


OmniaGM commented on code in PR #13171:
URL: https://github.com/apache/kafka/pull/13171#discussion_r1090495143


##
tools/src/main/java/org/apache/kafka/tools/StateChangeLogMerger.java:
##
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.ToolsUtils;
+
+import java.io.File;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.Arrays;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Scanner;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * A utility that merges the state change logs (possibly obtained from 
different brokers and over multiple days).
+ *
+ * This utility expects at least one of the following two arguments -
+ * 1. A list of state change log files
+ * 2. A regex to specify state change log file names.
+ *
+ * This utility optionally also accepts the following arguments -
+ * 1. The topic whose state change logs should be merged
+ * 2. A list of partitions whose state change logs should be merged (can be 
specified only when the topic argument
+ * is explicitly specified)
+ * 3. Start time from when the logs should be merged
+ * 4. End time until when the logs should be merged
+ */
+public class StateChangeLogMerger {
+private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss,SSS";
+private static final Pattern DATE_PATTERN = 
Pattern.compile("([0-9]{4}-[0-9]{2}-[0-9]{2} 
[0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}){1}");
+private static final Pattern TOPIC_PART_PATTERN = Pattern.compile("\\[(" + 
Topic.LEGAL_CHARS + "+),([0-9]+)\\]");
+
+private static List<String> files;
+private static String topic;
+private static List<Integer> partitions;
+private static Date startDate;
+private static Date endDate;
+
+public static void main(String[] args) {
+try {
+StateChangeLogMergerOptions options = new 
StateChangeLogMergerOptions(args);
+if (CommandLineUtils.isPrintHelpNeeded(options)) {
+CommandLineUtils.printUsageAndExit(options.parser,
+"A tool for merging the log files from several brokers to 
reconstruct a unified history of what happened.");
+return;
+}
+if (CommandLineUtils.isPrintVersionNeeded(options)) {
+CommandLineUtils.printVersionAndExit();
+return;
+}
+
+if ((!options.hasFiles() && !options.hasRegex()) || 
(options.hasFiles() && options.hasRegex())) {

Review Comment:
   I know the original code uses this pattern; however, can't we use 
`CommandLineUtils.checkInvalidArgs` here instead?
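   
   If `checkInvalidArgs` doesn't fit, a hand-rolled exclusive-arguments check 
is also simple (purely illustrative):
   
   ```
   public final class ArgsCheck {
       // Exactly one of the two inputs must be provided.
       static void requireExactlyOne(boolean hasFiles, boolean hasRegex) {
           if (hasFiles == hasRegex) { // neither, or both
               throw new IllegalArgumentException(
                   "Provide either a list of log files or a file-name regex, but not both.");
           }
       }
   }
   ```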
   



##
tools/src/main/java/org/apache/kafka/tools/StateChangeLogMerger.java:
##
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.

[jira] [Assigned] (KAFKA-14581) Move GetOffsetShell to tools

2023-01-30 Thread Gantigmaa Selenge (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14581?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gantigmaa Selenge reassigned KAFKA-14581:
-

Assignee: Gantigmaa Selenge

> Move GetOffsetShell to tools
> 
>
> Key: KAFKA-14581
> URL: https://issues.apache.org/jira/browse/KAFKA-14581
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Gantigmaa Selenge
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] tinaselenge opened a new pull request, #13172: KAFKA-14590: Move DelegationTokenCommand to tools

2023-01-30 Thread via GitHub


tinaselenge opened a new pull request, #13172:
URL: https://github.com/apache/kafka/pull/13172

   Support delegationToken APIs in MockAdminClient.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

2023-01-30 Thread via GitHub


mimaison commented on PR #13095:
URL: https://github.com/apache/kafka/pull/13095#issuecomment-1408441506

   I simply meant that it looks like some changes in the system tests are 
required too.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


