[GitHub] [kafka] mjsax merged pull request #13175: KAFKA-14660: Fix divide-by-zero vulnerability
mjsax merged PR #13175: URL: https://github.com/apache/kafka/pull/13175 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-14665) my custom SMT that converts Int to String does not work for primary keys
[ https://issues.apache.org/jira/browse/KAFKA-14665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fuxin Hao updated KAFKA-14665: -- Description: I'm using {{`io.debezium.connector.postgresql.PostgresConnector}} and {{io.confluent.connect.jdbc.JdbcSinkConnector`}} to sync data between two PostgreSQL databases. And I set `{{{}time.precision.mode=adaptive`{}}} in [Debezium config|https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-temporal-types]. which would serialize PostgreSQL time data type to {{Integer}} or {{Long}} and it's incompatible with {{{}JdbcSinkConnector{}}}. So I wrote an SMT to transform these data from numeric types to strings. Say I have the following table: {code:java} CREATE TABLE pk_created_at ( created_at timestamp without time zone DEFAULT current_timestamp not null, PRIMARY KEY (created_at) ); insert into pk_created_at values(current_timestamp); {code} My source connector configuration: {code:java} { "name": "test-connector", "config": { "snapshot.mode": "always", "plugin.name": "pgoutput", "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "tasks.max": "1", "database.hostname": "source", "database.port": "5432", "database.user": "postgres", "database.password": "postgres", "database.dbname" : "test", "database.server.name": "test", "slot.name" : "test", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enabled": true, "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enabled": true, "decimal.handling.mode": "string", "time.precision.mode": "adaptive", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState" } } {code} And the messages in kafka topic `{{{}test.public.pk_created_at`{}}} would be: {code:java} # bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test.public.pk_created_at --from-beginning { "schema":{ "type":"struct", 
"fields":[ { "type":"int64", "optional":false, "name":"io.debezium.time.MicroTimestamp", "version":1, "field":"created_at" } ], "optional":false, "name":"test.public.pk_created_at.Value" }, "payload":{ "created_at":1669354751764130 } }{code} After applying my SMT, the messages would be like this: {code:java} # bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test.public.pk_created_at --from-beginning { "schema":{ "type":"struct", "fields":[ { "type":"string", "optional":true, "field":"created_at" } ], "optional":false, "name":"test.public.pk_created_at.Value" }, "payload":{ "created_at":"2022-11-25T05:39:11.764130Z" } }{code} {{ }} It worke[d great if |https://github.com/FX-HAO/kafka-connect-debezium-tranforms/blob/master/src/main/java/com/github/haofuxin/kafka/connect/DebeziumTimestampConverter.java]{{`created_at`}} is not a part of primary keys. No error occurred. But the primary keys on some of my tables are composed of `{{{}id`{}}} and `{{{}created_at`{}}} like this: `{{{}PRIMARY KEY (id, created_at)`{}}}. 
Then it raised an exception in `{{{}JdbcSinkConnector`{}}} as below: {code:java} 2022-11-25 06:57:01,450 INFO || Attempting to open connection #1 to PostgreSql [io.confluent.connect.jdbc.util.CachedConnectionProvider] 2022-11-25 06:57:01,459 INFO || Maximum table name length for database is 63 bytes [io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect] 2022-11-25 06:57:01,459 INFO || JdbcDbWriter Connected [io.confluent.connect.jdbc.sink.JdbcDbWriter] 2022-11-25 06:57:01,472 INFO || Checking PostgreSql dialect for existence of TABLE "pk_created_at" [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect] 2022-11-25 06:57:01,484 INFO || Using PostgreSql dialect TABLE "pk_created_at" present [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect] 2022-11-25 06:57:01,505 INFO || Checking PostgreSql dialect for type of TABLE "pk_created_at" [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect] 2022-11-25 06:57:01,508 INFO || Setting metadata for table "pk_created_at" to Table{name='"pk_created_at"', type=TABLE columns=[Column{'created_at', isPrimaryKey=true, allowsNull=false, sqlType=timestamp}]} [io.confluent.connect.jdbc.util.TableDefinitions] 2022-11-25 06:57:01,510 WARN || Write of 2 records failed, remainingRetries=0 [io.confluent.connect.jdbc.sink.JdbcSinkTask] java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "pk_created_at" ("created_at") VALUES (1669359291990398) ON CONFLICT ("created_at") DO NOTHING was aborted: ERROR: column "created_at" is of type timestamp without time zone but expression is of type bigint Hint: You will need to rewrite or cast the expression. Position: 52 Call getNextException to see other errors in the
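For reference, the numeric-to-string conversion described in this report can be reproduced with plain `java.time`. This is only a minimal sketch of the arithmetic, assuming the `io.debezium.time.MicroTimestamp` value is microseconds since the Unix epoch in UTC; the class and method names are hypothetical and this is not the reporter's SMT.

```java
import java.time.Instant;

class MicroTimestampExample {
    /** Converts microseconds since the Unix epoch to an ISO-8601 UTC string. */
    static String microsToIso(long micros) {
        // floorDiv/floorMod keep the result correct for timestamps before 1970.
        long seconds = Math.floorDiv(micros, 1_000_000L);
        long nanos = Math.floorMod(micros, 1_000_000L) * 1_000L;
        return Instant.ofEpochSecond(seconds, nanos).toString();
    }

    public static void main(String[] args) {
        // Payload value from the message shown in the report.
        System.out.println(microsToIso(1669354751764130L)); // 2022-11-25T05:39:11.764130Z
    }
}
```

The failing INSERT in the log above still carries a raw number (1669359291990398), so whichever part of the record supplies the primary key column did not go through a conversion like this one.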
[jira] [Assigned] (KAFKA-14650) IQv2 can throw ConcurrentModificationException when accessing Tasks
[ https://issues.apache.org/jira/browse/KAFKA-14650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-14650: --- Assignee: Guozhang Wang > IQv2 can throw ConcurrentModificationException when accessing Tasks > > > Key: KAFKA-14650 > URL: https://issues.apache.org/jira/browse/KAFKA-14650 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.4.0 >Reporter: A. Sophie Blee-Goldman >Assignee: Guozhang Wang >Priority: Major > > From failure in *[PositionRestartIntegrationTest.verifyStore[cache=false, > log=true, supplier=IN_MEMORY_WINDOW, > kind=PAPI]|https://ci-builds.apache.org/job/Kafka/job/kafka/job/3.4/63/testReport/junit/org.apache.kafka.streams.integration/PositionRestartIntegrationTest/Build___JDK_11_and_Scala_2_13___verifyStore_cache_false__log_true__supplier_IN_MEMORY_WINDOW__kind_PAPI_/]* > java.util.ConcurrentModificationException > at > java.base/java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1208) > at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1244) > at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1239) > at java.base/java.util.HashMap.putMapEntries(HashMap.java:508) > at java.base/java.util.HashMap.putAll(HashMap.java:781) > at > org.apache.kafka.streams.processor.internals.Tasks.allTasksPerId(Tasks.java:361) > at > org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1537) > at > org.apache.kafka.streams.processor.internals.StreamThread.allTasks(StreamThread.java:1278) > at org.apache.kafka.streams.KafkaStreams.query(KafkaStreams.java:1921) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.iqv2WaitForResult(IntegrationTestUtils.java:168) > at > org.apache.kafka.streams.integration.PositionRestartIntegrationTest.shouldReachExpectedPosition(PositionRestartIntegrationTest.java:438) > at > 
org.apache.kafka.streams.integration.PositionRestartIntegrationTest.verifyStore(PositionRestartIntegrationTest.java:423) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14665) my custom SMT that converts Int to String does not work for primary keys
Fuxin Hao created KAFKA-14665: - Summary: my custom SMT that converts Int to String does not work for primary keys Key: KAFKA-14665 URL: https://issues.apache.org/jira/browse/KAFKA-14665 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 3.1.0 Reporter: Fuxin Hao I'm using {{`io.debezium.connector.postgresql.PostgresConnector}} and {{io.confluent.connect.jdbc.JdbcSinkConnector`}} to sync data between two PostgreSQL databases. And I set `{{{}time.precision.mode=adaptive`{}}} in [Debezium config|https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-temporal-types]. which would serialize PostgreSQL time data type to {{Integer}} or {{Long}} and it's incompatible with {{{}JdbcSinkConnector{}}}. So I wrote an SMT to transform these data from numeric types to strings. Say I have the following table: {code:java} CREATE TABLE pk_created_at ( created_at timestamp without time zone DEFAULT current_timestamp not null, PRIMARY KEY (created_at) ); insert into pk_created_at values(current_timestamp); {code} {{}} {{}} My source connector configuration: {code:java} { "name": "test-connector", "config": { "snapshot.mode": "always", "plugin.name": "pgoutput", "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "tasks.max": "1", "database.hostname": "source", "database.port": "5432", "database.user": "postgres", "database.password": "postgres", "database.dbname" : "test", "database.server.name": "test", "slot.name" : "test", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enabled": true, "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enabled": true, "decimal.handling.mode": "string", "time.precision.mode": "adaptive", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState" } } {code} {{}} {{}} And the messages in kafka topic `{{{}test.public.pk_created_at`{}}} would be: {{}} 
{code:java} # bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test.public.pk_created_at --from-beginning { "schema":{ "type":"struct", "fields":[ { "type":"int64", "optional":false, "name":"io.debezium.time.MicroTimestamp", "version":1, "field":"created_at" } ], "optional":false, "name":"test.public.pk_created_at.Value" }, "payload":{ "created_at":1669354751764130 } }{code} {{}} After applying my SMT, the messages would be like this: {{}} {code:java} # bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test.public.pk_created_at --from-beginning { "schema":{ "type":"struct", "fields":[ { "type":"string", "optional":true, "field":"created_at" } ], "optional":false, "name":"test.public.pk_created_at.Value" }, "payload":{ "created_at":"2022-11-25T05:39:11.764130Z" } }{code} {{ }} It worke[d great if |https://github.com/FX-HAO/kafka-connect-debezium-tranforms/blob/master/src/main/java/com/github/haofuxin/kafka/connect/DebeziumTimestampConverter.java]{{`created_at`}} is not a part of primary keys. No error occurred. But the primary keys on some of my tables are composed of `{{{}id`{}}} and `{{{}created_at`{}}} like this: `{{{}PRIMARY KEY (id, created_at)`{}}}. 
Then it raised an exception in `{{{}JdbcSinkConnector`{}}} as below: {{}} {code:java} 2022-11-25 06:57:01,450 INFO || Attempting to open connection #1 to PostgreSql [io.confluent.connect.jdbc.util.CachedConnectionProvider] 2022-11-25 06:57:01,459 INFO || Maximum table name length for database is 63 bytes [io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect] 2022-11-25 06:57:01,459 INFO || JdbcDbWriter Connected [io.confluent.connect.jdbc.sink.JdbcDbWriter] 2022-11-25 06:57:01,472 INFO || Checking PostgreSql dialect for existence of TABLE "pk_created_at" [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect] 2022-11-25 06:57:01,484 INFO || Using PostgreSql dialect TABLE "pk_created_at" present [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect] 2022-11-25 06:57:01,505 INFO || Checking PostgreSql dialect for type of TABLE "pk_created_at" [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect] 2022-11-25 06:57:01,508 INFO || Setting metadata for table "pk_created_at" to Table{name='"pk_created_at"', type=TABLE columns=[Column{'created_at', isPrimaryKey=true, allowsNull=false, sqlType=timestamp}]} [io.confluent.connect.jdbc.util.TableDefinitions] 2022-11-25 06:57:01,510 WARN || Write of 2 records failed, remainingRetries=0 [io.confluent.connect.jdbc.sink.JdbcSinkTask] java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "pk_created_at" ("created_at") VALUES (1669359291990398) ON CONFLICT ("cr
[jira] [Assigned] (KAFKA-14487) Move LogManager to storage module
[ https://issues.apache.org/jira/browse/KAFKA-14487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao reassigned KAFKA-14487: - Assignee: Sagar Rao > Move LogManager to storage module > - > > Key: KAFKA-14487 > URL: https://issues.apache.org/jira/browse/KAFKA-14487 > Project: Kafka > Issue Type: Sub-task >Reporter: Ismael Juma >Assignee: Sagar Rao >Priority: Major
[jira] [Assigned] (KAFKA-14585) Move StorageTool to tools
[ https://issues.apache.org/jira/browse/KAFKA-14585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao reassigned KAFKA-14585: - Assignee: Sagar Rao > Move StorageTool to tools > - > > Key: KAFKA-14585 > URL: https://issues.apache.org/jira/browse/KAFKA-14585 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Sagar Rao >Priority: Major
[GitHub] [kafka] Hangleton commented on pull request #13169: KAFKA-14658: Do not open broker ports until we are ready to accept traffic
Hangleton commented on PR #13169: URL: https://github.com/apache/kafka/pull/13169#issuecomment-1409850803 Taking a look.
[GitHub] [kafka] cmccabe merged pull request #13159: KAFKA-14656 Send UMR first during ZK migration
cmccabe merged PR #13159: URL: https://github.com/apache/kafka/pull/13159
[jira] [Commented] (KAFKA-14646) SubscriptionWrapper is of an incompatible version (Kafka Streams 3.2.3 -> 3.3.2)
[ https://issues.apache.org/jira/browse/KAFKA-14646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682401#comment-17682401 ] Matthias J. Sax commented on KAFKA-14646: - I was thinking about this issue, and I think the only way to upgrade is to "drain" your topology. Ie, you would need to stop your upstream producers and not send any new input data. Afterwards, you let KS finish processing of all input data (including processing of all data from internal topics, ie, repartition, fk-subscription, and fk-response topics), to really "drain" the topology completely. Next, do a two round rolling bounce using `upgrade.from`, and finally resume your upstream producers. Would you be willing to try this out (and report back)? > SubscriptionWrapper is of an incompatible version (Kafka Streams 3.2.3 -> > 3.3.2) > > > Key: KAFKA-14646 > URL: https://issues.apache.org/jira/browse/KAFKA-14646 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.3.0, 3.3.1, 3.3.2 > Environment: Kafka Streams 3.2.3 (before update) > Kafka Streams 3.3.2 (after update) > Java 17 (Eclipse Temurin 17.0.5), Linux x86_64 >Reporter: Jochen Schalanda >Priority: Major > Fix For: 3.4.0, 3.3.3 > > > Hey folks, > > we've just updated an application from *_Kafka Streams 3.2.3 to 3.3.2_* and > started getting the following exceptions: > {code:java} > org.apache.kafka.common.errors.UnsupportedVersionException: > SubscriptionWrapper is of an incompatible version. 
{code} > After swiftly looking through the code, this exception is potentially thrown > in two places: > * > [https://github.com/apache/kafka/blob/3.3.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java#L73-L78] > ** Here the check was changed in Kafka 3.3.x: > [https://github.com/apache/kafka/commit/9dd25ecd9ce17e608c6aba98e0422b26ed133c12] > * > [https://github.com/apache/kafka/blob/3.3.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java#L94-L99] > ** Here the check wasn't changed. > > Is it possible that the second check in > {{SubscriptionStoreReceiveProcessorSupplier}} was forgotten? > > Any hints how to resolve this issue without a downgrade? > Since this only affects 2 of 15 topologies in the application, I'm hesitant > to just downgrade to Kafka 3.2.3 again since the internal topics might > already have been updated to use the "new" version of > {{{}SubscriptionWrapper{}}}. > > Related discussion in the Confluent Community Slack: > [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1674497054507119] > h2. Stack trace > {code:java} > org.apache.kafka.streams.errors.StreamsException: Exception caught in > process. taskId=1_8, > processor=XXX-joined-changed-fk-subscription-registration-source, > topic=topic.rev7-XXX-joined-changed-fk-subscription-registration-topic, > partition=8, offset=12297976, > stacktrace=org.apache.kafka.common.errors.UnsupportedVersionException: > SubscriptionWrapper is of an incompatible version. 
> at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:750) > at > org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100) > at > org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81) > at > org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1182) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:770) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:588) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:550) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
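The two-round rolling bounce mentioned in the comment can be sketched as a pair of configurations. This is a hedged illustration only: it uses plain `java.util.Properties` with the `upgrade.from` key named in the comment, the value "3.2" is an assumption based on the version being upgraded from, and the class and method names are hypothetical.

```java
import java.util.Properties;

class UpgradeFromSketch {
    /** Round 1: restart every instance on the new jars with upgrade.from set. */
    static Properties firstBounce(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        // Assumed value: the version the application is being upgraded from.
        props.setProperty("upgrade.from", "3.2");
        return props;
    }

    /** Round 2: restart every instance again with upgrade.from removed. */
    static Properties secondBounce(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.remove("upgrade.from");
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("application.id", "demo-app"); // hypothetical application id
        System.out.println(firstBounce(base));
        System.out.println(secondBounce(base));
    }
}
```

Per the comment, both rounds would only start after the upstream producers are stopped and the topology, including its internal topics, has been fully drained.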
[GitHub] [kafka] Nayana-ibm commented on pull request #11929: MINOR: s390x Stage
Nayana-ibm commented on PR #11929: URL: https://github.com/apache/kafka/pull/11929#issuecomment-1409780907 Any update on this please?
[jira] [Created] (KAFKA-14664) Raft idle ratio is inaccurate
Jason Gustafson created KAFKA-14664: --- Summary: Raft idle ratio is inaccurate Key: KAFKA-14664 URL: https://issues.apache.org/jira/browse/KAFKA-14664 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson Assignee: Jason Gustafson The `poll-idle-ratio-avg` metric is intended to track how idle the raft IO thread is. When completely idle, it should measure 1. When saturated, it should measure 0. The problem with the current measurements is that each one is weighted equally, regardless of how much time it covers. For example, say we poll twice with the following durations: Poll 1: 2s Poll 2: 0s Assume that the busy time is negligible, so 2s passes overall. In the first measurement, 2s is spent waiting, so we compute and record a ratio of 1.0. In the second measurement, no time passes, and we record 0.0. The idle ratio is then computed as the average of these two values ((1.0 + 0.0) / 2 = 0.5), which suggests that the process was busy for 1s. Instead, we should sum up the time waiting over the full interval. 2s passes total here and 2s is idle, so we should compute 1.0.
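The arithmetic in this report can be checked with a small standalone sketch. It compares the average of per-poll ratios with a time-weighted ratio over the whole interval. The class and method names are hypothetical, and returning 1.0 when no time has elapsed at all is an assumption made here, not something the report specifies.

```java
class IdleRatioExample {
    /** Average of per-poll ratios: every poll counts equally, whatever its duration. */
    static double averageOfRatios(long[] idleMs, long[] totalMs) {
        double sum = 0.0;
        for (int i = 0; i < idleMs.length; i++) {
            // A zero-length poll records 0.0, as in the report's second measurement.
            sum += totalMs[i] == 0 ? 0.0 : (double) idleMs[i] / totalMs[i];
        }
        return sum / idleMs.length;
    }

    /** Time-weighted ratio: total idle time divided by total elapsed time. */
    static double timeWeighted(long[] idleMs, long[] totalMs) {
        long idle = 0;
        long total = 0;
        for (int i = 0; i < idleMs.length; i++) {
            idle += idleMs[i];
            total += totalMs[i];
        }
        // Assumption: treat an interval with no elapsed time as fully idle.
        return total == 0 ? 1.0 : (double) idle / total;
    }

    public static void main(String[] args) {
        long[] idle = {2000L, 0L};  // Poll 1 waits 2s, Poll 2 waits 0s
        long[] total = {2000L, 0L}; // busy time is negligible
        System.out.println(averageOfRatios(idle, total)); // 0.5
        System.out.println(timeWeighted(idle, total));    // 1.0
    }
}
```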
[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak
beardt commented on code in PR #13168: URL: https://github.com/apache/kafka/pull/13168#discussion_r1091360138 ## clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java: ## @@ -476,14 +480,34 @@ public List getConfiguredInstances(List classNames, Class t, M return objects; Map configPairs = originals(); configPairs.putAll(configOverrides); -for (Object klass : classNames) { -Object o = getConfiguredInstance(klass, t, configPairs); -objects.add(t.cast(o)); + +try { +for (Object klass : classNames) { +Object o = getConfiguredInstance(klass, t, configPairs); +objects.add(t.cast(o)); +} +} catch (Exception e) { +for (Object object : objects) { +if (object instanceof AutoCloseable) { +try { +((AutoCloseable) object).close(); +} catch (Exception ex) { +log.error(String.format("Close exception on %s", object.getClass().getName()), ex); +} +} else if (object instanceof Closeable) { +try { +((Closeable) object).close(); +} catch (Exception ex) { +log.error(String.format("Close exception on %s", object.getClass().getName()), ex); +} Review Comment: @C0urante I'm glad to hear that you are feeling better. Yes, given the KIP avoidance constraint, I really did not want to change getConfiguredInstances, but it seemed the only way to address this was to change it very carefully. However, I believe I've implemented each of your comments and thanks for the excellent feedback. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak
beardt commented on code in PR #13168: URL: https://github.com/apache/kafka/pull/13168#discussion_r1091360138 ## clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java: ## @@ -476,14 +480,34 @@ public List getConfiguredInstances(List classNames, Class t, M return objects; Map configPairs = originals(); configPairs.putAll(configOverrides); -for (Object klass : classNames) { -Object o = getConfiguredInstance(klass, t, configPairs); -objects.add(t.cast(o)); + +try { +for (Object klass : classNames) { +Object o = getConfiguredInstance(klass, t, configPairs); +objects.add(t.cast(o)); +} +} catch (Exception e) { +for (Object object : objects) { +if (object instanceof AutoCloseable) { +try { +((AutoCloseable) object).close(); +} catch (Exception ex) { +log.error(String.format("Close exception on %s", object.getClass().getName()), ex); +} +} else if (object instanceof Closeable) { +try { +((Closeable) object).close(); +} catch (Exception ex) { +log.error(String.format("Close exception on %s", object.getClass().getName()), ex); +} Review Comment: @C0urante I'm glad to hear that you are feeling better. Yes, given the KIP avoidance constraint, I really wanted to avoid changing getConfiguredInstances, but it seemed the only way to address this was to change it. However, I believe I've implemented each of your comments, and thanks for the excellent feedback.
[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak
beardt commented on code in PR #13168: URL: https://github.com/apache/kafka/pull/13168#discussion_r1091360138 ## clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java: ## @@ -476,14 +480,34 @@ public List getConfiguredInstances(List classNames, Class t, M return objects; Map configPairs = originals(); configPairs.putAll(configOverrides); -for (Object klass : classNames) { -Object o = getConfiguredInstance(klass, t, configPairs); -objects.add(t.cast(o)); + +try { +for (Object klass : classNames) { +Object o = getConfiguredInstance(klass, t, configPairs); +objects.add(t.cast(o)); +} +} catch (Exception e) { +for (Object object : objects) { +if (object instanceof AutoCloseable) { +try { +((AutoCloseable) object).close(); +} catch (Exception ex) { +log.error(String.format("Close exception on %s", object.getClass().getName()), ex); +} +} else if (object instanceof Closeable) { +try { +((Closeable) object).close(); +} catch (Exception ex) { +log.error(String.format("Close exception on %s", object.getClass().getName()), ex); +} Review Comment: @C0urante I'm glad to hear that you are feeling better. Yes, given the KIP avoidance constraint, it seemed the only way to address this was to change the getConfiguredInstances. But, I believe I've implemented each of your comments and thanks for the excellent feedback. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
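The cleanup pattern in the diff above (close whatever was already created when a later instantiation fails) can be sketched in isolation. This is a simplified illustration under stated assumptions, not the code in the PR: `createAll` and `closeQuietly` are hypothetical names, factories stand in for `getConfiguredInstance`, and rethrowing the original exception is assumed because the quoted diff is cut off before that point. Since `java.io.Closeable` extends `AutoCloseable`, a single `instanceof AutoCloseable` check covers both cases.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class CloseOnFailureSketch {
    /** Creates all instances; if one factory throws, closes the ones already created. */
    static <T> List<T> createAll(List<Supplier<T>> factories) {
        List<T> created = new ArrayList<>();
        try {
            for (Supplier<T> factory : factories) {
                created.add(factory.get());
            }
        } catch (RuntimeException e) {
            for (T object : created) {
                closeQuietly(object);
            }
            throw e; // assumption: the original failure is still reported to the caller
        }
        return created;
    }

    /** Closes the object if it is closeable, logging rather than propagating failures. */
    static void closeQuietly(Object object) {
        if (object instanceof AutoCloseable) { // also true for java.io.Closeable
            try {
                ((AutoCloseable) object).close();
            } catch (Exception ex) {
                System.err.println("Close exception on " + object.getClass().getName() + ": " + ex);
            }
        }
    }

    public static void main(String[] args) {
        List<Supplier<AutoCloseable>> factories = new ArrayList<>();
        factories.add(() -> () -> System.out.println("closed first instance"));
        factories.add(() -> { throw new IllegalStateException("second instance failed"); });
        try {
            createAll(factories);
        } catch (IllegalStateException e) {
            System.out.println("rethrown: " + e.getMessage());
        }
    }
}
```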
[jira] [Created] (KAFKA-14663) High throughput topics can starve low-throughput MM2 offset syncs
Greg Harris created KAFKA-14663: --- Summary: High throughput topics can starve low-throughput MM2 offset syncs Key: KAFKA-14663 URL: https://issues.apache.org/jira/browse/KAFKA-14663 Project: Kafka Issue Type: Bug Components: mirrormaker Affects Versions: 3.3.0, 3.0.0, 3.1.0, 3.4.0, 3.5.0 Reporter: Greg Harris Assignee: Greg Harris In MM2, a semaphore is used to throttle the number of offset syncs written to the offset-syncs topic. If too many offset writes are requested (for example, from high-throughput topics) then some are silently dropped and never retried. This is acceptable for a single topic-partition, where a later record may re-trigger the offset-sync and write the sync successfully. However, if there is a large variance between throughput in the topics emitted by an MM2 instance, it is possible for high-throughput topics to trigger many offset syncs, and cause the offset-syncs for a co-located low-throughput topic to be unfairly dropped. This can cause the downstream offsets for the starved topic to lag behind significantly, or be prevented completely. Instead, we should have some sort of fairness mechanism where low-throughput topics are given similar priority to high-throughput topics in producing offset syncs, and excess sync messages from high-throughput topics are dropped instead.
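One possible shape for such a fairness mechanism is sketched below. It is purely illustrative and not MM2's implementation: all names are hypothetical, topic-partitions are plain strings, and "sending" just records the sync in a list. The idea is to keep at most one pending sync per topic-partition (a newer sync overwrites an older one for the same partition) and to hand out semaphore permits in arrival order of partitions, so a busy partition cannot consume every permit.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;

class FairOffsetSyncSketch {
    private final Semaphore permits;
    // One slot per topic-partition; re-putting an existing key keeps its queue position.
    private final Map<String, Long> pending = new LinkedHashMap<>();
    private final List<String> sent = new ArrayList<>();

    FairOffsetSyncSketch(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    /** Records the latest sync for a partition instead of dropping it outright. */
    synchronized void offer(String topicPartition, long offset) {
        pending.put(topicPartition, offset);
    }

    /** Sends pending syncs while permits are available; returns how many were sent. */
    synchronized int drain() {
        int count = 0;
        Iterator<Map.Entry<String, Long>> it = pending.entrySet().iterator();
        while (it.hasNext() && permits.tryAcquire()) {
            Map.Entry<String, Long> entry = it.next();
            it.remove();
            sent.add(entry.getKey() + "@" + entry.getValue());
            count++;
        }
        return count;
    }

    /** Called when a send finishes, freeing a permit for the next pending sync. */
    void onSendComplete() {
        permits.release();
    }

    synchronized List<String> sent() {
        return new ArrayList<>(sent);
    }

    public static void main(String[] args) {
        FairOffsetSyncSketch syncs = new FairOffsetSyncSketch(1);
        syncs.offer("hot-0", 1L);
        syncs.offer("hot-0", 2L);
        syncs.offer("cold-0", 7L);
        syncs.offer("hot-0", 3L);
        syncs.drain();          // sends the latest hot-0 sync only
        syncs.onSendComplete();
        syncs.drain();          // cold-0 gets the next permit
        System.out.println(syncs.sent()); // [hot-0@3, cold-0@7]
    }
}
```

Because a sent partition is removed from the map, a hot partition that offers again rejoins at the back of the queue, which gives the cold partition its turn first.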
[GitHub] [kafka] cmccabe commented on a diff in pull request #13116: KAFKA-14351: Controller Mutation Quota for KRaft
cmccabe commented on code in PR #13116: URL: https://github.com/apache/kafka/pull/13116#discussion_r1091311692 ## core/src/main/scala/kafka/server/ControllerApis.scala: ## @@ -322,15 +325,37 @@ class ControllerApis(val requestChannel: RequestChannel, } } // Finally, the idToName map contains all the topics that we are authorized to delete. -// Perform the deletion and create responses for each one. -controller.deleteTopics(context, idToName.keySet).thenApply { idToError => - idToError.forEach { (id, error) => -appendResponse(idToName.get(id), id, error) +// First check the controller mutation quota if necessary, and then Review Comment: This is a lot more logic (and expense) than what I wanted, and unfortunately it also has a TOCTOU, because partitions may be added in between checking here and doing the deletion. I guess I simply didn't foresee this, sorry. I wonder if we could convert the throttle to be lockless so that we could use it in the QuorumController thread itself. Like I said earlier, the big concern is that blocking the controller thread is really bad and not something we want to do under really any condition. It seems like the throttle should be simple... just a number or something, right? Or can we somehow guarantee that only the controller thread itself is taking that lock under ordinary conditions?
[GitHub] [kafka] anatasiavela commented on pull request #13078: KAFKA-13999: Add ProducerCount metrics (KIP-847)
anatasiavela commented on PR #13078: URL: https://github.com/apache/kafka/pull/13078#issuecomment-1409535758 @divijvaidya Since this PR is strictly focusing on the metric, I think it's reasonable for that change to be done in a separate PR. What do you think?
[jira] [Commented] (KAFKA-14660) Divide by zero security vulnerability (sonatype-2019-0422)
[ https://issues.apache.org/jira/browse/KAFKA-14660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682304#comment-17682304 ] Andy Coates commented on KAFKA-14660: - As per description, I think the best 'fix' might be to re-open the PR and merge it, as it looks to me that the vulnerability report is tied to the PR. If you know how I can tag a new PR to resolve the vulnerability, then I'm all ears. > Divide by zero security vulnerability (sonatype-2019-0422) > -- > > Key: KAFKA-14660 > URL: https://issues.apache.org/jira/browse/KAFKA-14660 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.3.2 >Reporter: Andy Coates >Assignee: Matthias J. Sax >Priority: Minor > > Looks like SonaType has picked up a "Divide by Zero" issue reported in a PR > and, because the PR was never merged, is now reporting it as a security > vulnerability in the latest Kafka Streams library. > > See: > * [Vulnerability: > sonatype-2019-0422]([https://ossindex.sonatype.org/vulnerability/sonatype-2019-0422?component-type=maven&component-name=org.apache.kafka%2Fkafka-streams&utm_source=ossindex-client&utm_medium=integration&utm_content=1.7.0)] > * [Original PR]([https://github.com/apache/kafka/pull/7414]) > > While it looks from the comments made by [~mjsax] and [~bbejeck] that the > divide-by-zero is not really an issue, the fact that it's now being reported > as a vulnerability is, especially with regulators. > PITA, but we should consider either getting this vulnerability removed > (Google wasn't very helpful in providing info on how to do this), or fixed > (Again, not sure how to tag the fix as fixing this issue). One option may > just be to reopen the PR and merge (and then fix forward by switching it to > throw an exception).
[GitHub] [kafka] cmccabe commented on pull request #13169: KAFKA-14658: Do not open broker ports until we are ready to accept traffic
cmccabe commented on PR #13169: URL: https://github.com/apache/kafka/pull/13169#issuecomment-1409506162 retest this please
[GitHub] [kafka] cmccabe opened a new pull request, #13176: MINOR: some ZK migration code cleanups.
cmccabe opened a new pull request, #13176: URL: https://github.com/apache/kafka/pull/13176 Some minor improvements to the JavaDoc for ZkMigrationState. Rename MigrationState to MigrationDriverState to avoid confusion with ZkMigrationState. Remove ClusterImage#zkBrokers. This costs O(num_brokers) time to calculate, but is only ever used when in migration state. It should just be calculated in the migration code. (Additionally, the function ClusterImage.zkBrokers() returns something other than ClusterImage#zkBrokers, which is confusing.) Also remove ClusterDelta#liveZkBrokerIdChanges. This is only used in one place, and it's easy to calculate it there. In general we should avoid providing expensive accessors unless absolutely necessary. Expensive code should look expensive: if people want to iterate over all brokers, they can write a loop to do that rather than hiding it inside an accessor. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #13169: KAFKA-14658: Do not open broker ports until we are ready to accept traffic
cmccabe commented on PR #13169: URL: https://github.com/apache/kafka/pull/13169#issuecomment-1409444203 Looks like Jenkins is having issues. ``` [2023-01-30T20:15:48.665Z] + ./retry_zinc ./gradlew -PscalaVersion=2.13 clean compileJava compileScala compileTestJava compileTestScala spotlessScalaCheck checkstyleMain checkstyleTest spotbugsMain rat --profile --continue -PxmlSpotBugsReport=true -PkeepAliveMode=session [2023-01-30T20:15:49.622Z] To honour the JVM settings for this build a single-use Daemon process will be forked. See https://docs.gradle.org/7.6/userguide/gradle_daemon.html#sec:disabling_the_daemon. [2023-01-30T20:15:51.411Z] Daemon will be stopped at the end of the build [2023-01-30T20:16:59.582Z] [2023-01-30T20:16:59.582Z] FAILURE: Build failed with an exception. [2023-01-30T20:16:59.582Z] [2023-01-30T20:16:59.582Z] * What went wrong: [2023-01-30T20:16:59.582Z] Gradle could not start your build. [2023-01-30T20:16:59.582Z] > Cannot create service of type BuildSessionActionExecutor using method LauncherServices$ToolingBuildSessionScopeServices.createActionExecutor() as there is a problem with parameter #21 of type FileSystemWatchingInformation. [2023-01-30T20:16:59.582Z]> Cannot create service of type BuildLifecycleAwareVirtualFileSystem using method VirtualFileSystemServices$GradleUserHomeServices.createVirtualFileSystem() as there is a problem with parameter #7 of type GlobalCacheLocations. [2023-01-30T20:16:59.582Z] > Cannot create service of type GlobalCacheLocations using method GradleUserHomeScopeServices.createGlobalCacheLocations() as there is a problem with parameter #1 of type List. [2023-01-30T20:16:59.582Z] > Could not create service of type FileAccessTimeJournal using GradleUserHomeScopeServices.createFileAccessTimeJournal(). [2023-01-30T20:16:59.582Z] > Timeout waiting to lock journal cache (/home/jenkins/.gradle/caches/journal-1). It is currently in use by another Gradle instance. 
[2023-01-30T20:16:59.582Z] Owner PID: 13197 [2023-01-30T20:16:59.582Z] Our PID: 16325 [2023-01-30T20:16:59.582Z] Owner Operation: [2023-01-30T20:16:59.582Z] Our operation: [2023-01-30T20:16:59.582Z] Lock file: /home/jenkins/.gradle/caches/journal-1/journal-1.lock ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a diff in pull request #13161: Kafka 14128
mjsax commented on code in PR #13161: URL: https://github.com/apache/kafka/pull/13161#discussion_r1091223587 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java: ## @@ -538,6 +544,8 @@ protected Map getNumPartitions(final Set topics, tempUnknownTopics.add(topicName); log.debug("The leader of topic {} is not available.\n" + "Error message was: {}", topicName, cause.toString()); +} else if (cause instanceof TimeoutException) { +throw new RuntimeException(); Review Comment: We should not throw an exception here, but do what we do inside the existing `catch TimeoutException` block (that we can remove). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a diff in pull request #13161: Kafka 14128
mjsax commented on code in PR #13161: URL: https://github.com/apache/kafka/pull/13161#discussion_r1091222822 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java: ## @@ -466,7 +469,10 @@ public Set makeReady(final Map topics) { topicName) ); } -} else { +} else if (cause instanceof TimeoutException) { Review Comment: I actually believe we can remove the `catch TimeoutException` block below, because `get()` should never throw a timeout.
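A minimal sketch of the pattern the two review comments above point at: a failure inside an asynchronous call surfaces from `get()` as an `ExecutionException`, so the timeout is found by inspecting the cause rather than by a separate `catch` block. `RequestTimeoutException` is a hypothetical stand-in for `org.apache.kafka.common.errors.TimeoutException`, and the `"retry"` outcome is only a placeholder for whatever the existing `catch TimeoutException` block does; neither is taken from the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class CauseInspectionSketch {

    // Hypothetical stand-in for org.apache.kafka.common.errors.TimeoutException.
    static final class RequestTimeoutException extends RuntimeException {
        RequestTimeoutException(String message) {
            super(message);
        }
    }

    // Classifies the outcome of a future by looking at the wrapped cause.
    static String classify(CompletableFuture<String> future) throws InterruptedException {
        try {
            future.get(); // no timeout argument: failures arrive wrapped in ExecutionException
            return "ok";
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof RequestTimeoutException) {
                // Placeholder outcome (assumption): handle like the existing timeout branch.
                return "retry";
            }
            return "fatal";
        }
    }
}
```

With this shape, a dedicated `catch` for the timeout type around `get()` has nothing left to catch, which seems to be the reasoning behind removing it.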
[jira] [Commented] (KAFKA-14660) Divide by zero security vulnerability (sonatype-2019-0422)
[ https://issues.apache.org/jira/browse/KAFKA-14660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682291#comment-17682291 ] Matthias J. Sax commented on KAFKA-14660: - Hey Andy – thanks for the ticket. We would have accepted a PR, too. :D
[GitHub] [kafka] C0urante commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak
C0urante commented on code in PR #13168: URL: https://github.com/apache/kafka/pull/13168#discussion_r1091084063 ## clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java: ## @@ -476,14 +480,34 @@ public List getConfiguredInstances(List classNames, Class t, M return objects; Map configPairs = originals(); configPairs.putAll(configOverrides); -for (Object klass : classNames) { -Object o = getConfiguredInstance(klass, t, configPairs); -objects.add(t.cast(o)); + +try { +for (Object klass : classNames) { +Object o = getConfiguredInstance(klass, t, configPairs); +objects.add(t.cast(o)); +} +} catch (Exception e) { +for (Object object : objects) { +if (object instanceof AutoCloseable) { +try { +((AutoCloseable) object).close(); +} catch (Exception ex) { +log.error(String.format("Close exception on %s", object.getClass().getName()), ex); +} +} else if (object instanceof Closeable) { +try { +((Closeable) object).close(); +} catch (Exception ex) { +log.error(String.format("Close exception on %s", object.getClass().getName()), ex); +} Review Comment: This can be simplified a bit: ```suggestion Utils.closeQuietly((AutoCloseable) object, "AutoCloseable object constructed and configured during failed call to getConfiguredInstances"); ``` 1. `Utils::closeQuietly` handles failures for us 2. `Closeable` is a subinterface of `AutoCloseable`, so we only need to check for the latter ## clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java: ## @@ -55,6 +58,11 @@ public void configure(Map configs) { Object clientIdValue = configs.get(ConsumerConfig.CLIENT_ID_CONFIG); if (clientIdValue == null) throw new ConfigException("Mock consumer interceptor expects configuration " + ProducerConfig.CLIENT_ID_CONFIG); + +CONFIG_COUNT.incrementAndGet(); +if (CONFIG_COUNT.get() == THROW_CONFIG_EXCEPTION_THRESHOLD.get()) { +throw new ConfigException("Kafka producer creation failed. 
Failure may not have cleaned up listener thread resource."); Review Comment: It seems like the failure message here is hinting that we try to create a Kafka producer in this interceptor, but there isn't much else in the class to go along with that. Could we use a more generic message like "Failed to instantiate interceptor (reached throw-on-config threshold)"? ## clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java: ## @@ -503,6 +505,30 @@ public void testInterceptorConstructorClose() { } } +@Test +public void testInterceptorConstructorConfigurationWithExceptionShouldCloseRemainingInstances() { Review Comment: I like this test. Could we also get a matching one for producer interceptors, and something analogous for the `AbstractConfig` class? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
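A minimal sketch of the cleanup pattern discussed in the review above, assuming the intent is to close every instance that was already constructed when a later one fails, and then to rethrow (the rethrow is an assumption, since the quoted diff is truncated). The local `closeQuietly` helper is a hypothetical stand-in for Kafka's `Utils.closeQuietly`, and `createAll` with its `Supplier` factories is illustrative only, not the `AbstractConfig` API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class ConfiguredInstancesSketch {

    // Hypothetical stand-in for Utils.closeQuietly: close and swallow any failure.
    static void closeQuietly(AutoCloseable closeable, String name) {
        if (closeable == null) {
            return;
        }
        try {
            closeable.close();
        } catch (Exception e) {
            System.err.println("Failed to close " + name + ": " + e);
        }
    }

    // Builds every instance; if one factory throws, closes the ones already built and rethrows.
    static List<Object> createAll(List<Supplier<Object>> factories) {
        List<Object> created = new ArrayList<>();
        try {
            for (Supplier<Object> factory : factories) {
                created.add(factory.get());
            }
        } catch (RuntimeException e) {
            for (Object instance : created) {
                // java.io.Closeable extends AutoCloseable, so one instanceof check covers both.
                if (instance instanceof AutoCloseable) {
                    closeQuietly((AutoCloseable) instance, instance.getClass().getName());
                }
            }
            throw e;
        }
        return created;
    }
}
```

Because `Closeable` is a subinterface of `AutoCloseable`, the second `else if` branch in the quoted diff can never be reached, which is why the suggestion collapses both branches into one call.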
[jira] [Commented] (KAFKA-14660) Divide by zero security vulnerability (sonatype-2019-0422)
[ https://issues.apache.org/jira/browse/KAFKA-14660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682265#comment-17682265 ] Andy Coates commented on KAFKA-14660: - Hey Matthias ;)
[GitHub] [kafka] cmccabe commented on a diff in pull request #13169: KAFKA-14658: Do not open broker ports until we are ready to accept traffic
cmccabe commented on code in PR #13169: URL: https://github.com/apache/kafka/pull/13169#discussion_r1091087602 ## clients/src/main/java/org/apache/kafka/common/utils/Time.java: ## @@ -86,4 +89,30 @@ default Timer timer(Duration timeout) { return timer(timeout.toMillis()); } +/** + * Wait for a future to complete, or time out. + * + * @param futureThe future to wait for. + * @param deadlineNsThe time in the future, in monotonic nanoseconds, to time out. + * @return The result of the future. + * @paramThe type of the future. + */ +default T waitForFuture( +CompletableFuture future, +long deadlineNs +) throws TimeoutException, InterruptedException, ExecutionException { +TimeoutException timeoutException = null; +while (true) { +long nowNs = nanoseconds(); +if (deadlineNs <= nowNs) { +throw (timeoutException == null) ? new TimeoutException() : timeoutException; +} +long deltaNs = deadlineNs - nowNs; +try { +return future.get(deltaNs, TimeUnit.NANOSECONDS); Review Comment: Thanks for reviewing. I should have been clearer that this wasn't the main point of the PR :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Hangleton commented on a diff in pull request #13169: KAFKA-14658: Do not open broker ports until we are ready to accept traffic
Hangleton commented on code in PR #13169: URL: https://github.com/apache/kafka/pull/13169#discussion_r1091084191 ## clients/src/main/java/org/apache/kafka/common/utils/Time.java: ## @@ -86,4 +89,30 @@ default Timer timer(Duration timeout) { return timer(timeout.toMillis()); } +/** + * Wait for a future to complete, or time out. + * + * @param futureThe future to wait for. + * @param deadlineNsThe time in the future, in monotonic nanoseconds, to time out. + * @return The result of the future. + * @paramThe type of the future. + */ +default T waitForFuture( +CompletableFuture future, +long deadlineNs +) throws TimeoutException, InterruptedException, ExecutionException { +TimeoutException timeoutException = null; +while (true) { +long nowNs = nanoseconds(); +if (deadlineNs <= nowNs) { +throw (timeoutException == null) ? new TimeoutException() : timeoutException; +} +long deltaNs = deadlineNs - nowNs; +try { +return future.get(deltaNs, TimeUnit.NANOSECONDS); Review Comment: Got it. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vladimirdyuzhev commented on a diff in pull request #13081: Re-using callbackHandler for refreshing Kerberos TGT when keytab is not used
vladimirdyuzhev commented on code in PR #13081: URL: https://github.com/apache/kafka/pull/13081#discussion_r1091084183 ## clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java: ## @@ -90,6 +91,7 @@ public void configure(Map configs, String contextName, Configuration this.minTimeBeforeRelogin = (Long) configs.get(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN); this.kinitCmd = (String) configs.get(SaslConfigs.SASL_KERBEROS_KINIT_CMD); this.serviceName = getServiceName(configs, contextName, configuration); +this.callbackHandler = callbackHandler; Review Comment: I have moved the loginHandler to AbstractLogin and added a test. I've sent an email to priv...@kafka.apache.org to create a JIRA account for me to open a security issue. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #12654: KAFKA-10575: Add onRestoreSuspsnded to StateRestoreListener
mjsax commented on PR #12654: URL: https://github.com/apache/kafka/pull/12654#issuecomment-1409251099 Why does this PR have 392 commits?
[GitHub] [kafka] mjsax commented on a diff in pull request #12654: KAFKA-10575: Add onRestoreSuspsnded to StateRestoreListener
mjsax commented on code in PR #12654: URL: https://github.com/apache/kafka/pull/12654#discussion_r1091058442 ## streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java: ## @@ -37,6 +37,9 @@ * These two interfaces serve different restoration purposes and users should not try to implement both of them in a single * class during state store registration. * + * Also note that standby tasks restoration process are not monitored via this interface, since a standby task keep Review Comment: nit: missing `` between the paragraphs. -- Seems they are also missing in other places in this file -- can we fix all of them? ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java: ## @@ -988,6 +988,18 @@ public void unregister(final Collection revokedChangelogs) { if (changelogMetadata != null) { if (!changelogMetadata.state().equals(ChangelogState.REGISTERED)) { revokedInitializedChangelogs.add(partition); + +// if the changelog is still in REGISTERED, it means it has not initialized and started Review Comment: > if the changelog is still in REGISTERED We check `!changelogMetadata.state().equals(ChangelogState.REGISTERED)` above, so it seems we _are_ not in REGISTERED state here. Should the comment go somewhere else, or not use `if...` but say: `because...` ? 
## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java: ## @@ -988,6 +988,18 @@ public void unregister(final Collection revokedChangelogs) { if (changelogMetadata != null) { if (!changelogMetadata.state().equals(ChangelogState.REGISTERED)) { revokedInitializedChangelogs.add(partition); + +// if the changelog is still in REGISTERED, it means it has not initialized and started +// restoring yet, and hence the corresponding onRestoreStart was not called; in this case +// we should not call onRestorePaused either Review Comment: nit: `onRestorePaused` -> `onRestoreSuspended` ## streams/src/test/java/org/apache/kafka/test/MockStateRestoreListener.java: ## @@ -37,6 +37,7 @@ public class MockStateRestoreListener implements StateRestoreListener { public static final String RESTORE_START = "restore_start"; public static final String RESTORE_BATCH = "restore_batch"; public static final String RESTORE_END = "restore_end"; +public static final String RESTORE_PAUSED = "restore_paused"; Review Comment: nit: rename -> `RESTORE_SUSPENDED` ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java: ## @@ -197,6 +198,49 @@ public void shouldNotRegisterStoreWithoutMetadata() { () -> changelogReader.register(new TopicPartition("ChangelogWithoutStoreMetadata", 0), stateManager)); } +@Test +public void shouldSupportUnregisterChangelogBeforeCompletion() { Review Comment: We should only call this if we are in `RESTORING` state, right? If we are `COMPLETED`, we should not call `suspend` because we did call `end` already? So the test name seems misleading? ## streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java: ## @@ -37,6 +37,9 @@ * These two interfaces serve different restoration purposes and users should not try to implement both of them in a single * class during state store registration. 
* + * Also note that standby tasks restoration process are not monitored via this interface, since a standby task keep Review Comment: > standby tasks restoration process Is "restoration" the best wording? ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java: ## @@ -197,6 +198,49 @@ public void shouldNotRegisterStoreWithoutMetadata() { () -> changelogReader.register(new TopicPartition("ChangelogWithoutStoreMetadata", 0), stateManager)); } +@Test +public void shouldSupportUnregisterChangelogBeforeCompletion() { Review Comment: Should we add a case for a "completed" partition, and that we don't call `suspended` during `unregister` for this case? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #13163: KAFKA-14653: MirrorMakerConfig using raw properties instead of post-r…
C0urante commented on code in PR #13163: URL: https://github.com/apache/kafka/pull/13163#discussion_r1091016715 ## clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java: ## @@ -246,13 +246,22 @@ public Map originals(Map configOverrides) { */ public Map originalsStrings() { Map copy = new RecordingMap<>(); +copyAsStrings(originals, copy); +return copy; +} + +/** + * Ensures that all values of a map are strings, and copies them to another map. + * @param originals The map to validate. + * @param copy The target to copy to. + */ +protected static void copyAsStrings(Map originals, Map copy) { Review Comment: This counts as a change to public interface since subclasses of `AbstractConfig` would be able to access this new method. And since this wasn't mentioned in the KIP, we probably shouldn't do that here. ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java: ## @@ -82,10 +82,16 @@ public class MirrorMakerConfig extends AbstractConfig { static final String TARGET_PREFIX = "target."; private final Plugins plugins; - + +private final Map rawProperties; + +@SuppressWarnings("unchecked") public MirrorMakerConfig(Map props) { super(CONFIG_DEF, props, true); plugins = new Plugins(originalsStrings()); + +rawProperties = new HashMap<>(); +copyAsStrings((Map) props, rawProperties); Review Comment: It looks like this is intended to get around the fact that the constructor accepts a `Map` but we really want to work with a `Map`. Could we change the constructor to accept a `Map` and store that as our `rawProperties` field, instead of introducing the `copyAsStrings` method? ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java: ## @@ -130,15 +136,14 @@ public List clusterPairs() { */ public MirrorClientConfig clientConfig(String cluster) { Map props = new HashMap<>(); -props.putAll(originalsStrings()); +props.putAll(rawProperties); Review Comment: Why change this part? 
Aren't we transforming the configs later at line 141 anyways? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #13169: KAFKA-14658: Do not open broker ports until we are ready to accept traffic
cmccabe commented on code in PR #13169: URL: https://github.com/apache/kafka/pull/13169#discussion_r1091061226 ## clients/src/main/java/org/apache/kafka/common/utils/Time.java: ## @@ -86,4 +89,30 @@ default Timer timer(Duration timeout) { return timer(timeout.toMillis()); } +/** + * Wait for a future to complete, or time out. + * + * @param futureThe future to wait for. + * @param deadlineNsThe time in the future, in monotonic nanoseconds, to time out. + * @return The result of the future. + * @paramThe type of the future. + */ +default T waitForFuture( +CompletableFuture future, +long deadlineNs +) throws TimeoutException, InterruptedException, ExecutionException { +TimeoutException timeoutException = null; +while (true) { +long nowNs = nanoseconds(); +if (deadlineNs <= nowNs) { +throw (timeoutException == null) ? new TimeoutException() : timeoutException; +} +long deltaNs = deadlineNs - nowNs; +try { +return future.get(deltaNs, TimeUnit.NANOSECONDS); Review Comment: The JavaDoc for CompletableFuture doesn't say what it does in case of negative timeouts. Therefore, it's a bad idea to rely on this behavior. Also, the subtraction you propose may overflow. In any case this code was part of a different PR, #13153. I only included it here because I needed something from FutureUtils. Sorry for the confusion. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
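A minimal sketch of the deadline check described in the comment above, assuming a single wait attempt (the retry loop from the quoted diff is omitted). The `nanoClock` parameter is a hypothetical stand-in for Kafka's `Time#nanoseconds()`, and `waitUntil` is an illustrative name, not the method added by the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.LongSupplier;

final class DeadlineWaitSketch {

    // Waits for the future until an absolute deadline expressed in monotonic nanoseconds.
    static <T> T waitUntil(CompletableFuture<T> future, long deadlineNs, LongSupplier nanoClock)
            throws TimeoutException, InterruptedException, ExecutionException {
        long nowNs = nanoClock.getAsLong();
        // Compare before subtracting, so get() never receives a zero or negative timeout.
        if (deadlineNs <= nowNs) {
            throw new TimeoutException("deadline passed before waiting");
        }
        // Safe here: the check above guarantees a strictly positive difference.
        return future.get(deadlineNs - nowNs, TimeUnit.NANOSECONDS);
    }
}
```

Checking the deadline first keeps the behaviour independent of how `CompletableFuture.get` treats non-positive timeouts, which is the concern raised in the comment.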
[GitHub] [kafka] cmccabe commented on a diff in pull request #13161: Kafka 14128
cmccabe commented on code in PR #13161: URL: https://github.com/apache/kafka/pull/13161#discussion_r1091058054 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java: ## @@ -521,7 +524,7 @@ protected Map getNumPartitions(final Set topics, for (final Map.Entry> topicFuture : futures.entrySet()) { final String topicName = topicFuture.getKey(); try { -final TopicDescription topicDescription = topicFuture.getValue().get(); +final TopicDescription topicDescription = topicFuture.getValue().get(Long.parseLong(DEFAULT_API_TIMEOUT_MS_CONFIG), TimeUnit.MILLISECONDS); Review Comment: Do not put timeout parameters on Future.get. This will not abort the operation. If you want timeouts, they should be done by adjusting your admin client configuration parameters. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
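A small sketch of why a timeout on `Future.get` does not abort the operation, using a plain `CompletableFuture` in place of the admin client's future (an assumption made so the example has no Kafka dependency). The class and method names are illustrative only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class GetTimeoutSketch {

    // Returns true if the future is still pending after a timed get() gives up.
    static boolean stillPendingAfterTimedGet(CompletableFuture<String> future)
            throws InterruptedException, ExecutionException {
        try {
            future.get(10, TimeUnit.MILLISECONDS);
            return false; // completed in time
        } catch (TimeoutException e) {
            // Only the caller stopped waiting; nothing cancelled or completed the future.
            return !future.isDone();
        }
    }
}
```

The timed `get` bounds only how long the calling thread blocks. Separately, if `DEFAULT_API_TIMEOUT_MS_CONFIG` in the quoted diff is the configuration key string (`default.api.timeout.ms`, as in Kafka's client config classes), then `Long.parseLong` on it would throw `NumberFormatException` rather than yield a timeout value.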
[GitHub] [kafka] cmccabe merged pull request #13153: MINOR: startup timeouts for KRaft integration tests
cmccabe merged PR #13153: URL: https://github.com/apache/kafka/pull/13153
[GitHub] [kafka] ijuma commented on pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes
ijuma commented on PR #13131: URL: https://github.com/apache/kafka/pull/13131#issuecomment-1409209835 Got it, so it's useful for `main` methods too - not just CLI tools. Then it's fine for it to be in `server-common`.
[GitHub] [kafka] Hangleton commented on a diff in pull request #13162: fix: replace an inefficient loop in kafka internals
Hangleton commented on code in PR #13162: URL: https://github.com/apache/kafka/pull/13162#discussion_r1091027869 ## clients/src/main/java/org/apache/kafka/common/utils/Utils.java: ## @@ -1225,13 +1226,11 @@ public static long tryWriteTo(TransferableChannel destChannel, * @param length The number of bytes to write * @throws IOException For any errors writing to the output */ -public static void writeTo(DataOutput out, ByteBuffer buffer, int length) throws IOException { +public static void writeTo(DataOutputStream out, ByteBuffer buffer, int length) throws IOException { if (buffer.hasArray()) { out.write(buffer.array(), buffer.position() + buffer.arrayOffset(), length); } else { -int pos = buffer.position(); -for (int i = pos; i < length + pos; i++) -out.writeByte(buffer.get(i)); +Channels.newChannel(out).write(buffer); Review Comment: Got it. Thanks. Could you share the performance gains at a high level, without sharing details of the application? Publicly available data showing the expected gains would back up the PR further. Has this been discussed on the dev mailing list already?
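A sketch of one way to do a channel-based bulk write while keeping the contract of the removed loop, assuming that contract should be preserved: the loop wrote exactly `length` bytes with absolute reads and left the buffer's position untouched, whereas `Channels.newChannel(out).write(buffer)` writes everything remaining and advances the position. `WriteToSketch` is an illustrative name, not the code from the PR, and it makes no claim about performance.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;

final class WriteToSketch {

    // Writes `length` bytes starting at the buffer's position without moving that position.
    static void writeTo(OutputStream out, ByteBuffer buffer, int length) throws IOException {
        // duplicate() shares the content but has an independent position and limit.
        ByteBuffer view = buffer.duplicate();
        view.limit(view.position() + length);
        WritableByteChannel channel = Channels.newChannel(out);
        while (view.hasRemaining()) {
            channel.write(view);
        }
    }
}
```

Working on a duplicate keeps callers that rely on the unchanged position safe, at the cost of one small object allocation per call.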
[GitHub] [kafka] Hangleton commented on a diff in pull request #13161: Kafka 14128
Hangleton commented on code in PR #13161: URL: https://github.com/apache/kafka/pull/13161#discussion_r1090718859 ## clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java: ## @@ -160,7 +160,7 @@ private void maybeThrowCancellationException(Throwable cause) { * Waits if necessary for this future to complete, and then returns its result. */ @Override -public T get() throws InterruptedException, ExecutionException { +public abstract T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { Review Comment: Thanks for the follow-up. Trying to understand your use case. The `MetadataRequest` used to describe topics timed out. All retries were exhausted (4 lines) and a `TimeoutException` (FQN `org.apache.kafka.common.errors.TimeoutException`) was propagated to the future (that is, the future was completed exceptionally), and then propagated to the caller resulting in the behaviour observed. ``` 2022-07-29 13:39:37.854 INFO 25843 --- [348aefeff-admin] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-admin] Disconnecting from node 3 due to request timeout. 2022-07-29 13:39:37.854 INFO 25843 --- [348aefeff-admin] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-admin] Cancelled in-flight METADATA request with correlation id 985 due to node 3 being disconnected (elapsed time since creation: 60023ms, elapsed time since send: 60023ms, request timeout: 3ms) 2022-07-29 13:39:37.867 ERROR 25843 --- [-StreamThread-1] o.a.k.s.p.i.InternalTopicManager : stream-thread [main] Unexpected error during topic description for L.DII.A-COGROUPKSTREAM-AGGREGATE-STATE-STORE-03-changelog. 
Error message was: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeTopics, deadlineMs=1659101977830, tries=1, nextAllowedTryMs=1659101977955) timed out at 1659101977855 after 1 attempt(s) 2022-07-29 13:39:37.869 INFO 25843 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN ``` Therefore, it seems that: - The failure of the "describe-topics" invocation was already correctly propagated via the future. - This failure results from the timeout of the underlying Metadata request **and** the exhaustion of retries. If the broker(s) were only temporarily unavailable, increasing the number of retries might have helped.
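The distinction drawn in this comment (a future completed exceptionally versus a wait that itself runs out of time) can be sketched with the JDK's `CompletableFuture` as a stand-in for `KafkaFuture`. The class and method names below are hypothetical, and the exception used is a plain `RuntimeException` rather than Kafka's `org.apache.kafka.common.errors.TimeoutException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration class; uses only java.util.concurrent.
final class FutureFailureSketch {
    static String describe(CompletableFuture<String> future, long timeoutMs) throws InterruptedException {
        try {
            return "ok: " + future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // The operation failed and the failure was propagated through the future.
            return "failed: " + e.getCause().getMessage();
        } catch (TimeoutException e) {
            // java.util.concurrent.TimeoutException: the caller stopped waiting,
            // but the operation behind the future may still be in flight.
            return "still pending";
        }
    }
}
```

In the log above, the request-level timeout surfaces through the first path (the future is completed exceptionally), which is why a timed `get` would not have changed the outcome.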
[GitHub] [kafka] fvaleri commented on pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes
fvaleri commented on PR #13131: URL: https://github.com/apache/kafka/pull/13131#issuecomment-1409100823 > Thanks for doing this. One thing I didn't understand is why the shared classes are in `server-common` instead of `tools`. Is this because `core` also uses the class? Good question. `CommandLineUtils` is mostly used by tools, but `Kafka.scala` depends on it. To move these two shared classes to the `tools` module, we would need to add a `tools` dependency to `core`. Is this correct?
[jira] [Comment Edited] (KAFKA-14661) Upgrade Zookeeper to 3.8.1
[ https://issues.apache.org/jira/browse/KAFKA-14661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682204#comment-17682204 ] Ismael Juma edited comment on KAFKA-14661 at 1/30/23 5:39 PM: -- We should focus on getting this done for Apache Kafka 3.5.0. The patch release discussion should happen after 3.5.0 has happened and we have strong evidence that it's safe to do it as part of patch releases. was (Author: ijuma): We should focus on getting this done for 3.5.0. The patch release discussion should happen after 3.5.0 has happened and we have strong evidence that it's safe to do it as part of patch releases. > Upgrade Zookeeper to 3.8.1 > --- > > Key: KAFKA-14661 > URL: https://issues.apache.org/jira/browse/KAFKA-14661 > Project: Kafka > Issue Type: Improvement > Components: packaging >Reporter: Divij Vaidya >Assignee: Christo Lolov >Priority: Blocker > Fix For: 3.5.0, 3.4.1, 3.3.3 > > > Current Zk version (3.6.x) supported by Apache Kafka has been EOL since > December 2022 [1] > Users of Kafka are facing regulatory hurdles because of using a dependency > which is EOL, hence, I would suggest to upgrade this in all upcoming releases > (including patch releases of 3.3.x and 3.4.x versions). > Some things to consider while upgrading (as pointed by [~ijuma] at [2]): > # If we upgrade the zk server to 3.8.1, what is the impact on the zk > clients. That is, what's the earliest zk client version that is supported by > the 3.8.x server? > # We need to ensure there are no regressions (particularly on the stability > front) when it comes to this upgrade. It would be good for someone to stress > test the system a bit with the new version and check if all works well. > [1] [https://zookeeper.apache.org/releases.html] > [2][https://github.com/apache/kafka/pull/12620#issuecomment-1409028650] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] mumrah commented on a diff in pull request #13159: KAFKA-14656 Send UMR first during ZK migration
mumrah commented on code in PR #13159: URL: https://github.com/apache/kafka/pull/13159#discussion_r1090944813 ## core/src/main/scala/kafka/migration/MigrationPropagator.scala: ## @@ -118,9 +124,8 @@ class MigrationPropagator( } } -// If there are new brokers (including KRaft brokers) or if there are changes in topic -// metadata, let's send UMR about the changes to the old Zk brokers. -if (brokersChanged || !delta.topicsDelta().deletedTopicIds().isEmpty || !delta.topicsDelta().changedTopics().isEmpty) { +// If there are changes in topic metadata, let's send UMR about the changes to the old Zk brokers. +if (!delta.topicsDelta().deletedTopicIds().isEmpty || !delta.topicsDelta().changedTopics().isEmpty) { Review Comment: Yes, it should be fine, since the request batch uses a set.
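As a toy model of why a set-backed batch tolerates a repeated call: the class below is hypothetical and only borrows the method name `addUpdateMetadataRequestForBrokers` from the discussion. It does not reproduce the actual request batch in Kafka.

```java
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical toy model; not the Kafka controller request batch.
final class RequestBatchSketch {
    // Broker ids that should receive an UpdateMetadataRequest when the batch is sent.
    private final Set<Integer> updateMetadataTargets = new LinkedHashSet<>();

    // Adding the same broker id twice leaves the set unchanged.
    void addUpdateMetadataRequestForBrokers(Collection<Integer> brokerIds) {
        updateMetadataTargets.addAll(brokerIds);
    }

    int pendingTargetCount() {
        return updateMetadataTargets.size();
    }
}
```

Under this assumption, calling the method twice with overlapping broker ids still yields one pending request per broker.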
[jira] [Updated] (KAFKA-14661) Upgrade Zk to 3.8.1
[ https://issues.apache.org/jira/browse/KAFKA-14661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-14661: - Component/s: packaging (was: zkclient)
[jira] [Commented] (KAFKA-14661) Upgrade Zk to 3.8.1
[ https://issues.apache.org/jira/browse/KAFKA-14661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682204#comment-17682204 ] Ismael Juma commented on KAFKA-14661: - We should focus on getting this done for 3.5.0. The patch release discussion should happen after 3.5.0 has happened and we have strong evidence that it's safe to do it as part of patch releases.
[jira] [Updated] (KAFKA-14661) Upgrade Zookeeper to 3.8.1
[ https://issues.apache.org/jira/browse/KAFKA-14661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-14661: - Summary: Upgrade Zookeeper to 3.8.1 (was: Upgrade Zk to 3.8.1 )
[jira] [Updated] (KAFKA-14661) Upgrade Zk to 3.8.1
[ https://issues.apache.org/jira/browse/KAFKA-14661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-14661: - Issue Type: Improvement (was: Bug)
[GitHub] [kafka] mumrah commented on a diff in pull request #13159: KAFKA-14656 Send UMR first during ZK migration
mumrah commented on code in PR #13159: URL: https://github.com/apache/kafka/pull/13159#discussion_r1090943050 ## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ## @@ -403,7 +404,7 @@ public void run() throws Exception { AtomicInteger count = new AtomicInteger(0); zkMigrationClient.readAllMetadata(batch -> { try { -log.info("Migrating {} records from ZK: {}", batch.size(), batch); +log.info("Migrating {} records from ZK", batch.size()); Review Comment: I wondered about that. So far, we are avoiding logging raw records in most places, but it could be very useful for debugging during a migration. I'll add it back as TRACE.
[GitHub] [kafka] C0urante commented on a diff in pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2
C0urante commented on code in PR #13137: URL: https://github.com/apache/kafka/pull/13137#discussion_r1090943031 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java: ## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.mirror.integration; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.mirror.MirrorMaker; +import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG; +import static org.apache.kafka.test.TestUtils.waitForCondition; + +@Tag("integration") +public class DedicatedMirrorIntegrationTest { + +private static final Logger log = LoggerFactory.getLogger(DedicatedMirrorIntegrationTest.class); + +private static final int TOPIC_CREATION_TIMEOUT_MS = 30_000; +private static final int TOPIC_REPLICATION_TIMEOUT_MS = 30_000; + +private Map kafkaClusters; +private Map mirrorMakers; + +@BeforeEach +public void setup() { +kafkaClusters = new HashMap<>(); +mirrorMakers = new HashMap<>(); +} + +@AfterEach +public void teardown() throws Throwable { +AtomicReference shutdownFailure = new AtomicReference<>(); +mirrorMakers.forEach((name, mirrorMaker) -> +Utils.closeQuietly(mirrorMaker::stop, "MirrorMaker worker '" + name + "'", shutdownFailure) +); +kafkaClusters.forEach((name, kafkaCluster) -> +Utils.closeQuietly(kafkaCluster::stop, "Embedded Kafka cluster '" + name + "'", shutdownFailure) +); +if (shutdownFailure.get() != null) { +throw 
shutdownFailure.get(); +} +} + +private EmbeddedKafkaCluster startKafkaCluster(String name, int numBrokers, Properties brokerProperties) { +if (kafkaClusters.containsKey(name)) +throw new IllegalStateException("Cannot register multiple Kafka clusters with the same name"); + +EmbeddedKafkaCluster result = new EmbeddedKafkaCluster(numBrokers, brokerProperties); +kafkaClusters.put(name, result); + +result.start(); + +return result; +} + +private MirrorMaker startMirrorMaker(String name, Map mmProps) { +if (mirrorMakers.containsKey(name)) +throw new IllegalStateException("Cannot register multiple MirrorMaker nodes with the same name"); + +MirrorMaker result = new MirrorMaker(mmProps); +mirrorMakers.put(name, result); + +result.start(); + +return result; +} + +/** + * Test that a multi-node dedicated cluster is able to dynamically detect new topics at runtime Review Comment: We don't have a guarantee that task configs are relayed from one worker to another, that's true. However, because we enable exactly-once support, we do get a guarantee that intra-cluster communication takes place, since no source tasks can start on a follower node without first issuing a REST request to the cluster's leader.
[jira] [Created] (KAFKA-14662) ACL listings in documentation are out of date
Mickael Maison created KAFKA-14662: -- Summary: ACL listings in documentation are out of date Key: KAFKA-14662 URL: https://issues.apache.org/jira/browse/KAFKA-14662 Project: Kafka Issue Type: Bug Components: core, docs Reporter: Mickael Maison ACLs listed in https://kafka.apache.org/documentation/#operations_resources_and_protocols are out of date. They only cover API keys up to 47 (OffsetDelete) and don't include DescribeClientQuotas, AlterClientQuotas, DescribeUserScramCredentials, AlterUserScramCredentials, DescribeQuorum, AlterPartition, UpdateFeatures, DescribeCluster, DescribeProducers, UnregisterBroker, DescribeTransactions, ListTransactions, AllocateProducerIds. This is hard to keep up to date so we should consider whether this could be generated automatically.
[GitHub] [kafka] cmccabe commented on a diff in pull request #13159: KAFKA-14656 Send UMR first during ZK migration
cmccabe commented on code in PR #13159: URL: https://github.com/apache/kafka/pull/13159#discussion_r1090939081 ## core/src/main/scala/kafka/migration/MigrationPropagator.scala: ## @@ -118,9 +124,8 @@ class MigrationPropagator( } } -// If there are new brokers (including KRaft brokers) or if there are changes in topic -// metadata, let's send UMR about the changes to the old Zk brokers. -if (brokersChanged || !delta.topicsDelta().deletedTopicIds().isEmpty || !delta.topicsDelta().changedTopics().isEmpty) { +// If there are changes in topic metadata, let's send UMR about the changes to the old Zk brokers. +if (!delta.topicsDelta().deletedTopicIds().isEmpty || !delta.topicsDelta().changedTopics().isEmpty) { Review Comment: Sorry, this might be a silly question, but is it OK for `requestBatch.addUpdateMetadataRequestForBrokers` to be called twice in some cases?
[GitHub] [kafka] cmccabe commented on a diff in pull request #13159: KAFKA-14656 Send UMR first during ZK migration
cmccabe commented on code in PR #13159: URL: https://github.com/apache/kafka/pull/13159#discussion_r1090937902 ## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ## @@ -403,7 +404,7 @@ public void run() throws Exception { AtomicInteger count = new AtomicInteger(0); zkMigrationClient.readAllMetadata(batch -> { try { -log.info("Migrating {} records from ZK: {}", batch.size(), batch); +log.info("Migrating {} records from ZK", batch.size()); Review Comment: Do we want to log the batch itself when TRACE is active?
[GitHub] [kafka] cmccabe commented on a diff in pull request #13159: KAFKA-14656 Send UMR first during ZK migration
cmccabe commented on code in PR #13159: URL: https://github.com/apache/kafka/pull/13159#discussion_r1090937161 ## metadata/src/main/java/org/apache/kafka/image/ConfigurationsImage.java: ## @@ -61,6 +61,15 @@ public Properties configProperties(ConfigResource configResource) { } } +public Map configMap(ConfigResource configResource) { Review Comment: It would be good to add JavaDoc here stating that this doesn't handle configuration overrides. Also, how about `configMapForResource` as a name?
[GitHub] [kafka] C0urante commented on a diff in pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2
C0urante commented on code in PR #13137: URL: https://github.com/apache/kafka/pull/13137#discussion_r1090936585 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java: ## @@ -119,7 +126,16 @@ public class MirrorMaker { public MirrorMaker(MirrorMakerConfig config, List clusters, Time time) { log.debug("Kafka MirrorMaker instance created"); this.time = time; -this.advertisedBaseUrl = "NOTUSED"; +if (config.enableInternalRest()) { +this.restClient = new RestClient(config); +internalServer = new MirrorRestServer(config.originals(), restClient); +internalServer.initializeServer(); +this.advertisedUrl = internalServer.advertisedUrl().toString(); +} else { +internalServer = null; +restClient = null; +this.advertisedUrl = "NOTUSED"; Review Comment: It ends up being used in the metadata that workers send to the group coordinator during rebalances, and the schema we use to (de)serialize that data requires a non-null string for the worker URL. I stuck with the `"NOTUSED"` placeholder instead of an empty string since that's what we're using currently, and it doesn't seem worth the risk to try to change it now.
[jira] [Created] (KAFKA-14661) Upgrade Zk to 3.8.1
Divij Vaidya created KAFKA-14661: Summary: Upgrade Zk to 3.8.1 Key: KAFKA-14661 URL: https://issues.apache.org/jira/browse/KAFKA-14661 Project: Kafka Issue Type: Bug Components: zkclient Reporter: Divij Vaidya Assignee: Christo Lolov Fix For: 3.5.0, 3.4.1, 3.3.3 The current Zk version (3.6.x) supported by Apache Kafka has been EOL since December 2022 [1]. Users of Kafka are facing regulatory hurdles because they depend on an EOL component, so I suggest upgrading it in all upcoming releases (including patch releases of the 3.3.x and 3.4.x versions). Some things to consider while upgrading (as pointed out by [~ijuma] at [2]): # If we upgrade the zk server to 3.8.1, what is the impact on the zk clients? That is, what's the earliest zk client version that is supported by the 3.8.x server? # We need to ensure there are no regressions (particularly on the stability front) when it comes to this upgrade. It would be good for someone to stress test the system a bit with the new version and check if all works well. [1] [https://zookeeper.apache.org/releases.html] [2] [https://github.com/apache/kafka/pull/12620#issuecomment-1409028650]
[GitHub] [kafka] C0urante commented on a diff in pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2
C0urante commented on code in PR #13137: URL: https://github.com/apache/kafka/pull/13137#discussion_r1090935837 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java: ## @@ -255,13 +287,26 @@ private void addHerder(SourceAndTarget sourceAndTarget) { // Pass the shared admin to the distributed herder as an additional AutoCloseable object that should be closed when the // herder is stopped. MirrorMaker has multiple herders, and having the herder own the close responsibility is much easier than // tracking the various shared admin objects in this class. -// Do not provide a restClient to the DistributedHerder to indicate that request forwarding is disabled Herder herder = new DistributedHerder(distributedConfig, time, worker, kafkaClusterId, statusBackingStore, configBackingStore, -advertisedUrl, null, CLIENT_CONFIG_OVERRIDE_POLICY, sharedAdmin); +advertisedUrl, restClient, CLIENT_CONFIG_OVERRIDE_POLICY, +restNamespace, sharedAdmin); herders.put(sourceAndTarget, herder); } +private static String encodePath(String rawPath) throws UnsupportedEncodingException { +return URLEncoder.encode(rawPath, StandardCharsets.UTF_8.name()) +// Java's out-of-the-box URL encoder encodes spaces (' ') as pluses ('+'), +// and pluses as '%2B' +// But Jetty doesn't decode pluses at all and leaves them as-are in decoded +// URLs +// So to get around that, we replace pluses in the encoded URL here with '%20', +// which is the encoding that Jetty expects for spaces +// Jetty will reverse this transformation when evaluating the path parameters +// and will return decoded strings with all special characters as they were. Review Comment: It's the result of fitting a square peg (Java's `URLEncoder` class, which, despite the name, is designed for HTML form encoding instead of URL path encoding) into a round hole (URL path encoding). 
I couldn't find a better alternative than this, and considering the fairly low risk (this is all intended to cover a pretty niche edge case) and the integration test coverage, I figured it'd be good enough for now.
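A minimal sketch of the workaround described in the code comment above, assuming the post-processing step is a plain string replacement (the quoted diff is cut off before that call, so the `replace` line is an assumption based on the comment, and the class name is hypothetical):

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration class; not the MirrorMaker source.
final class PathEncodingSketch {
    static String encodePath(String rawPath) throws UnsupportedEncodingException {
        // URLEncoder performs HTML form encoding: ' ' becomes '+', and '+' becomes "%2B".
        return URLEncoder.encode(rawPath, StandardCharsets.UTF_8.name())
                // After encoding, every remaining '+' stands for a space, so it is
                // safe to rewrite it as "%20", the path encoding for a space.
                .replace("+", "%20");
    }
}
```

For example, `encodePath("a b+c")` first yields `a+b%2Bc` from `URLEncoder` and then `a%20b%2Bc` after the replacement, so a literal plus and a space remain distinguishable on the receiving side.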
[GitHub] [kafka] ijuma commented on pull request #12620: KAFKA-14206: upgrade zookeeper version to 3.7.1
ijuma commented on PR #12620: URL: https://github.com/apache/kafka/pull/12620#issuecomment-1409028650 A couple of things to consider: 1. If we upgrade the zk server to 3.8.1, what is the impact on the zk clients? That is, what's the earliest zk client version that is supported by the 3.8.x server? 2. We need to ensure there are no regressions (particularly on the stability front) when it comes to this upgrade. It would be good for someone to stress test the system a bit with the new version and check if all works well.
[GitHub] [kafka] ijuma commented on pull request #12620: KAFKA-14206: upgrade zookeeper version to 3.7.1
ijuma commented on PR #12620: URL: https://github.com/apache/kafka/pull/12620#issuecomment-1409023494 @divijvaidya Yes, I think that would make sense. That should tide us over until the KRaft transition happens.
[GitHub] [kafka] divijvaidya commented on pull request #12620: KAFKA-14206: upgrade zookeeper version to 3.7.1
divijvaidya commented on PR #12620: URL: https://github.com/apache/kafka/pull/12620#issuecomment-1409015865 > It may make sense to wait a bit and go straight to 3.8.1 (once that's released Note that Zk 3.8.1 was released in January 2023 [1]. @ijuma, do you think it is the right time for us to move to 3.8.1, since the existing version in Kafka is EOL? [1] https://zookeeper.apache.org/doc/r3.8.1/releasenotes.html
[GitHub] [kafka] mjsax opened a new pull request, #13175: KAFAK-14660: Fix divide-by-zero vulnerability
mjsax opened a new pull request, #13175: URL: https://github.com/apache/kafka/pull/13175 This PR adds a safeguard against division by zero. While `totalCapacity` can never be zero, an explicit error message is desirable.
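A guard of the kind this PR describes might look like the sketch below. Only the name `totalCapacity` comes from the PR text. The class, the method, the percentage calculation, and the choice of `IllegalStateException` are assumptions for illustration, not the merged change.

```java
// Hypothetical illustration class; not the Kafka Streams source.
final class CapacityGuardSketch {
    // Returns the used share of the total capacity as a whole percentage.
    static long usedPercent(long used, long totalCapacity) {
        if (totalCapacity == 0) {
            // Without this check, the integer division below would fail with a bare
            // ArithmeticException ("/ by zero") that says nothing about the cause.
            throw new IllegalStateException("totalCapacity must not be zero");
        }
        return used * 100 / totalCapacity;
    }
}
```

The benefit is diagnostic: if the supposedly impossible zero ever occurs, the failure names the offending quantity, and static scanners no longer see an unguarded division.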
[jira] [Assigned] (KAFKA-14660) Divide by zero security vulnerability (sonatype-2019-0422)
[ https://issues.apache.org/jira/browse/KAFKA-14660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-14660: --- Assignee: Matthias J. Sax > Divide by zero security vulnerability (sonatype-2019-0422) > -- > > Key: KAFKA-14660 > URL: https://issues.apache.org/jira/browse/KAFKA-14660 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.3.2 >Reporter: Andy Coates >Assignee: Matthias J. Sax >Priority: Minor > > Looks like SonaType has picked up a "Divide by Zero" issue reported in a PR > and, because the PR was never merged, is now reporting it as a security > vulnerability in the latest Kafka Streams library. > > See: > * [Vulnerability: > sonatype-2019-0422]([https://ossindex.sonatype.org/vulnerability/sonatype-2019-0422?component-type=maven&component-name=org.apache.kafka%2Fkafka-streams&utm_source=ossindex-client&utm_medium=integration&utm_content=1.7.0)] > * [Original PR]([https://github.com/apache/kafka/pull/7414]) > > While it looks from the comments made by [~mjsax] and [~bbejeck] that the > divide-by-zero is not really an issue, the fact that it's now being reported > as a vulnerability is, especially with regulators. > PITA, but we should consider either getting this vulnerability removed > (Google wasn't very helpful in providing info on how to do this), or fixed > (Again, not sure how to tag the fix as fixing this issue). One option may > just be to reopen the PR and merge (and then fix forward by switching it to > throw an exception). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14623) OAuth's HttpAccessTokenRetriever potentially leaks secrets in logging
[ https://issues.apache.org/jira/browse/KAFKA-14623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-14623: Fix Version/s: 3.4.0 (was: 3.4.1) > OAuth's HttpAccessTokenRetriever potentially leaks secrets in logging > --- > > Key: KAFKA-14623 > URL: https://issues.apache.org/jira/browse/KAFKA-14623 > Project: Kafka > Issue Type: Bug > Components: clients, security >Affects Versions: 3.3.1 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Fix For: 3.4.0, 3.3.3 > > Original Estimate: 24h > Remaining Estimate: 24h > > The OAuth code that communicates via HTTP with the IdP > (HttpAccessTokenRetriever.java) includes logging that outputs the request and > response payloads. Among them are: > * > [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java#L265] > * > [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java#L274] > * > [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java#L320] > It should be determined if there are other places sensitive information might > be inadvertently exposed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] C0urante commented on pull request #13148: KAFKA-14645: Use plugin classloader when retrieving connector plugin config definitions
C0urante commented on PR #13148: URL: https://github.com/apache/kafka/pull/13148#issuecomment-1409001897 Thanks Mickael!
[GitHub] [kafka] ijuma commented on pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes
ijuma commented on PR #13131: URL: https://github.com/apache/kafka/pull/13131#issuecomment-1408995921 Thanks for doing this. One thing I didn't understand is why the shared classes are in `server-common` instead of `tools`. Is this because `core` also uses the class?
[GitHub] [kafka] mimaison merged pull request #13148: KAFKA-14645: Use plugin classloader when retrieving connector plugin config definitions
mimaison merged PR #13148: URL: https://github.com/apache/kafka/pull/13148
[jira] [Resolved] (KAFKA-9975) KIP-611: Improved Handling of Abandoned Connectors and Tasks
[ https://issues.apache.org/jira/browse/KAFKA-9975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton resolved KAFKA-9975. -- Resolution: Abandoned > KIP-611: Improved Handling of Abandoned Connectors and Tasks > > > Key: KAFKA-9975 > URL: https://issues.apache.org/jira/browse/KAFKA-9975 > Project: Kafka > Issue Type: Task > Components: KafkaConnect >Reporter: Chris Egerton >Priority: Major > Labels: needs-kip > > (To be fleshed out once > [KIP-611|https://cwiki.apache.org/confluence/display/KAFKA/KIP-611%3A+Improved+Handling+of+Abandoned+Connectors+and+Tasks] > is finalized) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] fvaleri commented on a diff in pull request #13171: KAFKA-14584: Move StateChangeLogMerger tool
fvaleri commented on code in PR #13171: URL: https://github.com/apache/kafka/pull/13171#discussion_r1090852796 ## server-common/src/main/java/org/apache/kafka/server/util/ToolsUtils.java: ## @@ -100,4 +104,17 @@ public static void prettyPrintTable( printRow(columnLengths, headers, out); rows.forEach(row -> printRow(columnLengths, row, out)); } + +/** + * Returns a set of duplicated items. + */ +public static Set findDuplicates(Collection collection) { Review Comment: We can create a KAFKA-14525 subtask where we list all pending cleanups/refactorings. You could even reference it in a comment on your PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on pull request #13148: KAFKA-14645: Use plugin classloader when retrieving connector plugin config definitions
mimaison commented on PR #13148: URL: https://github.com/apache/kafka/pull/13148#issuecomment-1408944140 Thanks @C0urante, I just wanted to clarify this was indeed the intent. Like you, I don't think we necessarily need system tests for this.
[GitHub] [kafka] electrical commented on pull request #12358: KAFKA-13988:Fix mm2 auto.offset.reset=latest not working
electrical commented on PR #12358: URL: https://github.com/apache/kafka/pull/12358#issuecomment-1408885972 We are hitting the same issue as we try to switch from MM1 to MM2. It would be great to see this fixed so we can switch over. Thanks!
[GitHub] [kafka] OmniaGM commented on a diff in pull request #13171: KAFKA-14584: Move StateChangeLogMerger tool
OmniaGM commented on code in PR #13171: URL: https://github.com/apache/kafka/pull/13171#discussion_r1090808339 ## server-common/src/main/java/org/apache/kafka/server/util/ToolsUtils.java: ## @@ -100,4 +104,17 @@ public static void prettyPrintTable( printRow(columnLengths, headers, out); rows.forEach(row -> printRow(columnLengths, row, out)); } + +/** + * Returns a set of duplicated items. + */ +public static Set findDuplicates(Collection collection) { Review Comment: Sounds like a plan. I am working on other CLI commands that use `CoreUtils.duplicates`, so let's keep a note to remove it with the last command.
[GitHub] [kafka] fvaleri commented on a diff in pull request #13171: KAFKA-14584: Move StateChangeLogMerger tool
fvaleri commented on code in PR #13171: URL: https://github.com/apache/kafka/pull/13171#discussion_r1090717453 ## server-common/src/main/java/org/apache/kafka/server/util/ToolsUtils.java: ## @@ -100,4 +104,17 @@ public static void prettyPrintTable( printRow(columnLengths, headers, out); rows.forEach(row -> printRow(columnLengths, row, out)); } + +/** + * Returns a set of duplicated items. + */ +public static Set findDuplicates(Collection collection) { Review Comment: That was my idea. I think we can drop CoreUtils.duplicates once the last command using it is migrated. I renamed from ToolsUtils.findDuplicates to ToolsUtils.duplicates, so the intention is clear. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
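For readers following the `findDuplicates` / `duplicates` discussion, here is a minimal sketch of what such a helper might look like. The quoted diff shows only the method signature, so the body below is an assumption and not the code from PR #13171; the class name `DuplicatesSketch` is hypothetical.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

public final class DuplicatesSketch {
    // Returns the items that occur more than once in the collection.
    // Set.add returns false when the item was already present, which
    // marks the second and later occurrences as duplicates.
    static <T> Set<T> duplicates(Collection<T> items) {
        Set<T> seen = new HashSet<>();
        Set<T> dups = new LinkedHashSet<>(); // keeps first-duplicate order
        for (T item : items) {
            if (!seen.add(item)) {
                dups.add(item);
            }
        }
        return dups;
    }

    public static void main(String[] args) {
        System.out.println(duplicates(Arrays.asList("a", "b", "a", "c", "b")));
    }
}
```

A single pass with two sets keeps the cost linear in the size of the input.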
[GitHub] [kafka] C0urante commented on pull request #13148: KAFKA-14645: Use plugin classloader when retrieving connector plugin config definitions
C0urante commented on PR #13148: URL: https://github.com/apache/kafka/pull/13148#issuecomment-1408822680 @mimaison Yeah, that's correct. I'm hoping that the `verify` calls for `plugins::withClassLoader` will be sufficient for now. I've also added a small comment explaining why we perform that check, which hopefully makes this part a little clearer and harder to break with future changes. We could file a follow-up ticket to add true integration or system tests for isolated classpath loading that set up plugins off of the system classpath and test various APIs that interact with them (such as the `GET /connector-plugins//config` endpoint), but I'm on the fence as to whether that'd be worth the bloat in CI build time and testing logic. LMKWYT
[GitHub] [kafka] pprovenzano commented on pull request #13114: KAFKA-14084: SCRAM support in KRaft.
pprovenzano commented on PR #13114: URL: https://github.com/apache/kafka/pull/13114#issuecomment-1408812047 > The changes to `BrokerMetadataSnapshotterTest.scala` are needed and small. The SCRAM records are larger than 1024 bytes, so I increased the max allocation size in the mock snapshotter instantiated in the test to 4096 bytes. I also added a timeout to these tests because the failure mode when the records are too large is for the test to hang indefinitely. I had a very hard time figuring out which test was hanging without it.
[GitHub] [kafka] dpcollins-google commented on a diff in pull request #13162: fix: replace an inefficient loop in kafka internals
dpcollins-google commented on code in PR #13162: URL: https://github.com/apache/kafka/pull/13162#discussion_r1090733865 ## clients/src/main/java/org/apache/kafka/common/utils/Utils.java: ## @@ -1225,13 +1226,11 @@ public static long tryWriteTo(TransferableChannel destChannel, * @param length The number of bytes to write * @throws IOException For any errors writing to the output */ -public static void writeTo(DataOutput out, ByteBuffer buffer, int length) throws IOException { +public static void writeTo(DataOutputStream out, ByteBuffer buffer, int length) throws IOException { if (buffer.hasArray()) { out.write(buffer.array(), buffer.position() + buffer.arrayOffset(), length); } else { -int pos = buffer.position(); -for (int i = pos; i < length + pos; i++) -out.writeByte(buffer.get(i)); +Channels.newChannel(out).write(buffer); Review Comment: Per 1): This parameter is always buffer.remaining(), so I've cleaned up the call sites and removed this parameter. Per 2): Yes, it's substantial. The reason is that WritableByteChannelImpl writes in 8k chunks when feasible, instead of 1-byte chunks: https://github.com/AdoptOpenJDK/openjdk-jdk8u/blob/2544d2a351eca1a3d62276f969dd2d95e4a4d2b6/jdk/src/share/classes/java/nio/channels/Channels.java#L442 Unfortunately I can't show benchmarks to demonstrate this, as they're of a production application and were collected using internal tooling.
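The approach discussed in this thread can be sketched in isolation as below. This is an illustration under stated assumptions rather than the code in PR #13162: the class `BufferWriteSketch` and helper `roundTrip` are hypothetical, the `length` parameter is dropped as the comment says it always equals `buffer.remaining()`, and the sketch writes from a `duplicate()` so the caller's buffer position is left unchanged (the replaced byte loop used absolute `get(i)` and did not move it either).

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;

public final class BufferWriteSketch {
    // Writes the remaining bytes of the buffer without changing its position.
    static void writeTo(DataOutputStream out, ByteBuffer buffer) throws IOException {
        if (buffer.hasArray()) {
            // Heap buffer: hand the backing array to the stream in one call.
            out.write(buffer.array(), buffer.position() + buffer.arrayOffset(), buffer.remaining());
        } else {
            // Direct buffer: let the channel adapter copy in chunks instead of
            // calling writeByte once per byte. The channel is not closed here,
            // because closing it would also close the underlying stream.
            WritableByteChannel channel = Channels.newChannel(out);
            ByteBuffer view = buffer.duplicate();
            while (view.hasRemaining()) {
                channel.write(view);
            }
        }
    }

    // Hypothetical helper for demonstration: returns the bytes that writeTo emits.
    static byte[] roundTrip(ByteBuffer buffer) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            writeTo(out, buffer);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        ByteBuffer direct = ByteBuffer.allocateDirect(20_000);
        while (direct.hasRemaining()) {
            direct.put((byte) 7);
        }
        direct.flip();
        System.out.println(roundTrip(direct).length + " bytes written");
    }
}
```

Whether the chunked path is measurably faster depends on the JDK and the workload; the thread itself notes that no public benchmark was available.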
[GitHub] [kafka] fvaleri commented on pull request #13171: KAFKA-14584: Move StateChangeLogMerger tool
fvaleri commented on PR #13171: URL: https://github.com/apache/kafka/pull/13171#issuecomment-1408759302 Hi Omnia, thanks for the review. I've addressed some of your comments and answered the others.
[GitHub] [kafka] Hangleton commented on a diff in pull request #13161: Kafka 14128
Hangleton commented on code in PR #13161: URL: https://github.com/apache/kafka/pull/13161#discussion_r1090718859 ## clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java: ## @@ -160,7 +160,7 @@ private void maybeThrowCancellationException(Throwable cause) { * Waits if necessary for this future to complete, and then returns its result. */ @Override -public T get() throws InterruptedException, ExecutionException { +public abstract T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { Review Comment: Thanks for the follow-up. Trying to understand your use case. The `MetadataRequest` used to describe topics timed out. All retries were exhausted (4 lines) and a `TimeoutException` (FQN `org.apache.kafka.common.errors.TimeoutException`) was propagated to the future (that is the future was completed exceptionally), and then propagated to the caller resulting in the behaviour observed. ``` 2022-07-29 13:39:37.854 INFO 25843 --- [348aefeff-admin] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-admin] Disconnecting from node 3 due to request timeout. 2022-07-29 13:39:37.854 INFO 25843 --- [348aefeff-admin] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-admin] Cancelled in-flight METADATA request with correlation id 985 due to node 3 being disconnected (elapsed time since creation: 60023ms, elapsed time since send: 60023ms, request timeout: 3ms) 2022-07-29 13:39:37.867 ERROR 25843 --- [-StreamThread-1] o.a.k.s.p.i.InternalTopicManager : stream-thread [main] Unexpected error during topic description for L.DII.A-COGROUPKSTREAM-AGGREGATE-STATE-STORE-03-changelog. 
Error message was: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeTopics, deadlineMs=1659101977830, tries=1, nextAllowedTryMs=1659101977955) timed out at 1659101977855 after 1 attempt(s) 2022-07-29 13:39:37.869 INFO 25843 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN ``` Therefore, it seems that: - The failure of the "describe-topics" invocation was already correctly propagated via the future. - This failure results from the timeout of the underlying Metadata request **and** the exhaustion of retries. If the broker(s) were only temporarily unavailable, it seems increasing the number of retries may have helped?
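The distinction drawn above, between a future that was completed exceptionally because the request timed out and a caller that gives up waiting on a still-pending future, can be sketched with the JDK alone. This is an illustration, not Kafka code: `CompletableFuture` stands in for Kafka's future implementation, and the nested class `RequestTimedOut` is a hypothetical stand-in for `org.apache.kafka.common.errors.TimeoutException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public final class FutureFailureSketch {
    // Hypothetical stand-in for the error that the request layer reports.
    static final class RequestTimedOut extends RuntimeException {
        RequestTimedOut(String message) {
            super(message);
        }
    }

    // Describes how a bounded get() on the future ends.
    static String outcome(CompletableFuture<String> future) {
        try {
            return "value: " + future.get(50, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // The future itself failed; the original error is the cause.
            return "failed: " + e.getCause().getClass().getSimpleName();
        } catch (TimeoutException e) {
            // The future is still pending; only the caller's wait expired.
            return "still pending";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RequestTimedOut("describeTopics timed out"));
        System.out.println(outcome(failed));
        System.out.println(outcome(new CompletableFuture<>()));
    }
}
```

In the first case the failure has already been propagated through the future, which matches the reading of the log excerpt above.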
[GitHub] [kafka] fvaleri commented on a diff in pull request #13171: KAFKA-14584: Move StateChangeLogMerger tool
fvaleri commented on code in PR #13171: URL: https://github.com/apache/kafka/pull/13171#discussion_r1090717453 ## server-common/src/main/java/org/apache/kafka/server/util/ToolsUtils.java: ## @@ -100,4 +104,17 @@ public static void prettyPrintTable( printRow(columnLengths, headers, out); rows.forEach(row -> printRow(columnLengths, row, out)); } + +/** + * Returns a set of duplicated items. + */ +public static Set findDuplicates(Collection collection) { Review Comment: That was my idea. I think we can drop CoreUtils.duplicates once the last command using it is migrated. WDYT? ## tools/src/main/java/org/apache/kafka/tools/StateChangeLogMerger.java: ## @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tools; + +import joptsimple.OptionSpec; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; +import org.apache.kafka.server.util.ToolsUtils; + +import java.io.File; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.Arrays; +import java.util.List; +import java.util.PriorityQueue; +import java.util.Scanner; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * A utility that merges the state change logs (possibly obtained from different brokers and over multiple days). + * + * This utility expects at least one of the following two arguments - + * 1. A list of state change log files + * 2. A regex to specify state change log file names. + * + * This utility optionally also accepts the following arguments - + * 1. The topic whose state change logs should be merged + * 2. A list of partitions whose state change logs should be merged (can be specified only when the topic argument + * is explicitly specified) + * 3. Start time from when the logs should be merged + * 4. 
End time until when the logs should be merged + */ +public class StateChangeLogMerger { +private static final String DATE_FORMAT = "-MM-dd HH:mm:ss,SSS"; +private static final Pattern DATE_PATTERN = Pattern.compile("([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}){1}"); +private static final Pattern TOPIC_PART_PATTERN = Pattern.compile("\\[(" + Topic.LEGAL_CHARS + "+),([0-9]+)\\]"); + +private static List files; +private static String topic; +private static List partitions; +private static Date startDate; +private static Date endDate; + +public static void main(String[] args) { Review Comment: Yeah, I thought about that, but there are shared util methods that call Exit.exit() directly, so we should change that first. I think we can address this in a separate PR once most commands are migrated. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
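The `DATE_PATTERN` quoted in this diff can be exercised on its own, as in the sketch below. The regular expression is copied from the quoted code; the format string `yyyy-MM-dd HH:mm:ss,SSS` is an assumption inferred from the pattern's four-digit year, since the quoted `DATE_FORMAT` appears truncated. The class `LogDateSketch`, its methods, and the sample log line are hypothetical.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public final class LogDateSketch {
    // Assumed format; the year part is not visible in the quoted diff.
    static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss,SSS";
    static final Pattern DATE_PATTERN = Pattern.compile(
        "([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}){1}");

    // Returns the first timestamp found in the line, or null if there is none.
    static String extractTimestamp(String line) {
        Matcher matcher = DATE_PATTERN.matcher(line);
        return matcher.find() ? matcher.group(1) : null;
    }

    // Parses a timestamp in the assumed format. SimpleDateFormat is not
    // thread-safe, so a fresh instance is created per call.
    static Date parse(String timestamp) {
        try {
            return new SimpleDateFormat(DATE_FORMAT).parse(timestamp);
        } catch (ParseException e) {
            throw new IllegalArgumentException("Not a log timestamp: " + timestamp, e);
        }
    }

    public static void main(String[] args) {
        String line = "[2023-01-30 10:15:30,123] TRACE sample state change entry";
        String timestamp = extractTimestamp(line);
        System.out.println(timestamp + " parsed as " + parse(timestamp));
    }
}
```

Sorting merged log lines by the parsed `Date` is what allows entries from several brokers to be interleaved chronologically.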
[GitHub] [kafka] mimaison commented on a diff in pull request #13174: MINOR: Various cleanups in common utils
mimaison commented on code in PR #13174: URL: https://github.com/apache/kafka/pull/13174#discussion_r1090688740 ## clients/src/main/java/org/apache/kafka/common/utils/MappedIterator.java: ## @@ -32,12 +32,12 @@ public MappedIterator(Iterator underlyingIterator, Function m } @Override -public final boolean hasNext() { +public boolean hasNext() { Review Comment: The class is `final`
[GitHub] [kafka] mimaison opened a new pull request, #13174: MINOR: Various cleanups in common utils
mimaison opened a new pull request, #13174: URL: https://github.com/apache/kafka/pull/13174 - Remove unused methods - Cleanup syntax ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] Hangleton commented on a diff in pull request #13162: fix: replace an inefficient loop in kafka internals
Hangleton commented on code in PR #13162: URL: https://github.com/apache/kafka/pull/13162#discussion_r1090683591 ## clients/src/main/java/org/apache/kafka/common/utils/Utils.java: ## @@ -1225,13 +1226,11 @@ public static long tryWriteTo(TransferableChannel destChannel, * @param length The number of bytes to write * @throws IOException For any errors writing to the output */ -public static void writeTo(DataOutput out, ByteBuffer buffer, int length) throws IOException { +public static void writeTo(DataOutputStream out, ByteBuffer buffer, int length) throws IOException { if (buffer.hasArray()) { out.write(buffer.array(), buffer.position() + buffer.arrayOffset(), length); } else { -int pos = buffer.position(); -for (int i = pos; i < length + pos; i++) -out.writeByte(buffer.get(i)); +Channels.newChannel(out).write(buffer); Review Comment: Thanks for the follow-up. 1. This new implementation ignores the `length` argument provided to the method if the buffer is not backed by an array. What if `length` does not equal the number of remaining bytes in the buffer? 2. Is there an actual optimization offered by calling `write`? The implementation of direct buffers uses a similar linear iteration. Do you have data showing performance improvements with this implementation?
[GitHub] [kafka] fvaleri commented on a diff in pull request #13171: KAFKA-14584: Move StateChangeLogMerger tool
fvaleri commented on code in PR #13171: URL: https://github.com/apache/kafka/pull/13171#discussion_r1090657703 ## tools/src/main/java/org/apache/kafka/tools/StateChangeLogMerger.java: ## @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tools; + +import joptsimple.OptionSpec; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; +import org.apache.kafka.server.util.ToolsUtils; + +import java.io.File; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.Arrays; +import java.util.List; +import java.util.PriorityQueue; +import java.util.Scanner; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * A utility that merges the state change logs (possibly obtained from different brokers and over multiple days). + * + * This utility expects at least one of the following two arguments - + * 1. A list of state change log files + * 2. A regex to specify state change log file names. + * + * This utility optionally also accepts the following arguments - + * 1. The topic whose state change logs should be merged + * 2. A list of partitions whose state change logs should be merged (can be specified only when the topic argument + * is explicitly specified) + * 3. Start time from when the logs should be merged + * 4. 
End time until when the logs should be merged + */ +public class StateChangeLogMerger { +private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss,SSS"; +private static final Pattern DATE_PATTERN = Pattern.compile("([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}){1}"); +private static final Pattern TOPIC_PART_PATTERN = Pattern.compile("\\[(" + Topic.LEGAL_CHARS + "+),([0-9]+)\\]"); + +private static List files; +private static String topic; +private static List partitions; +private static Date startDate; +private static Date endDate; + +public static void main(String[] args) { Review Comment: Yeah, I thought about that, but there are shared util methods that call Exit.exit() directly, so we should change that first. I think we can address this in a separate PR once most commands are migrated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] tledkov opened a new pull request, #13173: Demo of leaks file descriptors of deleted logs on KafkaEmbedded#stopAsync
tledkov opened a new pull request, #13173: URL: https://github.com/apache/kafka/pull/13173 Demonstrates the behavior of integration tests that use `EmbeddedKafkaCluster`. When a test suite contains many tests that create and delete many topics, the process fails with the error "Too many open files". The fix (a new `LogManager#forceDeleteLogs` method, called from `KafkaEmbedded#stopAsync`) is ugly and does not claim to be correct; it is only for demonstration. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Hangleton commented on a diff in pull request #13169: KAFKA-14658: Do not open broker ports until we are ready to accept traffic
Hangleton commented on code in PR #13169: URL: https://github.com/apache/kafka/pull/13169#discussion_r1090646744 ## clients/src/main/java/org/apache/kafka/common/utils/Time.java: ## @@ -86,4 +89,30 @@ default Timer timer(Duration timeout) { return timer(timeout.toMillis()); } +/** + * Wait for a future to complete, or time out. + * + * @param future The future to wait for. + * @param deadlineNs The time in the future, in monotonic nanoseconds, to time out. + * @return The result of the future. + * @param <T> The type of the future. + */ +default <T> T waitForFuture( +CompletableFuture<T> future, +long deadlineNs +) throws TimeoutException, InterruptedException, ExecutionException { +TimeoutException timeoutException = null; +while (true) { +long nowNs = nanoseconds(); +if (deadlineNs <= nowNs) { +throw (timeoutException == null) ? new TimeoutException() : timeoutException; +} +long deltaNs = deadlineNs - nowNs; +try { +return future.get(deltaNs, TimeUnit.NANOSECONDS); Review Comment: Thanks for following up. However, I double-checked the implementation of `CompletableFuture`, and it throws a `TimeoutException` if `deltaNs` is <= 0. Am I missing something? ## server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java: ## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.util; + +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; + +import java.math.BigInteger; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.function.BiConsumer; + + +public class FutureUtils { +/** + * Based on the current time and a delay, computes a monotonic deadline in the future. + * + * @param nowNs The current time in monotonic nanoseconds. + * @param delayMs The delay in milliseconds. + * @return The monotonic deadline in the future. This value is capped at + * Long.MAX_VALUE. + */ +public static long getDeadlineNsFromDelayMs( +long nowNs, +long delayMs +) { +if (delayMs < 0) { +throw new RuntimeException("Negative delays are not allowed."); +} +BigInteger delayNs = BigInteger.valueOf(delayMs).multiply(BigInteger.valueOf(1_000_000)); Review Comment: Got it. On point 2 that means we could have `delayMs` close enough to `Long.MAX_VALUE`. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
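For readers following the overflow discussion above, here is a minimal sketch of why the multiplication is done in `BigInteger`: with `delayMs` close to `Long.MAX_VALUE`, `delayMs * 1_000_000` would overflow a `long`. The class and method names below are illustrative assumptions, not the code from the PR; only the negative-delay check and the cap at `Long.MAX_VALUE` are taken from the quoted javadoc.

```java
import java.math.BigInteger;

// Hypothetical sketch, not the PR's FutureUtils: computes a monotonic
// deadline in nanoseconds and caps it at Long.MAX_VALUE instead of overflowing.
final class DeadlineSketch {
    private static final BigInteger NS_PER_MS = BigInteger.valueOf(1_000_000L);
    private static final BigInteger MAX_LONG = BigInteger.valueOf(Long.MAX_VALUE);

    static long deadlineNsFromDelayMs(long nowNs, long delayMs) {
        if (delayMs < 0) {
            throw new RuntimeException("Negative delays are not allowed.");
        }
        // Arbitrary-precision arithmetic, so neither the multiply nor the add can wrap.
        BigInteger deadlineNs = BigInteger.valueOf(delayMs)
            .multiply(NS_PER_MS)
            .add(BigInteger.valueOf(nowNs));
        // Anything at or beyond Long.MAX_VALUE is reported as Long.MAX_VALUE.
        return deadlineNs.compareTo(MAX_LONG) >= 0 ? Long.MAX_VALUE : deadlineNs.longValueExact();
    }

    public static void main(String[] args) {
        System.out.println(deadlineNsFromDelayMs(5L, 2L));             // prints 2000005
        System.out.println(deadlineNsFromDelayMs(0L, Long.MAX_VALUE)); // prints 9223372036854775807
    }
}
```

A plain `long` multiplication guarded by `Math.multiplyExact` would also detect the overflow, but it would need a try/catch to produce the capped value.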
[jira] [Updated] (KAFKA-14139) Replaced disk can lead to loss of committed data even with non-empty ISR
[ https://issues.apache.org/jira/browse/KAFKA-14139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexandre Dupriez updated KAFKA-14139: -- Description: We have been thinking about disk failure cases recently. Suppose that a disk has failed and the user needs to restart the disk from an empty state. The concern is whether this can lead to the unnecessary loss of committed data. For normal topic partitions, removal from the ISR during controlled shutdown buys us some protection. After the replica is restarted, it must prove its state to the leader before it can be added back to the ISR. And it cannot become a leader until it does so. An obvious exception to this is when the replica is the last member in the ISR. In this case, the disk failure itself has compromised the committed data, so some amount of loss must be expected. We have been considering other scenarios in which the loss of one disk can lead to data loss even when there are replicas remaining which have all of the committed entries. One such scenario is this: Suppose we have a partition with two replicas: A and B. Initially A is the leader and it is the only member of the ISR. # Broker B catches up to A, so A attempts to send an AlterPartition request to the controller to add B into the ISR. # Before the AlterPartition request is received, replica B has a hard failure. # The current controller successfully fences broker B. It takes no action on this partition since B is already out of the ISR. # Before the controller receives the AlterPartition request to add B, it also fails. # While the new controller is initializing, suppose that replica B finishes startup, but the disk has been replaced (all of the previous state has been lost). # The new controller sees the registration from broker B first. # Finally, the AlterPartition from A arrives which adds B back into the ISR even though it has an empty log. (Credit for coming up with this scenario goes to [~junrao] .) 
I tested this in KRaft and confirmed that this sequence is possible (even if perhaps unlikely). There are a few ways we could have potentially detected the issue. First, perhaps the leader should have bumped the leader epoch on all partitions when B was fenced. Then the inflight AlterPartition would be doomed no matter when it arrived. Alternatively, we could have relied on the broker epoch to distinguish the dead broker's state from that of the restarted broker. This could be done by including the broker epoch in both the `Fetch` request and in `AlterPartition`. Finally, perhaps even normal kafka replication should be using a unique identifier for each disk so that we can reliably detect when it has changed. For example, something like what was proposed for the metadata quorum here: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Voter+Changes]. was: We have been thinking about disk failure cases recently. Suppose that a disk has failed and the user needs to restart the disk from an empty state. The concern is whether this can lead to the unnecessary loss of committed data. For normal topic partitions, removal from the ISR during controlled shutdown buys us some protection. After the replica is restarted, it must prove its state to the leader before it can be added back to the ISR. And it cannot become a leader until it does so. An obvious exception to this is when the replica is the last member in the ISR. In this case, the disk failure itself has compromised the committed data, so some amount of loss must be expected. We have been considering other scenarios in which the loss of one disk can lead to data loss even when there are replicas remaining which have all of the committed entries. One such scenario is this: Suppose we have a partition with two replicas: A and B. Initially A is the leader and it is the only member of the ISR. 
# Broker B catches up to A, so A attempts to send an AlterPartition request to the controller to add B into the ISR. # Before the AlterPartition request is received, replica B has a hard failure. # The current controller successfully fences broker B. It takes no action on this partition since B is already out of the ISR. # Before the controller receives the AlterPartition request to add B, it also fails. # While the new controller is initializing, suppose that replica B finishes startup, but the disk has been replaced (all of the previous state has been lost). # The new controller sees the registration from broker B first. # Finally, the AlterPartition from A arrives which adds B back into the ISR even though it has an empty log. (Credit for coming up with this scenario goes to [~junrao] .) I tested this in KRaft and confirmed that this sequence is possible (even if perhaps unlikely). There are a few ways we could have potentially detected the issue. First, perhaps the leader should have bump
[jira] [Commented] (KAFKA-14139) Replaced disk can lead to loss of committed data even with non-empty ISR
[ https://issues.apache.org/jira/browse/KAFKA-14139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682063#comment-17682063 ] Alexandre Dupriez commented on KAFKA-14139: --- Could reproduce without ZK controller change. > Replaced disk can lead to loss of committed data even with non-empty ISR > > > Key: KAFKA-14139 > URL: https://issues.apache.org/jira/browse/KAFKA-14139 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Alexandre Dupriez >Priority: Major > Fix For: 3.5.0 > > > We have been thinking about disk failure cases recently. Suppose that a disk > has failed and the user needs to restart the disk from an empty state. The > concern is whether this can lead to the unnecessary loss of committed data. > For normal topic partitions, removal from the ISR during controlled shutdown > buys us some protection. After the replica is restarted, it must prove its > state to the leader before it can be added back to the ISR. And it cannot > become a leader until it does so. > An obvious exception to this is when the replica is the last member in the > ISR. In this case, the disk failure itself has compromised the committed > data, so some amount of loss must be expected. > We have been considering other scenarios in which the loss of one disk can > lead to data loss even when there are replicas remaining which have all of > the committed entries. One such scenario is this: > Suppose we have a partition with two replicas: A and B. Initially A is the > leader and it is the only member of the ISR. > # Broker B catches up to A, so A attempts to send an AlterPartition request > to the controller to add B into the ISR. > # Before the AlterPartition request is received, replica B has a hard > failure. > # The current controller successfully fences broker B. It takes no action on > this partition since B is already out of the ISR. 
> # Before the controller receives the AlterPartition request to add B, it > also fails. > # While the new controller is initializing, suppose that replica B finishes > startup, but the disk has been replaced (all of the previous state has been > lost). > # The new controller sees the registration from broker B first. > # Finally, the AlterPartition from A arrives which adds B back into the ISR > even though it has an empty log. > (Credit for coming up with this scenario goes to [~junrao] .) > I tested this in KRaft and confirmed that this sequence is possible (even if > perhaps unlikely). There are a few ways we could have potentially detected > the issue. First, perhaps the leader should have bumped the leader epoch on > all partitions when B was fenced. Then the inflight AlterPartition would be > doomed no matter when it arrived. > Alternatively, we could have relied on the broker epoch to distinguish the > dead broker's state from that of the restarted broker. This could be done by > including the broker epoch in both the `Fetch` request and in > `AlterPartition`. > Finally, perhaps even normal kafka replication should be using a unique > identifier for each disk so that we can reliably detect when it has changed. > For example, something like what was proposed for the metadata quorum here: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Voter+Changes.] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
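The broker-epoch idea from the description can be illustrated with a toy model. This is only a sketch under stated assumptions: `BrokerEpochCheckSketch`, `register`, and `mayAddToIsr` are hypothetical names, and the real controller logic is far more involved. The point is that an in-flight AlterPartition carrying the epoch the leader saw in `Fetch` no longer matches once broker B has re-registered after its restart.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model of a controller-side check; not Kafka code.
final class BrokerEpochCheckSketch {
    // brokerId -> epoch assigned at that broker's most recent registration
    private final Map<Integer, Long> registeredEpochs = new HashMap<>();

    void register(int brokerId, long brokerEpoch) {
        registeredEpochs.put(brokerId, brokerEpoch);
    }

    // The leader reports the broker epoch it observed from the follower.
    // The ISR addition is accepted only if that epoch is still current.
    boolean mayAddToIsr(int brokerId, long epochSeenByLeader) {
        Long current = registeredEpochs.get(brokerId);
        return current != null && current == epochSeenByLeader;
    }

    public static void main(String[] args) {
        BrokerEpochCheckSketch controller = new BrokerEpochCheckSketch();
        controller.register(2, 10L);                         // B registers, epoch 10
        System.out.println(controller.mayAddToIsr(2, 10L));  // prints true
        controller.register(2, 11L);                         // B restarts with an empty disk
        System.out.println(controller.mayAddToIsr(2, 10L));  // prints false: stale request
    }
}
```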
[jira] [Commented] (KAFKA-8217) MockConsumer.poll executes pollTasks before checking wakeup flag
[ https://issues.apache.org/jira/browse/KAFKA-8217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682057#comment-17682057 ] Karsten Spang commented on KAFKA-8217: -- I think this is as it should be. I would want to call wakeup() from a poll task, so I knew when it got delivered. > MockConsumer.poll executes pollTasks before checking wakeup flag > > > Key: KAFKA-8217 > URL: https://issues.apache.org/jira/browse/KAFKA-8217 > Project: Kafka > Issue Type: Bug > Components: unit tests >Affects Versions: 2.0.1 >Reporter: Kevin >Priority: Minor > > The tasks scheduled in MockConsumer.schedulePollTask seem to be for > simulating interactions between the Consumer and the brokers, but > MockConsumer.poll runs scheduledPollTasks before it checks the wakeup flag. > Given that the KafkaConsumer.poll method checks the wakeup flag before doing > any other logic, it seems like the MockConsumer should rather check wakeup > before running scheduledPollTasks. > This makes it difficult to control exactly how many Consumer.poll invocations > occur in a unit test when using the wakeup() pattern described in > "Multithreaded Processing" in the KafkaConsumer docs, as gating each poll so > that it returns only when instructed in the test requires submitting a > scheduledPollTask that blocks until the test unblocks it. The trouble occurs > when trying to shut down the consumer, as the poll() task needs to be > unblocked in order to receive the WakeupException but > https://issues.apache.org/jira/browse/KAFKA-8216 means that Consumer.poll() > can be called many times in the race condition interval after poll has been > unblocked but before the test has called Consumer.wakeup(). -- This message was sent by Atlassian Jira (v8.20.10#820010)
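The ordering under discussion can be shown with a small stand-in for the mock. This is a sketch, not `MockConsumer` itself: `PollOrderSketch` and `WakeupSignal` are hypothetical names, and the model assumes one scheduled task runs per `poll()` before the wakeup flag is checked, as the issue describes. With that ordering, a `wakeup()` issued from inside a poll task is delivered by the same `poll()` call, which is the behavior the comment above relies on.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical toy model of the poll ordering described in KAFKA-8217.
final class PollOrderSketch {
    static final class WakeupSignal extends RuntimeException { }

    private final Queue<Runnable> pollTasks = new ArrayDeque<>();
    private boolean wakeup = false;

    void schedulePollTask(Runnable task) {
        pollTasks.add(task);
    }

    void wakeup() {
        wakeup = true;
    }

    String poll() {
        // Step 1: run the next scheduled task, if any.
        Runnable task = pollTasks.poll();
        if (task != null) {
            task.run();
        }
        // Step 2: only now check the wakeup flag.
        if (wakeup) {
            wakeup = false;
            throw new WakeupSignal();
        }
        return "records";
    }

    public static void main(String[] args) {
        PollOrderSketch consumer = new PollOrderSketch();
        consumer.schedulePollTask(consumer::wakeup);
        try {
            consumer.poll();
        } catch (WakeupSignal e) {
            System.out.println("woken up in the same poll");
        }
        System.out.println(consumer.poll()); // prints records
    }
}
```

Swapping the two steps in `poll()` gives the ordering the reporter asks for; the wakeup issued by the task would then surface only on the following `poll()`.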
[jira] [Updated] (KAFKA-14660) Divide by zero security vulnerability (sonatype-2019-0422)
[ https://issues.apache.org/jira/browse/KAFKA-14660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andy Coates updated KAFKA-14660: Description: Looks like SonaType has picked up a "Divide by Zero" issue reported in a PR and, because the PR was never merged, is now reporting it as a security vulnerability in the latest Kafka Streams library. See: * [Vulnerability: sonatype-2019-0422]([https://ossindex.sonatype.org/vulnerability/sonatype-2019-0422?component-type=maven&component-name=org.apache.kafka%2Fkafka-streams&utm_source=ossindex-client&utm_medium=integration&utm_content=1.7.0)] * [Original PR]([https://github.com/apache/kafka/pull/7414]) While it looks from the comments made by [~mjsax] and [~bbejeck] that the divide-by-zero is not really an issue, the fact that its now being reported as a vulnerability is, especially with regulators. PITA, but we should consider either getting this vulnerability removed (Google wasn't very helpful in providing info on how to do this), or fixed (Again, not sure how to tag the fix as fixing this issue). One option may just be to reopen the PR and merge (and then fix forward by switching it to throw an exception). was: Looks like SonaType has picked up a "Divide by Zero" issue reported in a PR and, because the PR was never merged, is now reporting a it as a security vulnerability in the latest Kafka Streams library. See: * [Vulnerability: sonatype-2019-0422]([https://ossindex.sonatype.org/vulnerability/sonatype-2019-0422?component-type=maven&component-name=org.apache.kafka%2Fkafka-streams&utm_source=ossindex-client&utm_medium=integration&utm_content=1.7.0)] * [Original PR](https://github.com/apache/kafka/pull/7414) While it looks from the comments made by [~mjsax] and [~bbejeck] that the divide-by-zero is not really an issue, the fact that its now being reported as a vulnerability is, especially with regulators. 
PITA, but we should consider either getting this vulnerability removed (Google wasn't very helpful in providing info on how to do this), or fixed (Again, not sure how to tag the fix as fixing this issue). One option may just be to reopen the PR and merge (and then fix forward by switching it to throw an exception). > Divide by zero security vulnerability (sonatype-2019-0422) > -- > > Key: KAFKA-14660 > URL: https://issues.apache.org/jira/browse/KAFKA-14660 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.3.2 >Reporter: Andy Coates >Priority: Minor > > Looks like SonaType has picked up a "Divide by Zero" issue reported in a PR > and, because the PR was never merged, is now reporting it as a security > vulnerability in the latest Kafka Streams library. > > See: > * [Vulnerability: > sonatype-2019-0422]([https://ossindex.sonatype.org/vulnerability/sonatype-2019-0422?component-type=maven&component-name=org.apache.kafka%2Fkafka-streams&utm_source=ossindex-client&utm_medium=integration&utm_content=1.7.0)] > * [Original PR]([https://github.com/apache/kafka/pull/7414]) > > While it looks from the comments made by [~mjsax] and [~bbejeck] that the > divide-by-zero is not really an issue, the fact that its now being reported > as a vulnerability is, especially with regulators. > PITA, but we should consider either getting this vulnerability removed > (Google wasn't very helpful in providing info on how to do this), or fixed > (Again, not sure how to tag the fix as fixing this issue). One option may > just be to reopen the PR and merge (and then fix forward by switching it to > throw an exception). -- This message was sent by Atlassian Jira (v8.20.10#820010)
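The "fix forward by switching it to throw an exception" option amounts to validating the divisor before dividing. The sketch below is generic and hypothetical: `DivisorGuardSketch` and `bucketFor` are made-up names, and it does not reproduce the code touched by the original PR or the eventual fix.

```java
// Hypothetical illustration of guarding a division; not the Kafka Streams code.
final class DivisorGuardSketch {
    static long bucketFor(long value, long bucketSize) {
        if (bucketSize <= 0) {
            // Fail with a descriptive error instead of an ArithmeticException from "/ 0".
            throw new IllegalArgumentException("bucketSize must be positive, got " + bucketSize);
        }
        return value / bucketSize;
    }

    public static void main(String[] args) {
        System.out.println(bucketFor(10L, 3L)); // prints 3
        try {
            bucketFor(1L, 0L);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // prints bucketSize must be positive, got 0
        }
    }
}
```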
[jira] [Created] (KAFKA-14660) Divide by zero security vulnerability
Andy Coates created KAFKA-14660: --- Summary: Divide by zero security vulnerability Key: KAFKA-14660 URL: https://issues.apache.org/jira/browse/KAFKA-14660 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 3.3.2 Reporter: Andy Coates Looks like SonaType has picked up a "Divide by Zero" issue reported in a PR and, because the PR was never merged, is now reporting it as a security vulnerability in the latest Kafka Streams library. See: * [Vulnerability: sonatype-2019-0422]([https://ossindex.sonatype.org/vulnerability/sonatype-2019-0422?component-type=maven&component-name=org.apache.kafka%2Fkafka-streams&utm_source=ossindex-client&utm_medium=integration&utm_content=1.7.0)] * [Original PR](https://github.com/apache/kafka/pull/7414) While it looks from the comments made by [~mjsax] and [~bbejeck] that the divide-by-zero is not really an issue, the fact that it's now being reported as a vulnerability is, especially with regulators. PITA, but we should consider either getting this vulnerability removed (Google wasn't very helpful in providing info on how to do this), or fixed (Again, not sure how to tag the fix as fixing this issue). One option may just be to reopen the PR and merge (and then fix forward by switching it to throw an exception). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14660) Divide by zero security vulnerability (sonatype-2019-0422)
[ https://issues.apache.org/jira/browse/KAFKA-14660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andy Coates updated KAFKA-14660: Summary: Divide by zero security vulnerability (sonatype-2019-0422) (was: Divide by zero security vulnerability) > Divide by zero security vulnerability (sonatype-2019-0422) > -- > > Key: KAFKA-14660 > URL: https://issues.apache.org/jira/browse/KAFKA-14660 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.3.2 >Reporter: Andy Coates >Priority: Minor > > Looks like SonaType has picked up a "Divide by Zero" issue reported in a PR > and, because the PR was never merged, is now reporting a it as a security > vulnerability in the latest Kafka Streams library. > > See: > * [Vulnerability: > sonatype-2019-0422]([https://ossindex.sonatype.org/vulnerability/sonatype-2019-0422?component-type=maven&component-name=org.apache.kafka%2Fkafka-streams&utm_source=ossindex-client&utm_medium=integration&utm_content=1.7.0)] > * [Original PR](https://github.com/apache/kafka/pull/7414) > > While it looks from the comments made by [~mjsax] and [~bbejeck] that the > divide-by-zero is not really an issue, the fact that its now being reported > as a vulnerability is, especially with regulators. > PITA, but we should consider either getting this vulnerability removed > (Google wasn't very helpful in providing info on how to do this), or fixed > (Again, not sure how to tag the fix as fixing this issue). One option may > just be to reopen the PR and merge (and then fix forward by switching it to > throw an exception). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] vamossagar12 commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module
vamossagar12 commented on PR #13095: URL: https://github.com/apache/kafka/pull/13095#issuecomment-1408469183 > I simply meant that it looks like some changes in the system tests are required too. Got it. Thanks for the confirmation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-14592) Move FeatureCommand to tools
[ https://issues.apache.org/jira/browse/KAFKA-14592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gantigmaa Selenge reassigned KAFKA-14592: - Assignee: Gantigmaa Selenge > Move FeatureCommand to tools > > > Key: KAFKA-14592 > URL: https://issues.apache.org/jira/browse/KAFKA-14592 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Gantigmaa Selenge >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] OmniaGM commented on a diff in pull request #13171: KAFKA-14584: Move StateChangeLogMerger tool
OmniaGM commented on code in PR #13171: URL: https://github.com/apache/kafka/pull/13171#discussion_r1090471930 ## tools/src/main/java/org/apache/kafka/tools/StateChangeLogMerger.java: ## @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tools; + +import joptsimple.OptionSpec; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; +import org.apache.kafka.server.util.ToolsUtils; + +import java.io.File; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.Arrays; +import java.util.List; +import java.util.PriorityQueue; +import java.util.Scanner; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * A utility that merges the state change logs (possibly obtained from different brokers and over multiple days). + * + * This utility expects at least one of the following two arguments - + * 1. A list of state change log files + * 2. A regex to specify state change log file names. + * + * This utility optionally also accepts the following arguments - + * 1. The topic whose state change logs should be merged + * 2. A list of partitions whose state change logs should be merged (can be specified only when the topic argument + * is explicitly specified) + * 3. Start time from when the logs should be merged + * 4. 
End time until when the logs should be merged + */ +public class StateChangeLogMerger { +private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss,SSS"; +private static final Pattern DATE_PATTERN = Pattern.compile("([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}){1}"); +private static final Pattern TOPIC_PART_PATTERN = Pattern.compile("\\[(" + Topic.LEGAL_CHARS + "+),([0-9]+)\\]"); + +private static List files; +private static String topic; +private static List partitions; +private static Date startDate; +private static Date endDate; + +public static void main(String[] args) { Review Comment: It may be better to unify the CLI tool classes to use the same pattern as `ClusterTools` and `MetadataQuorumCommand`, with `main`, `mainNoExit`, and `execute`. An example is here: https://github.com/apache/kafka/blob/72cfc994f5675be349d4494ece3528efed290651/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java#L49 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
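The `main` / `mainNoExit` / `execute` split suggested in the review can be sketched roughly as follows. This is an illustration under assumptions, not the code in `MetadataQuorumCommand`: the class name and the argument handling are hypothetical, and `System.exit` stands in for Kafka's `Exit.exit`. The benefit is that tests can call `mainNoExit` and assert on the returned status without terminating the JVM.

```java
// Hypothetical sketch of the three-method entry point pattern; not Kafka code.
final class ToolEntryPointSketch {
    // Only this method terminates the JVM.
    public static void main(String[] args) {
        System.exit(mainNoExit(args));
    }

    // Translates the outcome of execute() into an exit status.
    static int mainNoExit(String... args) {
        try {
            execute(args);
            return 0;
        } catch (Exception e) {
            System.err.println(e.getMessage());
            return 1;
        }
    }

    // The actual work; reports failures by throwing.
    static void execute(String... args) throws Exception {
        for (String arg : args) {
            if (arg.startsWith("--")) {
                throw new IllegalArgumentException("Unknown option: " + arg);
            }
        }
        System.out.println("Merging " + args.length + " file(s)");
    }
}
```

In a test, `ToolEntryPointSketch.mainNoExit("a.log")` returns 0 and `ToolEntryPointSketch.mainNoExit("--bogus")` returns 1, with no call to `System.exit`.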
[GitHub] [kafka] OmniaGM commented on a diff in pull request #13171: KAFKA-14584: Move StateChangeLogMerger tool
OmniaGM commented on code in PR #13171: URL: https://github.com/apache/kafka/pull/13171#discussion_r1090495143 ## tools/src/main/java/org/apache/kafka/tools/StateChangeLogMerger.java: ## @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tools; + +import joptsimple.OptionSpec; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; +import org.apache.kafka.server.util.ToolsUtils; + +import java.io.File; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.Arrays; +import java.util.List; +import java.util.PriorityQueue; +import java.util.Scanner; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * A utility that merges the state change logs (possibly obtained from different brokers and over multiple days). + * + * This utility expects at least one of the following two arguments - + * 1. A list of state change log files + * 2. A regex to specify state change log file names. + * + * This utility optionally also accepts the following arguments - + * 1. The topic whose state change logs should be merged + * 2. A list of partitions whose state change logs should be merged (can be specified only when the topic argument + * is explicitly specified) + * 3. Start time from when the logs should be merged + * 4. 
End time until when the logs should be merged + */ +public class StateChangeLogMerger { +private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss,SSS"; +private static final Pattern DATE_PATTERN = Pattern.compile("([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}){1}"); +private static final Pattern TOPIC_PART_PATTERN = Pattern.compile("\\[(" + Topic.LEGAL_CHARS + "+),([0-9]+)\\]"); + +private static List files; +private static String topic; +private static List partitions; +private static Date startDate; +private static Date endDate; + +public static void main(String[] args) { +try { +StateChangeLogMergerOptions options = new StateChangeLogMergerOptions(args); +if (CommandLineUtils.isPrintHelpNeeded(options)) { +CommandLineUtils.printUsageAndExit(options.parser, +"A tool for merging the log files from several brokers to reconstruct a unified history of what happened."); +return; +} +if (CommandLineUtils.isPrintVersionNeeded(options)) { +CommandLineUtils.printVersionAndExit(); +return; +} + +if ((!options.hasFiles() && !options.hasRegex()) || (options.hasFiles() && options.hasRegex())) { Review Comment: I know the original code uses this pattern; however, can't we use `CommandLineUtils.checkInvalidArgs` here instead? ## tools/src/main/java/org/apache/kafka/tools/StateChangeLogMerger.java: ## @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specifi
[jira] [Assigned] (KAFKA-14581) Move GetOffsetShell to tools
[ https://issues.apache.org/jira/browse/KAFKA-14581?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gantigmaa Selenge reassigned KAFKA-14581: - Assignee: Gantigmaa Selenge > Move GetOffsetShell to tools > > > Key: KAFKA-14581 > URL: https://issues.apache.org/jira/browse/KAFKA-14581 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Gantigmaa Selenge >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] tinaselenge opened a new pull request, #13172: KAFKA-14590: Move DelegationTokenCommand to tools
tinaselenge opened a new pull request, #13172: URL: https://github.com/apache/kafka/pull/13172 Support delegationToken APIs in MockAdminClient. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] mimaison commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module
mimaison commented on PR #13095: URL: https://github.com/apache/kafka/pull/13095#issuecomment-1408441506 I simply meant that it looks like some changes in the system tests are required too.