[GitHub] [kafka] monish-byte commented on pull request #14156: KAFKA-15202: Fix MM2 offset translation when syncs are variably spaced
monish-byte commented on PR #14156: URL: https://github.com/apache/kafka/pull/14156#issuecomment-186364 hey @gharris1727 I have joined the mailing list. I am eager to know what I should do next. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 commented on pull request #14156: KAFKA-15202: Fix MM2 offset translation when syncs are variably spaced
gharris1727 commented on PR #14156: URL: https://github.com/apache/kafka/pull/14156#issuecomment-1666576481 Hi @monish-byte ! Thanks for considering contributing to Apache Kafka. You can find the contributing guide here: https://kafka.apache.org/contributing.html It is a good idea to join the dev mailing list: https://kafka.apache.org/contact.html and ask for access to the JIRA and Confluence. From there, you can ask more questions about contributing, submit your own issues, and work on issues that have already been reported. I look forward to seeing you on the mailing list!
[GitHub] [kafka] erikvanoosten commented on pull request #13914: KAFKA-14972: Support async runtimes in consumer
erikvanoosten commented on PR #13914: URL: https://github.com/apache/kafka/pull/13914#issuecomment-1666574852 Withdrawn because the committers do not seem to be convinced that you cannot control which thread code runs on when using an async runtime.
[GitHub] [kafka] erikvanoosten closed pull request #13914: KAFKA-14972: Support async runtimes in consumer
erikvanoosten closed pull request #13914: KAFKA-14972: Support async runtimes in consumer URL: https://github.com/apache/kafka/pull/13914
[GitHub] [kafka] erikvanoosten commented on pull request #14071: Make acquire and release protected
erikvanoosten commented on PR #14071: URL: https://github.com/apache/kafka/pull/14071#issuecomment-1666574726 Withdrawn because the committers do not seem to be convinced that you cannot control which thread code runs on when using an async runtime.
[GitHub] [kafka] erikvanoosten closed pull request #14071: Make acquire and release protected
erikvanoosten closed pull request #14071: Make acquire and release protected URL: https://github.com/apache/kafka/pull/14071
[jira] [Resolved] (KAFKA-14972) Make KafkaConsumer usable in async runtimes
[ https://issues.apache.org/jira/browse/KAFKA-14972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Erik van Oosten resolved KAFKA-14972. - Resolution: Won't Fix > Make KafkaConsumer usable in async runtimes > --- > > Key: KAFKA-14972 > URL: https://issues.apache.org/jira/browse/KAFKA-14972 > Project: Kafka > Issue Type: Wish > Components: consumer >Reporter: Erik van Oosten >Priority: Major > Labels: needs-kip > > KafkaConsumer contains a check that rejects nested invocations from different > threads (method {{acquire}}). For users that use an async runtime, this > is an almost impossible requirement. Examples of async runtimes that are > affected are Kotlin co-routines (see KAFKA-7143) and Zio. > It should be possible for a thread to pass on its capability to access the > consumer to another thread. See > [KIP-944|https://cwiki.apache.org/confluence/x/chw0Dw] for a proposal and > [https://github.com/apache/kafka/pull/13914] for an implementation. -- This message was sent by Atlassian Jira (v8.20.10#820010)
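For context, the {{acquire}} check the ticket refers to can be sketched roughly like this. This is a minimal plain-Java illustration of the thread-ownership pattern, not the actual KafkaConsumer source; KIP-944 proposed letting a thread hand this ownership over to another thread.

```java
// Sketch of a light-weight single-threaded access guard: the first calling
// thread takes ownership, re-entrant calls from the same thread are allowed,
// and any other thread is rejected with ConcurrentModificationException.
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicLong;

public class ThreadGuardSketch {
    private static final long NO_OWNER = -1L;
    private final AtomicLong ownerThreadId = new AtomicLong(NO_OWNER);
    private int refCount = 0; // re-entrancy counter, only touched by the owning thread

    public void acquire() {
        long caller = Thread.currentThread().getId();
        if (caller != ownerThreadId.get() && !ownerThreadId.compareAndSet(NO_OWNER, caller))
            throw new ConcurrentModificationException("Consumer is not safe for multi-threaded access");
        refCount++;
    }

    public void release() {
        if (--refCount == 0)
            ownerThreadId.set(NO_OWNER); // last release hands ownership back
    }
}
```

For an async runtime (ZIO, Kotlin coroutines) the same logical task may resume on different carrier threads, so the thread-identity check above fails even though the calls are logically sequential, which is the core of the ticket.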
[jira] [Commented] (KAFKA-14972) Make KafkaConsumer usable in async runtimes
[ https://issues.apache.org/jira/browse/KAFKA-14972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751330#comment-17751330 ] Erik van Oosten commented on KAFKA-14972: - I am closing this task as Won't Fix, as the committers do not seem to be convinced it is needed to support async runtimes.
[jira] [Assigned] (KAFKA-14972) Make KafkaConsumer usable in async runtimes
[ https://issues.apache.org/jira/browse/KAFKA-14972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Erik van Oosten reassigned KAFKA-14972: --- Assignee: (was: Erik van Oosten)
[jira] [Commented] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.
[ https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751323#comment-17751323 ] Jinyong Choi commented on KAFKA-15302: -- Hi Matthias J. Sax, Reading your comment helped me be more specific about this bug. For instance, when using this.context.forward(msg) to forward a message, the key of the forwarded message can also be deleted from the store to optimize storage efficiency. So, if we call store.delete(key);, the delete() function of CachingKeyValueStore invokes the getInternal() and putInternal() functions, and these calls can lead to the execution of maybeEvict(). Deleting by putting a null value is a valid approach to removing items from RocksDB, so the observed behavior is normal. I'm currently writing code to suppress the maybeEvict() operation. When the test is complete, I will share the results. Let me know if you have any additional comments!
{code:java}
// CachingKeyValueStore
@Override
public byte[] delete(final Bytes key) {
    Objects.requireNonNull(key, "key cannot be null");
    validateStoreOpen();
    lock.writeLock().lock();
    try {
        validateStoreOpen();
        return deleteInternal(key);
    } finally {
        lock.writeLock().unlock();
    }
}

private byte[] deleteInternal(final Bytes key) {
    final byte[] v = getInternal(key);
    putInternal(key, null);
    return v;
}

private void putInternal(final Bytes key, final byte[] value) {
    context.cache().put(
        cacheName,
        key,
        new LRUCacheEntry(
            value,
            context.headers(),
            true,
            context.offset(),
            context.timestamp(),
            context.partition(),
            context.topic()));
    StoreQueryUtils.updatePosition(position, context);
}

// NamedCache
public void put(final String namespace, final Bytes key, final LRUCacheEntry value, final boolean needToEvict) {
    numPuts++;
    final NamedCache cache = getOrCreateCache(namespace);
    synchronized (cache) {
        final long oldSize = cache.sizeInBytes();
        cache.put(key, value);
        sizeInBytes.getAndAdd(cache.sizeInBytes() - oldSize);
        maybeEvict(namespace, cache);
    }
}
{code}
> Stale value returned when using store.all() in punctuation function.
> 
> Key: KAFKA-15302
> URL: https://issues.apache.org/jira/browse/KAFKA-15302
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.5.1
> Reporter: Jinyong Choi
> Priority: Major
>
> When using the store.all() function within the punctuation function of this.context.schedule, the previous value is returned. In other words, even though the value has been stored from 1 to 2, it doesn't return 2; instead, it returns 1.
> In the provided test code, you can see the output 'BROKEN !!!', and while this doesn't occur 100% of the time, by adding logs, it's evident that during the while loop after all() is called, the cache is flushed. As a result, the named cache holds a null value, causing the return of a value from RocksDB. This is observed as the value after the .get() call being different from the expected value. This is possibly due to the consistent-read functionality of RocksDB, although the exact cause is not certain.
> Of course, if you perform {{store.flush()}} before {{all()}} there won't be any errors.
>
> * test code (forked from balajirrao and modified for this)
> [https://github.com/jinyongchoi/kafka-streams-multi-runner/|https://github.com/jinyongchoi/kafka-streams-multi-runner/tree/main]
>
> {code:java}
> private void forwardAll(final long timestamp) {
>     // System.err.println("forwardAll Start");
>     KeyValueIterator<String, Integer> kvList = this.kvStore.all();
>     while (kvList.hasNext()) {
>         KeyValue<String, Integer> entry = kvList.next();
>         final Record<String, Integer> msg = new Record<>(entry.key, entry.value, context.currentSystemTimeMs());
>         final Integer storeValue = this.kvStore.get(entry.key);
>         if (entry.value != storeValue) {
>             System.err.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: " + entry.key
>                 + " Expected in stored(Cache or Store) value: " + storeValue
>                 + " but KeyValueIterator value: " + entry.value);
>             throw new RuntimeException("Broken!");
>         }
>         this.context.forward(msg);
>     }
>     kvList.close();
> }
> {code}
> * log file (add log in stream source)
> {code:java}
> # console log
> sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1"
> [info] welcome to sbt 1.8.2 (Ubuntu Java 11.0.20)
> ...
> [info] running Coordinator 1
> appid: 95108c48-7c69-4eeb-adbd-9d091bd84933
> [0] starting
> {code}
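The mismatch the ticket describes can be illustrated with a minimal plain-Java sketch of a write-back cache layered over a store. This is an illustration only, with HashMaps standing in for the NamedCache and RocksDB layers; it is not Streams code, and the real CachingKeyValueStore merges cache and store iterators rather than reading the store alone.

```java
// Illustration: a point lookup reads through the cache, but an iterator that
// snapshots only the underlying store misses newer values still sitting in
// the cache, so the iterator and get() can disagree until a flush happens.
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class StaleReadSketch {
    final Map<String, Integer> store = new HashMap<>(); // stands in for RocksDB
    final Map<String, Integer> cache = new HashMap<>(); // stands in for the named cache

    void put(String k, int v) { cache.put(k, v); }       // write-back: store untouched

    Integer get(String k) {                              // read-through: cache wins
        return cache.containsKey(k) ? cache.get(k) : store.get(k);
    }

    // An iterator over the store alone misses cached (newer) entries.
    Iterator<Map.Entry<String, Integer>> allFromStoreOnly() {
        return store.entrySet().iterator();
    }

    void flush() { store.putAll(cache); cache.clear(); } // like store.flush()
}
```

This is why the workaround mentioned in the description, calling store.flush() before all(), removes the mismatch: after a flush there is no newer value left in the cache for the iterator to miss.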
[GitHub] [kafka] omkreddy commented on a diff in pull request #14130: KAFKA-15273: Log common name of expired client certificates
omkreddy commented on code in PR #14130: URL: https://github.com/apache/kafka/pull/14130#discussion_r1285082984 ## clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java: ## @@ -255,7 +271,7 @@ private SSLContext createSSLContext(SecurityStore keystore, SecurityStore trusts } String tmfAlgorithm = this.tmfAlgorithm != null ? this.tmfAlgorithm : TrustManagerFactory.getDefaultAlgorithm(); -TrustManagerFactory tmf = TrustManagerFactory.getInstance(tmfAlgorithm); +CommonNameLoggingTrustManagerFactoryWrapper tmf = CommonNameLoggingTrustManagerFactoryWrapper.getInstance(tmfAlgorithm); Review Comment: My main concern is the possible performance impact of the additional processing. We often see faulty clients bombarding the brokers with handshake requests with expired certificates, which can impact broker performance. So I was thinking of adding a flag to enable/disable this. Let's see what others think
[GitHub] [kafka] ethaden commented on a diff in pull request #14130: KAFKA-15273: Log common name of expired client certificates
ethaden commented on code in PR #14130: URL: https://github.com/apache/kafka/pull/14130#discussion_r1285078073 ## clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java: ## @@ -578,4 +594,335 @@ private List pemEntries(String pem) { return entries; } } + +/** + * A wrapper around the original trust manager factory for creating common name logging trust managers. + * These trust managers log the common name of an expired but otherwise valid (client) certificate before rejecting the connection attempt. + * This allows to identify misconfigured clients in complex network environments, where the IP address is not sufficient. + */ +static class CommonNameLoggingTrustManagerFactoryWrapper { Review Comment: @omkreddy I moved the new classes and the corresponding test cases into separate files.
[jira] [Resolved] (KAFKA-10334) Transactions not working properly
[ https://issues.apache.org/jira/browse/KAFKA-10334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton resolved KAFKA-10334. --- Resolution: Duplicate > Transactions not working properly > - > > Key: KAFKA-10334 > URL: https://issues.apache.org/jira/browse/KAFKA-10334 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 2.1.0, 2.3.0 >Reporter: Luis Araujo >Priority: Major > > I'm using transactions provided by Kafka Producer API in a Scala project > built with SBT. The dependency used in the project is: > {code:java} > "org.apache.kafka" % "kafka-clients" % "2.1.0" {code} > I followed the documentation and I was expecting that transactions fail when > I call *.commitTransaction* if some problem is raised when sending a message > like it's described in the > [documentation|https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-]. > Unfortunately, when testing this behaviour using a message larger than the > size accepted by the Kafka broker/cluster, the transactions are not working > properly. > I tested with a 3 Kafka broker cluster with 1MB message max size (default > value): > - when the message has 1MB, the transaction is aborted and an exception is > raised when calling *commitTransaction()* > - when the message is bigger than 1MB, the transaction is completed > successfully *without* the message being written. No exception is thrown. > As an example, this means that when I produce 9 messages with 1 KB and 1 > message with 1.1MB in the same transaction, the transaction is completed but > only 9 messages are written to the Kafka cluster. > I tested this behaviour with Kafka version 2.1.0 and 2.3.0 in both Kafka > cluster and Kafka Producer API. 
> The configs that I'm using to create the KafkaProducer in order to use transactions:
> {code:java}
> new Properties() {
>   {
>     put(BOOTSTRAP_SERVERS_CONFIG, "localhost:29092,localhost:29093,localhost:29094")
>     put(ACKS_CONFIG, "-1")
>     put(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
>     put(KEY_SERIALIZER_CLASS_CONFIG, Class.forName(classOf[StringSerializer].getName))
>     put(VALUE_SERIALIZER_CLASS_CONFIG, Class.forName(classOf[ByteArraySerializer].getName))
>     put(CLIENT_ID_CONFIG, "app")
>     put(TRANSACTIONAL_ID_CONFIG, "app")
>     put(ENABLE_IDEMPOTENCE_CONFIG, "true")
>   }
> }
> {code}
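A defensive pattern for the behaviour described above is to check every send() result before committing, so a per-record failure surfaces instead of being silently dropped. The sketch below is an illustration of that pattern only, not a statement about the actual client behaviour in the affected versions; CompletableFuture stands in for the producer's Future<RecordMetadata>.

```java
// Sketch: collect the future of every send() and verify them all before
// commitTransaction(); if any send failed (e.g. a record that is too large),
// abort the transaction instead of committing a partial batch.
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class CheckSendsSketch {
    public static boolean commitIfAllSent(List<CompletableFuture<Long>> sends,
                                          Runnable commit, Runnable abort) {
        try {
            for (CompletableFuture<Long> f : sends)
                f.join(); // throws CompletionException if that send failed
            commit.run();
            return true;
        } catch (CompletionException e) {
            abort.run();
            return false;
        }
    }
}
```

With the real producer the same idea applies: keep the Future returned by each send() (or use the send callback) and inspect all of them before calling commitTransaction().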
[jira] [Comment Edited] (KAFKA-15310) Add timezone configuration option in TimestampConverter from connectors
[ https://issues.apache.org/jira/browse/KAFKA-15310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751309#comment-17751309 ] Romulo Souza edited comment on KAFKA-15310 at 8/5/23 1:56 PM: -- I intend to work on this issue after resolving some assignee permission issues. was (Author: JIRAUSER301070): I intend to work on this issue. > Add timezone configuration option in TimestampConverter from connectors > --- > > Key: KAFKA-15310 > URL: https://issues.apache.org/jira/browse/KAFKA-15310 > Project: Kafka > Issue Type: New Feature > Components: config, connect >Reporter: Romulo Souza >Priority: Minor > Attachments: Captura de tela de 2023-08-05 09-43-54-1.png, Captura de > tela de 2023-08-05 09-44-25-1.png > > > In some scenarios where TimestampConverter is used, it is useful to have an option to set a specific timezone other than the hardcoded UTC. E.g., there are use cases where a sink connector sends data to a database and this same data is used in an analysis tool without formatting and transformation options. > A new optional Kafka Connect configuration should be added to set the desired timezone, with a fallback to UTC when not informed.
[GitHub] [kafka] monish-byte commented on pull request #14156: KAFKA-15202: Fix MM2 offset translation when syncs are variably spaced
monish-byte commented on PR #14156: URL: https://github.com/apache/kafka/pull/14156#issuecomment-1666502303 hey @gharris1727 , can you please guide me on how I can start contributing to this project? I am new to open source contribution but I have a strong knowledge of Java. Thank you.
[jira] [Commented] (KAFKA-15310) Add timezone configuration option in TimestampConverter from connectors
[ https://issues.apache.org/jira/browse/KAFKA-15310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751309#comment-17751309 ] Romulo Souza commented on KAFKA-15310: -- I intend to work on this issue.
[jira] [Updated] (KAFKA-15310) Add timezone configuration option in TimestampConverter from connectors
[ https://issues.apache.org/jira/browse/KAFKA-15310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Romulo Souza updated KAFKA-15310: - Description: In some scenarios where TimestampConverter is used, it is useful to have an option to set a specific timezone other than the hardcoded UTC. E.g., there are use cases where a sink connector sends data to a database and this same data is used in an analysis tool without formatting and transformation options. A new optional Kafka Connect configuration should be added to set the desired timezone, with a fallback to UTC when not informed. was: In some scenarios where TimestampConverter is used, it is useful to have an option to set a specific timezone other than the hardcoded UTC. E.g., there are use cases where a sink connector sends data to a database and this same data is used in an analysis tool without formatting and transformation options. A new optional Kafka Connect configuration should be added to set the desired timezone, with a fallback to UTC when not informed. !Captura de tela de 2023-08-05 09-43-54.png! !Captura de tela de 2023-08-05 09-44-25.png!
[jira] [Created] (KAFKA-15310) Add timezone configuration option in TimestampConverter from connectors
Romulo Souza created KAFKA-15310: Summary: Add timezone configuration option in TimestampConverter from connectors Key: KAFKA-15310 URL: https://issues.apache.org/jira/browse/KAFKA-15310 Project: Kafka Issue Type: New Feature Components: config, connect Reporter: Romulo Souza Attachments: Captura de tela de 2023-08-05 09-43-54-1.png, Captura de tela de 2023-08-05 09-44-25-1.png In some scenarios where TimestampConverter is used, it is useful to have an option to set a specific timezone other than the hardcoded UTC. E.g., there are use cases where a sink connector sends data to a database and this same data is used in an analysis tool without formatting and transformation options. A new optional Kafka Connect configuration should be added to set the desired timezone, with a fallback to UTC when not informed. !Captura de tela de 2023-08-05 09-43-54.png! !Captura de tela de 2023-08-05 09-44-25.png!
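What such an option could look like in a connector configuration, as a hypothetical sketch only: the `transforms.ts.timezone` key is an assumption, since the ticket does not fix a name for it; the `field`, `target.type`, and `format` keys are existing TimestampConverter SMT options, and UTC would remain the fallback when the new key is absent.

```properties
# Hypothetical: "timezone" is an assumed key for the proposed option.
transforms=ts
transforms.ts.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.ts.field=event_time
transforms.ts.target.type=string
transforms.ts.format=yyyy-MM-dd HH:mm:ss
transforms.ts.timezone=America/Sao_Paulo
```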
[GitHub] [kafka] ruslankrivoshein commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools
ruslankrivoshein commented on code in PR #13562: URL: https://github.com/apache/kafka/pull/13562#discussion_r1285035259 ## tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java: ## @@ -99,4 +101,26 @@ public static void prettyPrintTable( printRow(columnLengths, headers, out); rows.forEach(row -> printRow(columnLengths, row, out)); } + +public static void validateBootstrapServer(String hostPort) throws IllegalArgumentException { +if (hostPort == null || hostPort.isEmpty()) { Review Comment: Done
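A sketch of what a bootstrap-server validation like the one in the diff might cover. This is hedged: it is not the PR's actual code, only an illustration of validating comma-separated host:port pairs, and it deliberately ignores IPv6 literals and port-range checks for brevity.

```java
// Sketch: reject null/empty input, then require each comma-separated entry
// to be a host:port pair with a numeric port.
public class BootstrapValidationSketch {
    public static void validateBootstrapServer(String hostPort) {
        if (hostPort == null || hostPort.isEmpty())
            throw new IllegalArgumentException("Error while validating the bootstrap address");
        for (String entry : hostPort.split(",")) {
            String[] parts = entry.split(":");
            if (parts.length != 2 || parts[0].isEmpty())
                throw new IllegalArgumentException("Invalid bootstrap address: " + entry);
            try {
                Integer.parseInt(parts[1]); // port must be numeric
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Invalid port in: " + entry);
            }
        }
    }
}
```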
[jira] [Commented] (KAFKA-15050) Prompts in the quickstarts
[ https://issues.apache.org/jira/browse/KAFKA-15050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751303#comment-17751303 ] Joobi S B commented on KAFKA-15050: --- Hi [~tombentley], could you please look at this PR? I've updated the comments: https://github.com/apache/kafka/pull/13862 > Prompts in the quickstarts > -- > > Key: KAFKA-15050 > URL: https://issues.apache.org/jira/browse/KAFKA-15050 > Project: Kafka > Issue Type: Improvement > Components: documentation >Reporter: Tom Bentley >Assignee: Joobi S B >Priority: Trivial > Labels: newbie > > In the quickstarts [Steps > 1-5|https://kafka.apache.org/documentation/#quickstart] use {{$}} to indicate > the command prompt. When we start to use Kafka Connect in [Step > 6|https://kafka.apache.org/documentation/#quickstart_kafkaconnect] we switch > to {{>}}. The [Kafka Streams > quickstart|https://kafka.apache.org/documentation/streams/quickstart] also > uses {{>}}. I don't think there's a reason for this, but if there is one > (root vs user account?) it should be explained.
[jira] [Commented] (KAFKA-15203) Remove dependency on Reflections
[ https://issues.apache.org/jira/browse/KAFKA-15203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751302#comment-17751302 ] Joobi S B commented on KAFKA-15203: --- Hi [~divijvaidya], I would like to work on this; could you please let me know how I can start? > Remove dependency on Reflections > - > > Key: KAFKA-15203 > URL: https://issues.apache.org/jira/browse/KAFKA-15203 > Project: Kafka > Issue Type: Bug > Components: connect >Reporter: Divij Vaidya >Priority: Major > Labels: newbie > > We currently depend on the Reflections library, which is EOL. Quoting from the GitHub site: > _> Please note: Reflections library is currently NOT under active development or maintenance_ > > This poses a supply-chain risk for our project, where security fixes and other major bugs in the underlying dependency may not be addressed in a timely manner. Hence, we should plan to remove this dependency.
[GitHub] [kafka] kamalcph commented on a diff in pull request #14151: KAFKA-15083: add config with "remote.log.metadata" prefix
kamalcph commented on code in PR #14151: URL: https://github.com/apache/kafka/pull/14151#discussion_r1285019147 ## storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java: ## @@ -134,6 +134,8 @@ public final class RemoteLogManagerConfig { "less than or equal to `log.retention.bytes` value."; public static final Long DEFAULT_LOG_LOCAL_RETENTION_BYTES = -2L; +public static final String REMOTE_LOG_METADATA_PREFIX = "remote.log.metadata"; Review Comment: We already have `REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP`. Configs that are prefixed with this value will be supplied to the remote log metadata manager. To configure the values in the JIRA ticket:
```
remote.log.storage.manager.impl.prefix=remote.log.storage.
remote.log.metadata.manager.impl.prefix=rlmm.config.
rlmm.config.remote.log.metadata.topic.num.partitions=50
rlmm.config.remote.log.metadata.topic.replication.factor=4
rlmm.config.remote.log.metadata.topic.retention.ms=259200
remote.log.storage.s3...
```
Let's avoid one more `Map` (remoteLogMetadataProps) in RemoteLogManagerConfig. Also, we can define the default values for the config `prefix` in this PR:
```
public static final String DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX = "rsm.config.";
public static final String DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX = "rlmm.config.";
```
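The prefix mechanism discussed here works by stripping the configured prefix before the remaining keys are handed to the plugin. The sketch below is a simplified illustration of that behavior (mirroring what `AbstractConfig.originalsWithPrefix` does), not the actual Kafka implementation.

```java
// Sketch: keep only entries starting with the prefix, and strip the prefix
// from their keys, so "rlmm.config.x.y" reaches the plugin as "x.y".
import java.util.HashMap;
import java.util.Map;

public class PrefixSketch {
    public static Map<String, Object> withPrefixStripped(Map<String, Object> all, String prefix) {
        Map<String, Object> out = new HashMap<>();
        for (Map.Entry<String, Object> e : all.entrySet())
            if (e.getKey().startsWith(prefix) && e.getKey().length() > prefix.length())
                out.put(e.getKey().substring(prefix.length()), e.getValue());
        return out;
    }
}
```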
[GitHub] [kafka] joobisb commented on pull request #13931: KAFKA-8977: Remove MockStreamsMetrics since it is not a mock
joobisb commented on PR #13931: URL: https://github.com/apache/kafka/pull/13931#issuecomment-1666466685 Hi @cadonna, As per the comment, I've set up mocks and verified the calls on them, and addressed the other comments as well; please have a look.
[GitHub] [kafka] kamalcph commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment
kamalcph commented on code in PR #13984: URL: https://github.com/apache/kafka/pull/13984#discussion_r1285011540 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -621,10 +626,30 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment producerStateSnapshotFile.toPath(), leaderEpochsIndex); brokerTopicStats.topicStats(log.topicPartition().topic()).remoteWriteRequestRate().mark(); brokerTopicStats.allTopicsStats().remoteWriteRequestRate().mark(); -remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData); +Optional customMetadata = remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData); RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new RemoteLogSegmentMetadataUpdate(id, time.milliseconds(), -RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId); +customMetadata, RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId); + +int customMetadataSizeLimit = RemoteLogManager.this.rlmConfig.remoteLogMetadataCustomMetadataMaxSize(); +if (customMetadata.isPresent()) { +long customMetadataSize = customMetadata.get().value().length; +if (customMetadataSize > customMetadataSizeLimit) { +CustomMetadataSizeLimitExceededException e = new CustomMetadataSizeLimitExceededException(); +logger.error("Custom metadata size {} exceeds configured limit {}." + +" Copying will be stopped and copied segment will be attempted to clean." + +" Original metadata: {}", +customMetadataSize, customMetadataSizeLimit, copySegmentStartedRlsm, e); +try { +// For deletion, we provide back the custom metadata by creating a new metadata object from the update. +// However, the update itself will not be stored in this case. + remoteLogStorageManager.deleteLogSegmentData(copySegmentStartedRlsm.createWithUpdates(copySegmentFinishedRlsm)); +} catch (RemoteStorageException e1) { +logger.error("Error while cleaning segment after custom metadata size exceeded", e1); Review Comment: The approach taken looks good. 
Even if we fail to delete the last uploaded segment on error, it will be marked as an unreferenced segment. And, when the RLM task is enabled for the topic, it will be removed in the regular segment cleanup cycle.
[GitHub] [kafka] kamalcph commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment
kamalcph commented on code in PR #13984: URL: https://github.com/apache/kafka/pull/13984#discussion_r1285011540

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:
@@ -621,10 +626,30 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment
                 producerStateSnapshotFile.toPath(), leaderEpochsIndex);
         brokerTopicStats.topicStats(log.topicPartition().topic()).remoteWriteRequestRate().mark();
         brokerTopicStats.allTopicsStats().remoteWriteRequestRate().mark();
-        remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
+        Optional customMetadata = remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
         RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new RemoteLogSegmentMetadataUpdate(id, time.milliseconds(),
-                RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+                customMetadata, RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+
+        int customMetadataSizeLimit = RemoteLogManager.this.rlmConfig.remoteLogMetadataCustomMetadataMaxSize();
+        if (customMetadata.isPresent()) {
+            long customMetadataSize = customMetadata.get().value().length;
+            if (customMetadataSize > customMetadataSizeLimit) {
+                CustomMetadataSizeLimitExceededException e = new CustomMetadataSizeLimitExceededException();
+                logger.error("Custom metadata size {} exceeds configured limit {}." +
+                        " Copying will be stopped and copied segment will be attempted to clean." +
+                        " Original metadata: {}",
+                        customMetadataSize, customMetadataSizeLimit, copySegmentStartedRlsm, e);
+                try {
+                    // For deletion, we provide back the custom metadata by creating a new metadata object from the update.
+                    // However, the update itself will not be stored in this case.
+                    remoteLogStorageManager.deleteLogSegmentData(copySegmentStartedRlsm.createWithUpdates(copySegmentFinishedRlsm));
+                } catch (RemoteStorageException e1) {
+                    logger.error("Error while cleaning segment after custom metadata size exceeded", e1);

Review Comment: The approach taken looks good.
Even if we fail to delete the last uploaded segment on error, it will be marked as an unreferenced segment. And when the RLM task is enabled for the topic, it will be removed in the regular segment cleanup cycle. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
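The guard discussed in the diff — upload the segment, then check the custom metadata returned by the plugin against the configured limit, and attempt cleanup of the just-copied segment on overflow — can be sketched in isolation. This is a simplified standalone illustration, not the actual RemoteLogManager code; the class name, the `exceedsLimit` helper, and the hard-coded limit are made up for the example.

```java
import java.util.Optional;

public class CustomMetadataGuard {
    // Hypothetical stand-in for the configured custom metadata size limit.
    static final int CUSTOM_METADATA_SIZE_LIMIT = 128;

    // Returns true when the plugin-provided custom metadata is too large to store,
    // mirroring the size check added in the diff: the limit is exceeded only when
    // the serialized size is strictly greater than the configured maximum.
    static boolean exceedsLimit(Optional<byte[]> customMetadata) {
        return customMetadata.map(v -> v.length > CUSTOM_METADATA_SIZE_LIMIT).orElse(false);
    }

    public static void main(String[] args) {
        // Oversized metadata: copying must stop and the uploaded segment be cleaned up.
        System.out.println(exceedsLimit(Optional.of(new byte[256]))); // prints "true"
        // No custom metadata at all is always within the limit.
        System.out.println(exceedsLimit(Optional.empty()));           // prints "false"
    }
}
```

As the review comment notes, even if the cleanup call itself fails, the segment stays unreferenced and is reclaimed by the regular cleanup cycle, so the guard only needs to make a best-effort deletion attempt.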
[GitHub] [kafka] fvaleri commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools
fvaleri commented on code in PR #13562: URL: https://github.com/apache/kafka/pull/13562#discussion_r1285005842

## tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java:
@@ -99,4 +101,26 @@ public static void prettyPrintTable(
         printRow(columnLengths, headers, out);
         rows.forEach(row -> printRow(columnLengths, row, out));
     }
+
+    public static void validateBootstrapServer(String hostPort) throws IllegalArgumentException {
+        if (hostPort == null || hostPort.isEmpty()) {

Review Comment: Since Java 11 we can use `isBlank()`, which also ignores white space. Unfortunately, we still need to support Java 8, where we can achieve the same by doing `hostPort.trim().isEmpty()`.
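The Java 8-compatible blank check suggested in the review can be sketched as follows; the class and method names here are illustrative, not taken from the PR.

```java
public class BlankCheck {
    // Java 8-compatible approximation of Java 11's String#isBlank():
    // trim() strips characters <= U+0020, which covers ordinary spaces,
    // tabs, and newlines, so "   " is treated the same as "".
    static boolean isNullOrBlank(String hostPort) {
        return hostPort == null || hostPort.trim().isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(isNullOrBlank("   "));       // prints "true"
        System.out.println(isNullOrBlank(null));        // prints "true"
        System.out.println(isNullOrBlank("host:9092")); // prints "false"
    }
}
```

One caveat: `isBlank()` uses `Character.isWhitespace`, so the two differ for some Unicode whitespace characters above U+0020; for bootstrap-server strings this is unlikely to matter.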
[GitHub] [kafka] fvaleri commented on pull request #14092: KAFKA-15239: Fix system tests using producer performance service
fvaleri commented on PR #14092: URL: https://github.com/apache/kafka/pull/14092#issuecomment-1666442787

> That does appear to work, because [server-common is not being excluded from tools-dependant-libs](https://github.com/apache/kafka/blob/b3db905b27ff4133f4018ac922c9ce2beb2d6087/build.gradle#L1895-L1897) like the clients is.

Exactly.

> If we choose not to change this compatibility boundary from between server-common + clients to between tools + server-common, then I think the fix you propose is possible.

I think we can improve further as you say, but it's probably better to do that in a separate PR, starting from working code.

> Can you revert the changes in producer_performance.py? they don't seem to have an effect anymore.

Done. Thanks again for the help, appreciated.
[GitHub] [kafka] showuon commented on pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework
showuon commented on PR #14116: URL: https://github.com/apache/kafka/pull/14116#issuecomment-1666411942 Please let me know when it's ready for review. Thanks for the work!
[GitHub] [kafka] kamalcph commented on pull request #14151: KAFKA-15083: add config with "remote.log.metadata" prefix
kamalcph commented on PR #14151: URL: https://github.com/apache/kafka/pull/14151#issuecomment-1666405256 I'm not fully clear on this patch; I'll do another round of review.