[jira] [Created] (KAFKA-13710) Fix failed testProduceWithInvalidTimestamp/testSendWithInvalidCreateTime tests

2022-03-05 Thread Luke Chen (Jira)
Luke Chen created KAFKA-13710:
-

 Summary: Fix failed 
testProduceWithInvalidTimestamp/testSendWithInvalidCreateTime tests
 Key: KAFKA-13710
 URL: https://issues.apache.org/jira/browse/KAFKA-13710
 Project: Kafka
  Issue Type: Test
Reporter: Luke Chen
Assignee: Luke Chen


org.opentest4j.AssertionFailedError: expected:  but was: 
    at app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
    at app//org.junit.jupiter.api.AssertionUtils.failNotEqual(AssertionUtils.java:62)
    at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
    at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177)
    at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1141)
    at app//kafka.server.ProduceRequestTest.testProduceWithInvalidTimestamp(ProduceRequestTest.scala:119)

 

[https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/741/testReport/junit/kafka.server/ProduceRequestTest/Build___JDK_11_and_Scala_2_13___testProduceWithInvalidTimestamp__/]

 

 

org.opentest4j.AssertionFailedError: expected:  but was: 
    at app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
    at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40)
    at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:35)
    at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:179)
    at app//kafka.api.PlaintextProducerSendTest.testSendWithInvalidCreateTime(PlaintextProducerSendTest.scala:102)

 

https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/741/testReport/junit/kafka.api/PlaintextProducerSendTest/Build___JDK_11_and_Scala_2_13___testSendWithInvalidCreateTime__/





Re: [DISCUSS] KIP-824 Allowing dumping segmentlogs limiting the batches in the output

2022-03-05 Thread David Jacot
Hi Sergio,

I wonder if « max-bytes » would be a better name than « max-batches-size ».
The intent would be more explicit. What do you think?

Best,
David

On Sat, Mar 5, 2022 at 10:36, Luke Chen wrote:

> Hi Sergio,
>
> Thanks for the explanation! Very clear!
> I think we should put this example and explanation into KIP.
>
> Other comments:
> 1. If *max-batches-size* is so small that it results in no records in the
> output, will we output any information to the user?
> 2. After your explanation, I guess the use of *max-batches-size* won't
> conflict with *max-message-size*, right?
> That is, users can set the two arguments at the same time. Is that correct?
>
> Thank you.
> Luke
>
> On Sat, Mar 5, 2022 at 4:47 PM Sergio Daniel Troiano
>  wrote:
>
> > hey Luke,
> >
> > thanks for the interest, it is a good question, please let me explain:
> >
> > *max-message-size* is a filter on the size of each batch. For example, if I
> > set --max-message-size to 1000 bytes and my segment log has 300 batches, 150
> > of them with a size of 500 bytes and the other 150 with a size of 2000 bytes,
> > then the script will skip the last 150 as each of those batches is larger
> > than the limit.
> >
> > On the other hand, following the same example with *max-batches-size* set to
> > 1000 bytes, it will only print out the first 2 batches (500 bytes each) and
> > stop. This avoids reading the whole file.
> >
> > Also, if all of them are smaller than 1000 bytes, it will end up printing
> > out all the batches.
> > The idea of my change is to limit the *number* of batches, no matter their
> > size.
> >
> > I hope this reply helps.
> > Best regards.
> >
> > On Sat, 5 Mar 2022 at 08:00, Luke Chen  wrote:
> >
> > > Hi Sergio,
> > >
> > > Thanks for the KIP!
> > >
> > > One question:
> > > I saw there's a `max-message-size` argument that seems to do the same
> > thing
> > > as you want.
> > > Could you help explain what's the difference between `max-message-size`
> > and
> > > `max-batches-size`?
> > >
> > > Thank you.
> > > Luke
> > >
> > > On Sat, Mar 5, 2022 at 3:21 AM Kirk True 
> wrote:
> > >
> > > > Hi Sergio,
> > > >
> > > > Thanks for the KIP. I don't know anything about the log segment
> > > internals,
> > > > but the logic and implementation seem sound.
> > > >
> > > > Three questions:
> > > >  1. Since the --max-batches-size unit is bytes, does it matter if
> that
> > > > size doesn't align to a record boundary?
> > > >  2. Can you add a check to make sure that --max-batches-size doesn't
> > > allow
> > > > the user to pass in a negative number?
> > > >  3. Can you add/update any unit tests related to the DumpLogSegments
> > > > arguments?
> > > > Thanks,
> > > > Kirk
> > > >
> > > > On Thu, Mar 3, 2022, at 1:32 PM, Sergio Daniel Troiano wrote:
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-824%3A+Allowing+dumping+segmentlogs+limiting+the+batches+in+the+output
> > > > >
> > > >
> > >
> >
>


Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #741

2022-03-05 Thread Apache Jenkins Server
See 




Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #740

2022-03-05 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-824 Allowing dumping segmentlogs limiting the batches in the output

2022-03-05 Thread Luke Chen
Hi Sergio,

Thanks for the explanation! Very clear!
I think we should put this example and explanation into KIP.

Other comments:
1. If *max-batches-size* is so small that it results in no records in the
output, will we output any information to the user?
2. After your explanation, I guess the use of *max-batches-size* won't
conflict with *max-message-size*, right?
That is, users can set the two arguments at the same time. Is that correct?

Thank you.
Luke


On Sat, Mar 5, 2022 at 4:47 PM Sergio Daniel Troiano
 wrote:

> hey Luke,
>
> thanks for the interest, it is a good question, please let me explain:
>
> *max-message-size* is a filter on the size of each batch. For example, if I
> set --max-message-size to 1000 bytes and my segment log has 300 batches, 150
> of them with a size of 500 bytes and the other 150 with a size of 2000 bytes,
> then the script will skip the last 150 as each of those batches is larger
> than the limit.
>
> On the other hand, following the same example with *max-batches-size* set to
> 1000 bytes, it will only print out the first 2 batches (500 bytes each) and
> stop. This avoids reading the whole file.
>
> Also, if all of them are smaller than 1000 bytes, it will end up printing out
> all the batches.
> The idea of my change is to limit the *number* of batches, no matter their
> size.
>
> I hope this reply helps.
> Best regards.
>
> On Sat, 5 Mar 2022 at 08:00, Luke Chen  wrote:
>
> > Hi Sergio,
> >
> > Thanks for the KIP!
> >
> > One question:
> > I saw there's a `max-message-size` argument that seems to do the same
> thing
> > as you want.
> > Could you help explain what's the difference between `max-message-size`
> and
> > `max-batches-size`?
> >
> > Thank you.
> > Luke
> >
> > On Sat, Mar 5, 2022 at 3:21 AM Kirk True  wrote:
> >
> > > Hi Sergio,
> > >
> > > Thanks for the KIP. I don't know anything about the log segment
> > internals,
> > > but the logic and implementation seem sound.
> > >
> > > Three questions:
> > >  1. Since the --max-batches-size unit is bytes, does it matter if that
> > > size doesn't align to a record boundary?
> > >  2. Can you add a check to make sure that --max-batches-size doesn't
> > allow
> > > the user to pass in a negative number?
> > >  3. Can you add/update any unit tests related to the DumpLogSegments
> > > arguments?
> > > Thanks,
> > > Kirk
> > >
> > > On Thu, Mar 3, 2022, at 1:32 PM, Sergio Daniel Troiano wrote:
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-824%3A+Allowing+dumping+segmentlogs+limiting+the+batches+in+the+output
> > > >
> > >
> >
>


Re: [DISCUSS] KIP-824 Allowing dumping segmentlogs limiting the batches in the output

2022-03-05 Thread Sergio Daniel Troiano
hey Luke,

thanks for the interest, it is a good question, please let me explain:

*max-message-size* is a filter on the size of each batch. For example, if I
set --max-message-size to 1000 bytes and my segment log has 300 batches, 150
of them with a size of 500 bytes and the other 150 with a size of 2000 bytes,
then the script will skip the last 150 as each of those batches is larger
than the limit.

On the other hand, following the same example with *max-batches-size* set to
1000 bytes, it will only print out the first 2 batches (500 bytes each) and
stop. This avoids reading the whole file.

Also, if all of them are smaller than 1000 bytes, it will end up printing out
all the batches.
The idea of my change is to limit the *number* of batches, no matter their
size.
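
To make the difference concrete, here is a rough sketch (purely illustrative,
not the actual DumpLogSegments code; the Batch class is a made-up stand-in for
a record batch) of how the two limits would behave:

{code:java}
import java.util.List;

public class DumpLimitsSketch {
    // --max-message-size skips individual batches larger than the limit but keeps scanning;
    // --max-batches-size stops once the cumulative bytes printed reach the limit.
    static void dump(List<Batch> batches, int maxMessageSize, int maxBatchesSize) {
        long printedBytes = 0;
        for (Batch b : batches) {
            if (b.sizeInBytes > maxMessageSize) {
                continue;                          // too large: skip it, keep reading the file
            }
            if (printedBytes + b.sizeInBytes > maxBatchesSize) {
                break;                             // budget reached: stop reading the file
            }
            System.out.println(b);                 // "print" the batch
            printedBytes += b.sizeInBytes;
        }
    }

    static final class Batch {
        final int sizeInBytes;
        Batch(int sizeInBytes) { this.sizeInBytes = sizeInBytes; }
        @Override
        public String toString() { return "Batch(" + sizeInBytes + " bytes)"; }
    }

    public static void main(String[] args) {
        // Two small batches followed by a large one: with --max-batches-size 1000 only
        // the first two (500 bytes each) are printed before the loop stops.
        List<Batch> segment = List.of(new Batch(500), new Batch(500), new Batch(2000));
        dump(segment, Integer.MAX_VALUE, 1000);
    }
}
{code}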

I hope this reply helps.
Best regards.

On Sat, 5 Mar 2022 at 08:00, Luke Chen  wrote:

> Hi Sergio,
>
> Thanks for the KIP!
>
> One question:
> I saw there's a `max-message-size` argument that seems to do the same thing
> as you want.
> Could you help explain what's the difference between `max-message-size` and
> `max-batches-size`?
>
> Thank you.
> Luke
>
> On Sat, Mar 5, 2022 at 3:21 AM Kirk True  wrote:
>
> > Hi Sergio,
> >
> > Thanks for the KIP. I don't know anything about the log segment
> internals,
> > but the logic and implementation seem sound.
> >
> > Three questions:
> >  1. Since the --max-batches-size unit is bytes, does it matter if that
> > size doesn't align to a record boundary?
> >  2. Can you add a check to make sure that --max-batches-size doesn't
> allow
> > the user to pass in a negative number?
> >  3. Can you add/update any unit tests related to the DumpLogSegments
> > arguments?
> > Thanks,
> > Kirk
> >
> > On Thu, Mar 3, 2022, at 1:32 PM, Sergio Daniel Troiano wrote:
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-824%3A+Allowing+dumping+segmentlogs+limiting+the+batches+in+the+output
> > >
> >
>


[jira] [Resolved] (KAFKA-13694) Some InvalidRecordException messages are thrown away

2022-03-05 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-13694.
---
Fix Version/s: 3.2.0
 Assignee: RivenSun
   Resolution: Fixed

> Some InvalidRecordException messages are thrown away
> 
>
> Key: KAFKA-13694
> URL: https://issues.apache.org/jira/browse/KAFKA-13694
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 3.0.0
>Reporter: RivenSun
>Assignee: RivenSun
>Priority: Major
> Fix For: 3.2.0
>
>
> 1.Example
> Topic level config:"cleanup.policy":"compact" 
> But when the producer sends the message, the ProducerRecord does not specify 
> the key.
>  
> producer.log
> {code:java}
> [kafka-producer-network-thread | producer-1] ERROR 
> us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:One 
> or more records have been rejected {code}
>  
>  
> server.log
> {code:java}
> [2022-02-25 02:14:54,411] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition rivenTest4-0 (kafka.server.ReplicaManager)
> org.apache.kafka.common.InvalidRecordException: One or more records have been 
> rejected {code}
> Through the logs of the producer and the server, we do not know why the send
> failed, only that the message was rejected by the server.
> Compare this with the RecordTooLargeException test case: there we can clearly
> see the reason for the failure on the producer side, and the server does not
> print a log at all (the reason is explained later).
> producer_message_too_large.log :
> {code:java}
> [kafka-producer-network-thread | producer-1] ERROR 
> us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:The 
> request included a message larger than the max message size the server will 
> accept.
> [kafka-producer-network-thread | producer-1] ERROR 
> us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:The 
> request included a message larger than the max message size the server will 
> accept. {code}
> 2.RootCause
> ReplicaManager#appendToLocalLog(...) ->
> Partition#appendRecordsToLeader(...) ->
> UnifiedLog#appendAsLeader(...) -> UnifiedLog#append(...) ->
> LogValidator#validateMessagesAndAssignOffsets(...) 
> 1) Analyzing the validateMessagesAndAssignOffsets method:
> in LogValidator#validateRecord, validateKey and validateTimestamp are called,
> and the error information for all records is collected as a
> Seq[ApiRecordError].
> In the subsequent processRecordErrors(recordErrors) method, special handling
> is currently done only for Errors.INVALID_TIMESTAMP; because the error
> returned by validateKey is still the ordinary Errors.INVALID_RECORD, the code
> falls through to:
> {code:java}
> else {
>   throw new RecordValidationException(new InvalidRecordException(
>     "One or more records have been rejected"), errors)
> }
> {code}
> In fact, the *errors* variable here contains the specific information for each
> record error, but we do not put that information into the message of the
> InvalidRecordException.
> 2) The exception thrown by processRecordErrors is caught by
> ReplicaManager#appendToLocalLog(...); continuing with the exception-handling
> code of appendToLocalLog also explains why, for RecordTooLargeException, the
> server does not print a log.
> Under `case rve: RecordValidationException`,
> the server prints the log via the processFailedRecord method,
> and sends a response to the client via LogAppendResult.
> In these two places we only use rve.invalidException;
> rve.recordErrors is neither printed by the server nor returned to the client.
> 3.Solution
> There are two possible solutions; I prefer the second.
> 1) Similar to Errors.INVALID_TIMESTAMP, have the validateKey method return
> Errors.INVALID_RECORD_WITHOUT_KEY, and add special handling for
> Errors.INVALID_RECORD_WITHOUT_KEY in the processRecordErrors method.
> 2) Modify the logic of the processRecordErrors method so that it no longer
> distinguishes the types of Errors, and {*}even if new INVALID_RECORD types are
> added in the future{*}, we uniformly return:
> {code:java}
> throw new RecordValidationException(new InvalidRecordException(
>   "One or more records have been rejected due to " + errors.toString()), errors)
> {code}
> We also need to add a toString() method to the ProduceResponse.RecordError class:
> {code:java}
> @Override
> public String toString() {
>     return "RecordError("
>         + "batchIndex=" + batchIndex
>         + ", message=" + ((message == null) ? "null" : "'" + message + "'")
>         + ")";
> }
> {code}
> In the past, the toString method of ProduceResponse.PartitionResponse has 
> called the toString method of ProduceResponse.RecordError, *but before we 
> 

[jira] [Resolved] (KAFKA-13466) delete unused config batch.size in kafka-console-producer.sh

2022-03-05 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-13466.
---
Fix Version/s: 3.2.0
   Resolution: Fixed

> delete unused config batch.size in kafka-console-producer.sh
> 
>
> Key: KAFKA-13466
> URL: https://issues.apache.org/jira/browse/KAFKA-13466
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: peterwanner
>Priority: Minor
> Fix For: 3.2.0
>
> Attachments: image-2021-11-19-04-05-15-754.png, 
> image-2021-11-19-04-09-28-299.png
>
>
>  
> official docs:
> !image-2021-11-19-04-05-15-754.png!
>  
> The shell script is:
> !image-2021-11-19-04-09-28-299.png!
> In fact, the batch-size config is no longer used anywhere in the code, so it
> should be deleted to avoid misunderstanding in the future.





[jira] [Resolved] (KAFKA-12959) Prioritize assigning standby tasks to threads without any active tasks

2022-03-05 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-12959.
---
Fix Version/s: 3.2.0
   Resolution: Fixed

> Prioritize assigning standby tasks to threads without any active tasks
> --
>
> Key: KAFKA-12959
> URL: https://issues.apache.org/jira/browse/KAFKA-12959
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Ravi Bhardwaj
>Priority: Major
> Fix For: 3.2.0
>
> Attachments: app1.log, app2.log
>
>
> Currently, while distributing standby tasks, Streams does not check whether
> there are threads without any tasks or with fewer tasks. This can lead to a
> few threads getting assigned both active and standby tasks while there are
> threads within the same instance without any tasks assigned.
> Example:
> {code:java}
> App 1:
> [wordcount-lambda-example-client-StreamThread-1] Handle new assignment with:
>  New active tasks: [0_1]
>  New standby tasks: [1_0]
>  Existing active tasks: []
>  Existing standby tasks: [1_0]
> [wordcount-lambda-example-client-StreamThread-2] Handle new assignment with:
>  New active tasks: [1_1]
>  New standby tasks: []
>  Existing active tasks: [1_1]
>  Existing standby tasks: []
> [wordcount-lambda-example-client-StreamThread-3] Handle new assignment with:
>  New active tasks: []
>  New standby tasks: []
>  Existing active tasks: []
>  Existing standby tasks: []
> [wordcount-lambda-example-client-StreamThread-4] Handle new assignment with:
>  New active tasks: []
>  New standby tasks: []
>  Existing active tasks: []
>  Existing standby tasks: []
> {code}
>  
> {code:java}
> App2:
> [wordcount-lambda-example-client-StreamThread-1] Handle new assignment with:
>  New active tasks: [1_0]
>  New standby tasks: [1_1]
>  Existing active tasks: []
>  Existing standby tasks: [1_0, 1_1]
> [wordcount-lambda-example-client-StreamThread-2] Handle new assignment with:
>  New active tasks: [0_0]
>  New standby tasks: []
>  Existing active tasks: []
>  Existing standby tasks: []
> [wordcount-lambda-example-client-StreamThread-3] Handle new assignment with:
>  New active tasks: []
>  New standby tasks: []
>  Existing active tasks: []
>  Existing standby tasks: []
> [wordcount-lambda-example-client-StreamThread-4] Handle new assignment with:
>  New active tasks: []
>  New standby tasks: []
>  Existing active tasks: []
>  Existing standby tasks: []
> {code}
> The standby tasks in both apps are assigned to Thread-1 even though it has an
> active task, while Thread-3 and Thread-4 didn't have any tasks assigned.
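>
> Below is a rough, hypothetical sketch (illustrative names only, not the actual
> StreamsPartitionAssignor code) of the behavior this issue asks for: when
> placing a standby task, prefer the thread with the fewest tasks already
> assigned, so idle threads like Thread-3/Thread-4 above are used before a
> thread that already owns an active task.
> {code:java}
> import java.util.Comparator;
> import java.util.List;
> import java.util.Map;
>
> public class StandbyPlacementSketch {
>     // Pick the thread with the fewest tasks already assigned.
>     static String pickThreadForStandby(List<String> threads, Map<String, Integer> taskCounts) {
>         return threads.stream()
>                 .min(Comparator.<String>comparingInt(t -> taskCounts.getOrDefault(t, 0)))
>                 .orElseThrow(() -> new IllegalStateException("no threads available"));
>     }
>
>     public static void main(String[] args) {
>         // Thread-1 and Thread-2 already hold one active task each; 3 and 4 are idle.
>         Map<String, Integer> counts = Map.of(
>                 "StreamThread-1", 1, "StreamThread-2", 1,
>                 "StreamThread-3", 0, "StreamThread-4", 0);
>         List<String> threads = List.of(
>                 "StreamThread-1", "StreamThread-2", "StreamThread-3", "StreamThread-4");
>         // Prints an idle thread (StreamThread-3 with this ordering), not Thread-1 or Thread-2.
>         System.out.println("standby 1_0 -> " + pickThreadForStandby(threads, counts));
>     }
> }
> {code}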
>  





[jira] [Resolved] (KAFKA-13689) AbstractConfig log print information is incorrect

2022-03-05 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-13689.
---
Fix Version/s: 3.2.0
   Resolution: Fixed

> AbstractConfig log print information is incorrect
> -
>
> Key: KAFKA-13689
> URL: https://issues.apache.org/jira/browse/KAFKA-13689
> Project: Kafka
>  Issue Type: Bug
>  Components: config
>Affects Versions: 3.0.0
>Reporter: RivenSun
>Assignee: RivenSun
>Priority: Major
> Fix For: 3.2.0
>
>
> h1. 1.Example
> KafkaClient version is 3.1.0, KafkaProducer init properties:
>  
> {code:java}
> Properties props = new Properties();
> props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
> props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60003);{code}
>  
>  
> Partial log of KafkaProducer initialization:
> {code:java}
>     ssl.truststore.location = C:\Personal 
> File\documents\KafkaSSL\client.truststore.jks
>     ssl.truststore.password = [hidden]
>     ssl.truststore.type = JKS
>     transaction.timeout.ms = 60003
>     transactional.id = null
>     value.serializer = class 
> org.apache.kafka.common.serialization.StringSerializer[main] INFO 
> org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully 
> logged in.
> [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The 
> configuration 'transaction.timeout.ms' was supplied but isn't a known config.
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.1.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 37edeed0777bacb3
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1645602332999 {code}
> From the above log, you can see that KafkaProducer has applied the user's
> configuration, {*}transaction.timeout.ms=60003{*}; the default value of this
> configuration is 60000.
> But we can see another line of log:
> [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The 
> configuration *'transaction.timeout.ms'* was supplied but isn't a 
> *{color:#ff}known{color}* config.
>  
> h1. 2.RootCause:
> 1) ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG is set to {*}false{*}.
> So the configurations related to the KafkaProducer transaction will not be 
> requested.
> See the source code: KafkaProducer#configureTransactionState(...) .
> 2) AbstractConfig#logUnused() -> AbstractConfig#unused()
> {code:java}
> public Set<String> unused() {
>     Set<String> keys = new HashSet<>(originals.keySet());
>     keys.removeAll(used);
>     return keys;
> }
> {code}
> If a configuration has never been requested, it will not be put into the
> used variable. See the source code below:
> AbstractConfig#get(String key)
>  
> {code:java}
> protected Object get(String key) {
>     if (!values.containsKey(key))
>         throw new ConfigException(String.format("Unknown configuration '%s'", key));
>     used.add(key);
>     return values.get(key);
> }
> {code}
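>
> To make the distinction concrete, here is a minimal, self-contained simulation
> (not the real AbstractConfig; the class and field names are illustrative) of
> why a known but never-requested config ends up in unused(): "unused" only means
> the value was never fetched via get(), not that the key is unknown.
> {code:java}
> import java.util.HashMap;
> import java.util.HashSet;
> import java.util.Map;
> import java.util.Set;
>
> public class UnusedVsUnknownSketch {
>     private final Map<String, Object> originals;      // what the user supplied
>     private final Map<String, Object> values;         // what the config definition knows
>     private final Set<String> used = new HashSet<>(); // keys fetched via get()
>
>     UnusedVsUnknownSketch(Map<String, Object> originals, Map<String, Object> values) {
>         this.originals = originals;
>         this.values = values;
>     }
>
>     Object get(String key) {
>         used.add(key);                                 // only get() marks a key as used
>         return values.get(key);
>     }
>
>     Set<String> unused() {                             // supplied but never requested
>         Set<String> keys = new HashSet<>(originals.keySet());
>         keys.removeAll(used);
>         return keys;
>     }
>
>     Set<String> unknown() {                            // supplied but not defined at all
>         Set<String> keys = new HashSet<>(originals.keySet());
>         keys.removeAll(values.keySet());
>         return keys;
>     }
>
>     public static void main(String[] args) {
>         Map<String, Object> supplied = new HashMap<>();
>         supplied.put("enable.idempotence", false);
>         supplied.put("transaction.timeout.ms", 60003);
>         Map<String, Object> defined = new HashMap<>(supplied); // both keys are known configs
>
>         UnusedVsUnknownSketch config = new UnusedVsUnknownSketch(supplied, defined);
>         config.get("enable.idempotence"); // idempotence disabled, so transaction.timeout.ms is never requested
>         System.out.println("unused  = " + config.unused());  // [transaction.timeout.ms]
>         System.out.println("unknown = " + config.unknown()); // []
>     }
> }
> {code}
>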
> h1. Solution:
> 1. AbstractConfig#logUnused() method
> Modify the log message of this method, and the log level for unused
> configurations could be changed to {*}INFO{*}. What do you think?
> {code:java}
> /**
>  * Log info for any unused configurations
>  */
> public void logUnused() {
>     for (String key : unused())
>         log.info("The configuration '{}' was supplied but isn't a used config.", key);
> }
> {code}
>  
>  
> 2. AbstractConfig provides two new methods: logUnknown() and unknown()
> {code:java}
> /**
>  * Log warnings for any unknown configurations
>  */
> public void logUnknown() {
>     for (String key : unknown())
>         log.warn("The configuration '{}' was supplied but isn't a known config.", key);
> }
> {code}
>  
> {code:java}
> public Set<String> unknown() {
>     Set<String> keys = new HashSet<>(originals.keySet());
>     keys.removeAll(values.keySet());
>     return keys;
> }
> {code}
>  
>  
>  


