[GitHub] [kafka] monish-byte commented on pull request #14156: KAFKA-15202: Fix MM2 offset translation when syncs are variably spaced

2023-08-05 Thread via GitHub


monish-byte commented on PR #14156:
URL: https://github.com/apache/kafka/pull/14156#issuecomment-186364

   hey @gharris1727, I have joined the mailing list. I am eager to know what I 
should do next.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #14156: KAFKA-15202: Fix MM2 offset translation when syncs are variably spaced

2023-08-05 Thread via GitHub


gharris1727 commented on PR #14156:
URL: https://github.com/apache/kafka/pull/14156#issuecomment-1666576481

   Hi @monish-byte! Thanks for considering contributing to Apache Kafka.
   
   You can find the contributing guide here: 
https://kafka.apache.org/contributing.html
   It is a good idea to join the dev mailing list: 
https://kafka.apache.org/contact.html and ask for access to JIRA and 
Confluence. From there, you can ask more questions about contributing, submit 
your own issues, and work on issues that have already been reported.
   
   I look forward to seeing you on the mailing list!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] erikvanoosten commented on pull request #13914: KAFKA-14972: Support async runtimes in consumer

2023-08-05 Thread via GitHub


erikvanoosten commented on PR #13914:
URL: https://github.com/apache/kafka/pull/13914#issuecomment-1666574852

   Withdrawn because the committers do not seem to be convinced that you cannot 
control which thread code runs on when using an async runtime.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] erikvanoosten closed pull request #13914: KAFKA-14972: Support async runtimes in consumer

2023-08-05 Thread via GitHub


erikvanoosten closed pull request #13914: KAFKA-14972: Support async runtimes 
in consumer
URL: https://github.com/apache/kafka/pull/13914


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] erikvanoosten commented on pull request #14071: Make acquire and release protected

2023-08-05 Thread via GitHub


erikvanoosten commented on PR #14071:
URL: https://github.com/apache/kafka/pull/14071#issuecomment-1666574726

   Withdrawn because the committers do not seem to be convinced that you cannot 
control which thread code runs on when using an async runtime.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] erikvanoosten closed pull request #14071: Make acquire and release protected

2023-08-05 Thread via GitHub


erikvanoosten closed pull request #14071: Make acquire and release protected
URL: https://github.com/apache/kafka/pull/14071


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14972) Make KafkaConsumer usable in async runtimes

2023-08-05 Thread Erik van Oosten (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Erik van Oosten resolved KAFKA-14972.
-
Resolution: Won't Fix

> Make KafkaConsumer usable in async runtimes
> ---
>
> Key: KAFKA-14972
> URL: https://issues.apache.org/jira/browse/KAFKA-14972
> Project: Kafka
>  Issue Type: Wish
>  Components: consumer
>Reporter: Erik van Oosten
>Priority: Major
>  Labels: needs-kip
>
> KafkaConsumer contains a check that rejects nested invocations from 
> different threads (method {{acquire}}). For users of an async runtime, this 
> is an almost impossible requirement to satisfy. Examples of affected async 
> runtimes are Kotlin coroutines (see KAFKA-7143) and ZIO.
> It should be possible for a thread to pass on its capability to access the 
> consumer to another thread. See 
> [KIP-944|https://cwiki.apache.org/confluence/x/chw0Dw] for a proposal and 
> [https://github.com/apache/kafka/pull/13914] for an implementation.
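
For reference, the guard being discussed is KafkaConsumer's lightweight 
single-thread lock. A simplified sketch of the pattern (not the exact Kafka 
source) looks like this:

{code:java}
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Only the thread that currently "owns" the consumer may call into it;
// any other thread fails fast instead of blocking.
class SingleThreadGuard {
    private static final long NO_CURRENT_THREAD = -1L;
    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
    private final AtomicInteger refCount = new AtomicInteger(0);

    void acquire() {
        final long threadId = Thread.currentThread().getId();
        // Reject unless this thread already owns the guard or can claim it.
        if (threadId != currentThread.get()
                && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) {
            throw new ConcurrentModificationException(
                    "KafkaConsumer is not safe for multi-threaded access");
        }
        refCount.incrementAndGet();
    }

    void release() {
        // Give up ownership once all nested calls on this thread return.
        if (refCount.decrementAndGet() == 0) {
            currentThread.set(NO_CURRENT_THREAD);
        }
    }
}
{code}

KIP-944 proposed a way to transfer this ownership between threads, which async 
runtimes need because a single logical task may resume on different carrier 
threads.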



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14972) Make KafkaConsumer usable in async runtimes

2023-08-05 Thread Erik van Oosten (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751330#comment-17751330
 ] 

Erik van Oosten commented on KAFKA-14972:
-

I am closing this task as Won't Fix, as the committers do not seem convinced 
that supporting async runtimes is needed.

> Make KafkaConsumer usable in async runtimes
> ---
>
> Key: KAFKA-14972
> URL: https://issues.apache.org/jira/browse/KAFKA-14972
> Project: Kafka
>  Issue Type: Wish
>  Components: consumer
>Reporter: Erik van Oosten
>Priority: Major
>  Labels: needs-kip
>
> KafkaConsumer contains a check that rejects nested invocations from 
> different threads (method {{acquire}}). For users of an async runtime, this 
> is an almost impossible requirement to satisfy. Examples of affected async 
> runtimes are Kotlin coroutines (see KAFKA-7143) and ZIO.
> It should be possible for a thread to pass on its capability to access the 
> consumer to another thread. See 
> [KIP-944|https://cwiki.apache.org/confluence/x/chw0Dw] for a proposal and 
> [https://github.com/apache/kafka/pull/13914] for an implementation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14972) Make KafkaConsumer usable in async runtimes

2023-08-05 Thread Erik van Oosten (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Erik van Oosten reassigned KAFKA-14972:
---

Assignee: (was: Erik van Oosten)

> Make KafkaConsumer usable in async runtimes
> ---
>
> Key: KAFKA-14972
> URL: https://issues.apache.org/jira/browse/KAFKA-14972
> Project: Kafka
>  Issue Type: Wish
>  Components: consumer
>Reporter: Erik van Oosten
>Priority: Major
>  Labels: needs-kip
>
> KafkaConsumer contains a check that rejects nested invocations from 
> different threads (method {{acquire}}). For users of an async runtime, this 
> is an almost impossible requirement to satisfy. Examples of affected async 
> runtimes are Kotlin coroutines (see KAFKA-7143) and ZIO.
> It should be possible for a thread to pass on its capability to access the 
> consumer to another thread. See 
> [KIP-944|https://cwiki.apache.org/confluence/x/chw0Dw] for a proposal and 
> [https://github.com/apache/kafka/pull/13914] for an implementation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.

2023-08-05 Thread Jinyong Choi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751323#comment-17751323
 ] 

Jinyong Choi commented on KAFKA-15302:
--

Hi Matthias J. Sax,

Reading your comment helped me pin down this bug more precisely.

For instance, when forwarding a message with this.context.forward(msg), the 
key of the forwarded message may also be deleted from the store to optimize 
storage efficiency. If we then call store.delete(key), the delete() method of 
CachingKeyValueStore invokes getInternal() and putInternal(), and those calls 
in turn can lead to the execution of maybeEvict().

Deleting by putting a null value is a valid way to remove items from RocksDB, 
so the observed behavior is normal in that respect.

I'm currently writing code to suppress the maybeEvict() operation. When the 
test is complete, I will share the results.

Let me know if you have any additional comments!

 
{code:java}
// CachingKeyValueStore
@Override
public byte[] delete(final Bytes key) {
    Objects.requireNonNull(key, "key cannot be null");
    validateStoreOpen();
    lock.writeLock().lock();
    try {
        validateStoreOpen();
        return deleteInternal(key);
    } finally {
        lock.writeLock().unlock();
    }
}

private byte[] deleteInternal(final Bytes key) {
    final byte[] v = getInternal(key);
    putInternal(key, null);
    return v;
}

private void putInternal(final Bytes key,
                         final byte[] value) {
    context.cache().put(
        cacheName,
        key,
        new LRUCacheEntry(
            value,
            context.headers(),
            true,
            context.offset(),
            context.timestamp(),
            context.partition(),
            context.topic()));
    StoreQueryUtils.updatePosition(position, context);
}

// NamedCache
public void put(final String namespace, final Bytes key, final LRUCacheEntry value, final boolean needToEvict) {
    numPuts++;
    final NamedCache cache = getOrCreateCache(namespace);
    synchronized (cache) {
        final long oldSize = cache.sizeInBytes();
        cache.put(key, value);
        sizeInBytes.getAndAdd(cache.sizeInBytes() - oldSize);
        maybeEvict(namespace, cache);
    }
}
{code}
 

 

> Stale value returned when using store.all() in punctuation function.
> 
>
> Key: KAFKA-15302
> URL: https://issues.apache.org/jira/browse/KAFKA-15302
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Jinyong Choi
>Priority: Major
>
> When using the store.all() function within a punctuation function scheduled 
> via this.context.schedule, a stale value is returned. In other words, even 
> though the value has been updated from 1 to 2, it doesn't return 2; instead, 
> it returns 1.
> In the provided test code, you can see the output 'BROKEN !!!'. While this 
> doesn't occur 100% of the time, adding logs makes it evident that the cache 
> is flushed during the while loop that follows the all() call. As a result, 
> the named cache holds a null value, causing a value to be returned from 
> RocksDB instead: the value returned by the subsequent .get() call differs 
> from the value held by the KeyValueIterator. This is possibly due to the 
> consistent-read functionality of RocksDB, although the exact cause is not 
> certain.
> Of course, if you perform {{store.flush()}} before {{all()}}, there are no 
> errors.
>  
>  * test code (forked from balajirrao and modified for this)
> [https://github.com/jinyongchoi/kafka-streams-multi-runner/|https://github.com/jinyongchoi/kafka-streams-multi-runner/tree/main]
>  
> {code:java}
> private void forwardAll(final long timestamp) {
>     //
>     System.err.println("forwardAll Start");
>     KeyValueIterator<String, Integer> kvList = this.kvStore.all();
>     while (kvList.hasNext()) {
>         KeyValue<String, Integer> entry = kvList.next();
>         final Record<String, Integer> msg =
>             new Record<>(entry.key, entry.value, context.currentSystemTimeMs());
>         final Integer storeValue = this.kvStore.get(entry.key);
>         if (entry.value != storeValue) {
>             System.err.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: "
>                 + entry.key + " Expected in stored(Cache or Store) value: "
>                 + storeValue + " but KeyValueIterator value: " + entry.value);
>             throw new RuntimeException("Broken!");
>         }
>         this.context.forward(msg);
>     }
>     kvList.close();
> }
> {code}
>  * log file (add log in stream source)
>  
> {code:java}
> # console log
> sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1"
> [info] welcome to sbt 1.8.2 (Ubuntu Java 11.0.20)
> ...
> [info] running Coordinator 1
> appid: 95108c48-7c69-4eeb-adbd-9d091bd84933
> [0] starting 

[GitHub] [kafka] omkreddy commented on a diff in pull request #14130: KAFKA-15273: Log common name of expired client certificates

2023-08-05 Thread via GitHub


omkreddy commented on code in PR #14130:
URL: https://github.com/apache/kafka/pull/14130#discussion_r1285082984


##
clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java:
##
@@ -255,7 +271,7 @@ private SSLContext createSSLContext(SecurityStore keystore, 
SecurityStore trusts
 }
 
 String tmfAlgorithm = this.tmfAlgorithm != null ? 
this.tmfAlgorithm : TrustManagerFactory.getDefaultAlgorithm();
-TrustManagerFactory tmf = 
TrustManagerFactory.getInstance(tmfAlgorithm);
+CommonNameLoggingTrustManagerFactoryWrapper tmf = 
CommonNameLoggingTrustManagerFactoryWrapper.getInstance(tmfAlgorithm);

Review Comment:
   My main concern is the possible performance impact of the additional 
processing. We often see faulty clients bombarding the brokers with handshake 
requests that carry expired certificates, which can impact broker performance. 
So I was thinking of adding a flag to enable/disable this behavior. Let's see 
what others think.
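
   As a sketch of what such an opt-in flag could look like (the property name 
below is hypothetical, not an agreed-upon config), defined via Kafka's 
`ConfigDef`:
   ```java
   import org.apache.kafka.common.config.ConfigDef;

   public class ExpiredCertLoggingConfig {
       // Hypothetical property name, for illustration only. Defaulting to
       // false keeps the extra certificate processing off unless an operator
       // opts in.
       public static final ConfigDef CONFIG = new ConfigDef().define(
           "ssl.expired.certificate.logging.enable",
           ConfigDef.Type.BOOLEAN,
           false,
           ConfigDef.Importance.LOW,
           "Whether to log the common name of expired client certificates.");
   }
   ```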



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ethaden commented on a diff in pull request #14130: KAFKA-15273: Log common name of expired client certificates

2023-08-05 Thread via GitHub


ethaden commented on code in PR #14130:
URL: https://github.com/apache/kafka/pull/14130#discussion_r1285078073


##
clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java:
##
@@ -578,4 +594,335 @@ private List pemEntries(String pem) {
 return entries;
 }
 }
+
+/**
+ * A wrapper around the original trust manager factory for creating common 
name logging trust managers.
+ * These trust managers log the common name of an expired but otherwise 
valid (client) certificate before rejecting the connection attempt.
+ * This makes it possible to identify misconfigured clients in complex 
network environments, where the IP address alone is not sufficient.
+ */
+static class CommonNameLoggingTrustManagerFactoryWrapper {

Review Comment:
   @omkreddy I moved the new classes and the corresponding test cases into 
separate files.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-10334) Transactions not working properly

2023-08-05 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-10334.
---
Resolution: Duplicate

> Transactions not working properly
> -
>
> Key: KAFKA-10334
> URL: https://issues.apache.org/jira/browse/KAFKA-10334
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 2.1.0, 2.3.0
>Reporter: Luis Araujo
>Priority: Major
>
> I'm using transactions provided by Kafka Producer API in a Scala project 
> built with SBT. The dependency used in the project is: 
> {code:java}
> "org.apache.kafka" % "kafka-clients" % "2.1.0" {code}
> I followed the documentation and expected transactions to fail when calling 
> *commitTransaction()* if a problem is raised while sending a message, as 
> described in the 
> [documentation|https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-].
> Unfortunately, when testing this behaviour with a message larger than the 
> size accepted by the Kafka broker/cluster, transactions do not work properly.
> I tested with a 3-broker Kafka cluster and a 1 MB max message size (the 
> default value):
>  - when the message is 1 MB, the transaction is aborted and an exception is 
> raised when calling *commitTransaction()*
>  - when the message is bigger than 1 MB, the transaction completes 
> successfully *without* the message being written, and no exception is thrown.
> As an example, this means that when I produce 9 messages of 1 KB and 1 
> message of 1.1 MB in the same transaction, the transaction completes but 
> only 9 messages are written to the Kafka cluster.
> I tested this behaviour with Kafka versions 2.1.0 and 2.3.0 for both the 
> Kafka cluster and the Kafka Producer API.
> The configs that I'm using to create the KafkaProducer in order to use 
> transactions:
> {code:java}
> new Properties() {
>   {
> put(BOOTSTRAP_SERVERS_CONFIG, 
> "localhost:29092,localhost:29093,localhost:29094")
> put(ACKS_CONFIG, "-1")
> put(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
> put(KEY_SERIALIZER_CLASS_CONFIG, 
> Class.forName(classOf[StringSerializer].getName))
> put(VALUE_SERIALIZER_CLASS_CONFIG, 
> Class.forName(classOf[ByteArraySerializer].getName))
> put(CLIENT_ID_CONFIG, "app")
> put(TRANSACTIONAL_ID_CONFIG, "app")
> put(ENABLE_IDEMPOTENCE_CONFIG, "true")
>   }
> }
> {code}
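
For contrast, the transaction contract documented for the producer is that 
any failed send surfaces no later than *commitTransaction()*. A minimal usage 
sketch of that documented pattern (imports and producer setup elided):

{code:java}
producer.initTransactions();
try {
    producer.beginTransaction();
    for (ProducerRecord<String, byte[]> record : records) {
        producer.send(record); // send errors surface at commit time at the latest
    }
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // Fatal errors: this producer instance cannot be recovered.
    producer.close();
} catch (KafkaException e) {
    // Non-fatal: abort and retry the batch with the same producer.
    producer.abortTransaction();
}
{code}

The behaviour reported above, where an oversized record is silently dropped 
while the transaction still commits, contradicts this contract; the issue was 
resolved as a duplicate.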



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15310) Add timezone configuration option in TimestampConverter from connectors

2023-08-05 Thread Romulo Souza (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751309#comment-17751309
 ] 

Romulo Souza edited comment on KAFKA-15310 at 8/5/23 1:56 PM:
--

I intend to work on this issue after resolving some assignee permission issues.


was (Author: JIRAUSER301070):
I intend to work on this issue.

> Add timezone configuration option in TimestampConverter from connectors
> ---
>
> Key: KAFKA-15310
> URL: https://issues.apache.org/jira/browse/KAFKA-15310
> Project: Kafka
>  Issue Type: New Feature
>  Components: config, connect
>Reporter: Romulo Souza
>Priority: Minor
> Attachments: Captura de tela de 2023-08-05 09-43-54-1.png, Captura de 
> tela de 2023-08-05 09-44-25-1.png
>
>
> In some scenarios where TimestampConverter is used, it would be useful to 
> have an option to specify a timezone other than the hardcoded UTC. E.g., 
> there are use cases where a sink connector sends data to a database and this 
> same data is used in an analysis tool without formatting and transformation 
> options.
> A new optional Kafka Connect configuration should be added to set the 
> desired timezone, with a fallback to UTC when it is not provided.
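
For illustration, a config sketch of how this could look on the 
TimestampConverter SMT; the {{transforms.ts.timezone}} property is the 
proposed (hypothetical) option, the field name is illustrative, and the 
remaining properties exist today:

{code}
transforms=ts
transforms.ts.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.ts.field=event_time
transforms.ts.target.type=string
transforms.ts.format=yyyy-MM-dd HH:mm:ss
# Proposed (hypothetical) option, falling back to UTC when absent:
transforms.ts.timezone=America/Sao_Paulo
{code}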



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] monish-byte commented on pull request #14156: KAFKA-15202: Fix MM2 offset translation when syncs are variably spaced

2023-08-05 Thread via GitHub


monish-byte commented on PR #14156:
URL: https://github.com/apache/kafka/pull/14156#issuecomment-1666502303

   hey @gharris1727, can you please guide me on how I can start contributing to 
this project? I am new to open source contribution, but I have strong 
knowledge of Java. Thank you.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15310) Add timezone configuration option in TimestampConverter from connectors

2023-08-05 Thread Romulo Souza (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751309#comment-17751309
 ] 

Romulo Souza commented on KAFKA-15310:
--

I intend to work on this issue.

> Add timezone configuration option in TimestampConverter from connectors
> ---
>
> Key: KAFKA-15310
> URL: https://issues.apache.org/jira/browse/KAFKA-15310
> Project: Kafka
>  Issue Type: New Feature
>  Components: config, connect
>Reporter: Romulo Souza
>Priority: Minor
> Attachments: Captura de tela de 2023-08-05 09-43-54-1.png, Captura de 
> tela de 2023-08-05 09-44-25-1.png
>
>
> In some scenarios where TimestampConverter is used, it would be useful to 
> have an option to specify a timezone other than the hardcoded UTC. E.g., 
> there are use cases where a sink connector sends data to a database and this 
> same data is used in an analysis tool without formatting and transformation 
> options.
> A new optional Kafka Connect configuration should be added to set the 
> desired timezone, with a fallback to UTC when it is not provided.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15310) Add timezone configuration option in TimestampConverter from connectors

2023-08-05 Thread Romulo Souza (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Romulo Souza updated KAFKA-15310:
-
Description: 
In some scenarios where TimestampConverter is used, it would be useful to have 
an option to specify a timezone other than the hardcoded UTC. E.g., there are 
use cases where a sink connector sends data to a database and this same data 
is used in an analysis tool without formatting and transformation options.

A new optional Kafka Connect configuration should be added to set the desired 
timezone, with a fallback to UTC when it is not provided.

  was:
In some scenarios where TimestampConverter is used, it would be useful to have 
an option to specify a timezone other than the hardcoded UTC. E.g., there are 
use cases where a sink connector sends data to a database and this same data 
is used in an analysis tool without formatting and transformation options.

A new optional Kafka Connect configuration should be added to set the desired 
timezone, with a fallback to UTC when it is not provided.

!Captura de tela de 2023-08-05 09-43-54.png!

!Captura de tela de 2023-08-05 09-44-25.png!


> Add timezone configuration option in TimestampConverter from connectors
> ---
>
> Key: KAFKA-15310
> URL: https://issues.apache.org/jira/browse/KAFKA-15310
> Project: Kafka
>  Issue Type: New Feature
>  Components: config, connect
>Reporter: Romulo Souza
>Priority: Minor
> Attachments: Captura de tela de 2023-08-05 09-43-54-1.png, Captura de 
> tela de 2023-08-05 09-44-25-1.png
>
>
> In some scenarios where TimestampConverter is used, it would be useful to 
> have an option to specify a timezone other than the hardcoded UTC. E.g., 
> there are use cases where a sink connector sends data to a database and this 
> same data is used in an analysis tool without formatting and transformation 
> options.
> A new optional Kafka Connect configuration should be added to set the 
> desired timezone, with a fallback to UTC when it is not provided.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15310) Add timezone configuration option in TimestampConverter from connectors

2023-08-05 Thread Romulo Souza (Jira)
Romulo Souza created KAFKA-15310:


 Summary: Add timezone configuration option in TimestampConverter 
from connectors
 Key: KAFKA-15310
 URL: https://issues.apache.org/jira/browse/KAFKA-15310
 Project: Kafka
  Issue Type: New Feature
  Components: config, connect
Reporter: Romulo Souza
 Attachments: Captura de tela de 2023-08-05 09-43-54-1.png, Captura de 
tela de 2023-08-05 09-44-25-1.png

In some scenarios where TimestampConverter is used, it would be useful to have 
an option to specify a timezone other than the hardcoded UTC. E.g., there are 
use cases where a sink connector sends data to a database and this same data 
is used in an analysis tool without formatting and transformation options.

A new optional Kafka Connect configuration should be added to set the desired 
timezone, with a fallback to UTC when it is not provided.

!Captura de tela de 2023-08-05 09-43-54.png!

!Captura de tela de 2023-08-05 09-44-25.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] ruslankrivoshein commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

2023-08-05 Thread via GitHub


ruslankrivoshein commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1285035259


##
tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java:
##
@@ -99,4 +101,26 @@ public static void prettyPrintTable(
 printRow(columnLengths, headers, out);
 rows.forEach(row -> printRow(columnLengths, row, out));
 }
+
+public static void validateBootstrapServer(String hostPort) throws 
IllegalArgumentException {
+if (hostPort == null || hostPort.isEmpty()) {

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15050) Prompts in the quickstarts

2023-08-05 Thread Joobi S B (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751303#comment-17751303
 ] 

Joobi S B commented on KAFKA-15050:
---

Hi [~tombentley],

could you please take a look at this PR? I've addressed the review comments:
https://github.com/apache/kafka/pull/13862

> Prompts in the quickstarts
> --
>
> Key: KAFKA-15050
> URL: https://issues.apache.org/jira/browse/KAFKA-15050
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Reporter: Tom Bentley
>Assignee: Joobi S B
>Priority: Trivial
>  Labels: newbie
>
> In the quickstarts [Steps 
> 1-5|https://kafka.apache.org/documentation/#quickstart] use {{$}} to indicate 
> the command prompt. When we start to use Kafka Connect in [Step 
> 6|https://kafka.apache.org/documentation/#quickstart_kafkaconnect] we switch 
> to {{{}>{}}}. The [Kafka Streams 
> quickstart|https://kafka.apache.org/documentation/streams/quickstart] also 
> uses {{{}>{}}}. I don't think there's a reason for this, but if there is one 
> (root vs user account?) it should be explained.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15203) Remove dependency on Reflections

2023-08-05 Thread Joobi S B (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751302#comment-17751302
 ] 

Joobi S B commented on KAFKA-15203:
---

Hi [~divijvaidya],

I would like to work on this. Could you please let me know how I can get 
started?

> Remove dependency on Reflections 
> -
>
> Key: KAFKA-15203
> URL: https://issues.apache.org/jira/browse/KAFKA-15203
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Divij Vaidya
>Priority: Major
>  Labels: newbie
>
> We currently depend on the Reflections library, which is EOL. Quoting from 
> the GitHub site:
> _> Please note: Reflections library is currently NOT under active development 
> or maintenance_
>  
> This poses a supply-chain risk for our project, where security fixes and 
> other major bugs in the underlying dependency may not be addressed in a 
> timely manner.
> Hence, we should plan to remove this dependency.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] kamalcph commented on a diff in pull request #14151: KAFKA-15083: add config with "remote.log.metadata" prefix

2023-08-05 Thread via GitHub


kamalcph commented on code in PR #14151:
URL: https://github.com/apache/kafka/pull/14151#discussion_r1285019147


##
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java:
##
@@ -134,6 +134,8 @@ public final class RemoteLogManagerConfig {
 "less than or equal to `log.retention.bytes` value.";
 public static final Long DEFAULT_LOG_LOCAL_RETENTION_BYTES = -2L;
 
+public static final String REMOTE_LOG_METADATA_PREFIX = 
"remote.log.metadata";

Review Comment:
   We already have `REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP`. Configs 
that are prefixed with this value will be supplied to the remote log metadata 
manager. 
   
   To configure the values in the JIRA ticket:
   ```
   remote.log.storage.manager.impl.prefix=remote.log.storage.
   remote.log.metadata.manager.impl.prefix=rlmm.config.
   
   rlmm.config.remote.log.metadata.topic.num.partitions=50
   rlmm.config.remote.log.metadata.topic.replication.factor=4
   rlmm.config.remote.log.metadata.topic.retention.ms=259200
   
   remote.log.storage.s3...
   ```
   
   Let's avoid one more `Map` (remoteLogMetadataProps) in 
RemoteLogManagerConfig. 
   
   Also, we can define the default values for the config `prefix`  in this PR:
   ```
   public static final String DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX = 
"rsm.config.";
   public static final String DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX 
= "rlmm.config.";
   ```
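
   For extracting the prefixed configs, `AbstractConfig` already provides a 
helper, so the extra map should be avoidable; a minimal sketch (assuming an 
`AbstractConfig` instance named `config`):
   ```java
   // originalsWithPrefix() returns every config whose key starts with the
   // given prefix, with the prefix stripped, e.g.
   // "rlmm.config.remote.log.metadata.topic.retention.ms"
   //   -> "remote.log.metadata.topic.retention.ms"
   Map<String, Object> rlmmProps = config.originalsWithPrefix("rlmm.config.");
   ```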



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] joobisb commented on pull request #13931: KAFKA-8977: Remove MockStreamsMetrics since it is not a mock

2023-08-05 Thread via GitHub


joobisb commented on PR #13931:
URL: https://github.com/apache/kafka/pull/13931#issuecomment-1666466685

   Hi @cadonna ,
   
   As per the comment, I've set up mocks and verified the calls on them, and 
I've addressed the other comments as well. Please have a look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kamalcph commented on a diff in pull request #13984: KAFKA-15107: Support custom metadata for remote log segment

2023-08-05 Thread via GitHub


kamalcph commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1285011540


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -621,10 +626,30 @@ private void copyLogSegment(UnifiedLog log, LogSegment 
segment, long nextSegment
 producerStateSnapshotFile.toPath(), leaderEpochsIndex);
 
brokerTopicStats.topicStats(log.topicPartition().topic()).remoteWriteRequestRate().mark();
 brokerTopicStats.allTopicsStats().remoteWriteRequestRate().mark();
-remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, 
segmentData);
+Optional<CustomMetadata> customMetadata = 
remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
 
 RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new 
RemoteLogSegmentMetadataUpdate(id, time.milliseconds(),
-RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+customMetadata, 
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+
+int customMetadataSizeLimit = 
RemoteLogManager.this.rlmConfig.remoteLogMetadataCustomMetadataMaxSize();
+if (customMetadata.isPresent()) {
+long customMetadataSize = customMetadata.get().value().length;
+if (customMetadataSize > customMetadataSizeLimit) {
+CustomMetadataSizeLimitExceededException e = new 
CustomMetadataSizeLimitExceededException();
+logger.error("Custom metadata size {} exceeds configured 
limit {}." +
+" Copying will be stopped and copied 
segment will be attempted to clean." +
+" Original metadata: {}",
+customMetadataSize, customMetadataSizeLimit, 
copySegmentStartedRlsm, e);
+try {
+// For deletion, we provide back the custom metadata 
by creating a new metadata object from the update.
+// However, the update itself will not be stored in 
this case.
+
remoteLogStorageManager.deleteLogSegmentData(copySegmentStartedRlsm.createWithUpdates(copySegmentFinishedRlsm));
+} catch (RemoteStorageException e1) {
+logger.error("Error while cleaning segment after 
custom metadata size exceeded", e1);

Review Comment:
   The approach taken looks good. Even if we fail to delete the last uploaded 
segment on error, it will be marked as an unreferenced segment, and when the 
RLM task is enabled for the topic, it will be removed in the regular segment 
cleanup cycle.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





[GitHub] [kafka] fvaleri commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

2023-08-05 Thread via GitHub


fvaleri commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1285005842


##
tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java:
##
@@ -99,4 +101,26 @@ public static void prettyPrintTable(
 printRow(columnLengths, headers, out);
 rows.forEach(row -> printRow(columnLengths, row, out));
 }
+
+public static void validateBootstrapServer(String hostPort) throws 
IllegalArgumentException {
+if (hostPort == null || hostPort.isEmpty()) {

Review Comment:
   Since Java 11 we can use `isBlank()`, which also ignores whitespace. 
Unfortunately, we still need to provide support for Java 8, where we can 
achieve the same with `hostPort.trim().isEmpty()`.
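
   A Java 8-compatible sketch of such a check (illustrative only; the real 
method likely validates more than blankness):
   ```java
   public static void validateBootstrapServer(String hostPort) {
       // trim().isEmpty() stands in for Java 11's isBlank().
       if (hostPort == null || hostPort.trim().isEmpty()) {
           throw new IllegalArgumentException(
               "The bootstrap server value must be non-blank: '" + hostPort + "'");
       }
       // ...further host:port validation would follow here...
   }
   ```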



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





[GitHub] [kafka] fvaleri commented on pull request #14092: KAFKA-15239: Fix system tests using producer performance service

2023-08-05 Thread via GitHub


fvaleri commented on PR #14092:
URL: https://github.com/apache/kafka/pull/14092#issuecomment-1666442787

   > That does appear to work, because [server-common is not being excluded 
from 
tools-dependant-libs](https://github.com/apache/kafka/blob/b3db905b27ff4133f4018ac922c9ce2beb2d6087/build.gradle#L1895-L1897)
 like the clients is.
   
   Exactly.
   
   > If we choose not to change this compatibility boundary from between 
server-common + clients to between tools + server-common, then I think the fix 
you propose is possible. 
   
   I think we can improve further as you say, but it's probably better to do 
that in a separate PR, starting from working code.
   
   > Can you revert the changes in producer_performance.py? they don't seem to 
have an effect anymore.
   
   Done.
   
   Thanks again for the help, appreciated.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework

2023-08-05 Thread via GitHub


showuon commented on PR #14116:
URL: https://github.com/apache/kafka/pull/14116#issuecomment-1666411942

   Please let me know when this is ready for review. Thanks for the work!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kamalcph commented on pull request #14151: KAFKA-15083: add config with "remote.log.metadata" prefix

2023-08-05 Thread via GitHub


kamalcph commented on PR #14151:
URL: https://github.com/apache/kafka/pull/14151#issuecomment-1666405256

   I'm not yet clear on this patch; I will do another round of review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org