Re: Fwd: [VOTE] KIP-914 Join Processor Semantics for Versioned Stores

2023-04-10 Thread Matthias J. Sax

Thanks.

One question: for the repartition topic format change, do we want to 
re-use flag=2, or should we introduce flag=3 and decide, when compiling 
the DSL into the Topology, whether the timestamp is actually needed, 
falling back to format version 2 when it is not, to avoid unnecessary 
overhead?
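
To make the trade-off concrete, a rough sketch (purely illustrative; the flag 
values and value layout below are assumptions, not the actual repartition wire 
format) of deciding the format version once, at topology-compile time:

{code:java}
import java.nio.ByteBuffer;

// Illustrative only: hypothetical repartition value layout, not Kafka's actual format.
public final class RepartitionFormatSketch {
    static final byte FLAG_V2 = 2; // assumed: existing KIP-904 format, no timestamp
    static final byte FLAG_V3 = 3; // assumed: new format carrying the record timestamp

    // The boolean is known when the DSL is compiled into the Topology.
    static byte[] encode(byte[] payload, long timestamp, boolean upstreamTableIsVersioned) {
        if (!upstreamTableIsVersioned) {
            // No versioned upstream table: keep the cheaper format without a timestamp.
            return ByteBuffer.allocate(1 + payload.length)
                    .put(FLAG_V2)
                    .put(payload)
                    .array();
        }
        // Versioned upstream table: pay for the timestamp only in this case.
        return ByteBuffer.allocate(1 + Long.BYTES + payload.length)
                .put(FLAG_V3)
                .putLong(timestamp)
                .put(payload)
                .array();
    }
} {code}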



-Matthias

On 4/10/23 5:47 PM, Victoria Xia wrote:

Hi everyone,

While wrapping up the implementation for KIP-914, I have discovered that
two more DSL processors require semantic updates in the presence of
versioned tables:

- The table filter processor has an optimization to drop nulls if the
previous filtered value is also null. When the upstream table is versioned,
this optimization should be disabled in order to preserve proper version
history in the presence of out-of-order data.
- When performing an aggregation over a versioned table, only the latest
value by timestamp (per key) should be included in the final aggregate
value. This is not happening today in the presence of out-of-order data,
due to the way that TableSourceNodes call `get(key)` in order to determine
the "old value" which is to be removed from the aggregate as part of
applying an update. To fix this, aggregations should ignore out-of-order
records when aggregating versioned tables.
   - In order to implement this change, table aggregate processors need a way
     to determine whether a record is out-of-order or not. This cannot be done
     by querying the source table value getter, as that store belongs to a
     different subtopology (because a repartition occurs before aggregation).
     As such, an additional timestamp must be included in the repartition
     topic. The 3.5 release already includes an update to the repartition
     topic format (with upgrade implications properly handled) via KIP-904, so
     making an additional change to the repartition topic format to add a
     timestamp comes at no additional cost to users. (A brief illustrative
     sketch of the affected DSL shapes follows after this list.)
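
For illustration only, a minimal sketch of the DSL shapes the two points above
affect: a filter and a downstream aggregation over a table materialized with a
versioned store. It assumes the 3.5 KIP-889 API
(`Stores.persistentVersionedKeyValueStore`) and String serdes; the topic, store
names, and retention are made up.

{code:java}
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;

public class VersionedTableTopologySketch {

    public static StreamsBuilder build() {
        final StreamsBuilder builder = new StreamsBuilder();

        // Source table backed by a versioned store (KIP-889); retention is arbitrary here.
        final KTable<String, String> table = builder.table(
            "input-topic",
            Consumed.with(Serdes.String(), Serdes.String()),
            Materialized.as(Stores.persistentVersionedKeyValueStore(
                "versioned-source", Duration.ofMinutes(10))));

        // Filter: with a versioned upstream table, the "drop nulls if the previous
        // filtered value was also null" optimization is disabled to keep version history.
        final KTable<String, String> filtered =
            table.filter((key, value) -> value != null && !value.isEmpty());

        // Aggregation: only the latest value by timestamp per key should contribute;
        // out-of-order updates are ignored, which is why the repartition record
        // needs to carry a timestamp.
        filtered
            .groupBy((key, value) -> KeyValue.pair(value, value),
                     Grouped.with(Serdes.String(), Serdes.String()))
            .count();

        return builder;
    }
} {code}

The essential point is only the shape: both the filter and the aggregation sit
downstream of a table whose store keeps version history.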


I have updated the KIP itself with more detail about each of these changes. Please let me know if
there are any concerns. In the absence of dissent, I'd like to include
these changes along with the rest of KIP-914 in the 3.5 release.

Apologies for not noticing these additional semantic implications earlier,
Victoria

---------- Forwarded message ---------
From: Victoria Xia 
Date: Wed, Mar 22, 2023 at 10:08 AM
Subject: Re: [VOTE] KIP-914 Join Processor Semantics for Versioned Stores
To: 


Thanks for voting, everyone! We have three binding yes votes with no
objections during four full days of voting. I will close the vote and mark
the KIP as accepted, right in time for the 3.5 release.

Thanks,
Victoria

On Wed, Mar 22, 2023 at 7:11 AM Bruno Cadonna  wrote:


+1 (binding)

Thanks Victoria!

Best,
Bruno

On 20.03.23 17:13, Matthias J. Sax wrote:

+1 (binding)

On 3/20/23 9:05 AM, Guozhang Wang wrote:

+1, thank you Victoria!

On Sat, Mar 18, 2023 at 8:27 AM Victoria Xia
 wrote:


Hi all,

I'd like to start a vote on KIP-914 for updating the Kafka Streams join
processors to use proper timestamp-based semantics in applications with
versioned stores:


https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+Join+Processor+Semantics+for+Versioned+Stores


To avoid compatibility concerns, I'd like to include the changes from this
KIP together with KIP-889
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores>
(for introducing versioned stores) in the upcoming 3.5 release. I will
close the vote on the 3.5 KIP deadline, March 22, if there are no
objections before then.

Thanks,
Victoria






[jira] [Created] (KAFKA-14889) RemoteLogManager - allow consumer fetch records from remote storage implementation

2023-04-10 Thread Luke Chen (Jira)
Luke Chen created KAFKA-14889:
-

 Summary: RemoteLogManager - allow consumer fetch records from 
remote storage implementation 
 Key: KAFKA-14889
 URL: https://issues.apache.org/jira/browse/KAFKA-14889
 Project: Kafka
  Issue Type: Sub-task
Reporter: Luke Chen


Implementation of the RLM as mentioned in the HLD section of KIP-405; this JIRA 
covers enabling consumers to fetch records from remote storage.

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP-405:KafkaTieredStorage-High-leveldesign]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14888) RemoteLogManager - deleting expired/size breached log segments to remote storage implementation

2023-04-10 Thread Luke Chen (Jira)
Luke Chen created KAFKA-14888:
-

 Summary: RemoteLogManager - deleting expired/size breached log 
segments to remote storage implementation 
 Key: KAFKA-14888
 URL: https://issues.apache.org/jira/browse/KAFKA-14888
 Project: Kafka
  Issue Type: Sub-task
Reporter: Luke Chen


Implementation of the RLM as mentioned in the HLD section of KIP-405; this JIRA 
covers deleting time- or size-breached log segments in remote storage.

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP-405:KafkaTieredStorage-High-leveldesign]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Fwd: [VOTE] KIP-914 Join Processor Semantics for Versioned Stores

2023-04-10 Thread Victoria Xia
Hi everyone,

While wrapping up the implementation for KIP-914, I have discovered that
two more DSL processors require semantic updates in the presence of
versioned tables:

   - The table filter processor has an optimization to drop nulls if the
   previous filtered value is also null. When the upstream table is versioned,
   this optimization should be disabled in order to preserve proper version
   history in the presence of out-of-order data.
   - When performing an aggregation over a versioned table, only the latest
   value by timestamp (per key) should be included in the final aggregate
   value. This is not happening today in the presence of out-of-order data,
   due to the way that TableSourceNodes call `get(key)` in order to determine
   the "old value" which is to be removed from the aggregate as part of
   applying an update. To fix this, aggregations should ignore out-of-order
   records when aggregating versioned tables.
      - In order to implement this change, table aggregate processors need a way
        to determine whether a record is out-of-order or not. This cannot be done
        by querying the source table value getter, as that store belongs to a
        different subtopology (because a repartition occurs before aggregation).
        As such, an additional timestamp must be included in the repartition
        topic. The 3.5 release already includes an update to the repartition
        topic format (with upgrade implications properly handled) via KIP-904, so
        making an additional change to the repartition topic format to add a
        timestamp comes at no additional cost to users.


I have updated the KIP itself with more detail about each of these changes. Please let me know if
there are any concerns. In the absence of dissent, I'd like to include
these changes along with the rest of KIP-914 in the 3.5 release.

Apologies for not noticing these additional semantic implications earlier,
Victoria

---------- Forwarded message ---------
From: Victoria Xia 
Date: Wed, Mar 22, 2023 at 10:08 AM
Subject: Re: [VOTE] KIP-914 Join Processor Semantics for Versioned Stores
To: 


Thanks for voting, everyone! We have three binding yes votes with no
objections during four full days of voting. I will close the vote and mark
the KIP as accepted, right in time for the 3.5 release.

Thanks,
Victoria

On Wed, Mar 22, 2023 at 7:11 AM Bruno Cadonna  wrote:

> +1 (binding)
>
> Thanks Victoria!
>
> Best,
> Bruno
>
> On 20.03.23 17:13, Matthias J. Sax wrote:
> > +1 (binding)
> >
> > On 3/20/23 9:05 AM, Guozhang Wang wrote:
> >> +1, thank you Victoria!
> >>
> >> On Sat, Mar 18, 2023 at 8:27 AM Victoria Xia
> >>  wrote:
> >>>
> >>> Hi all,
> >>>
> >>> I'd like to start a vote on KIP-914 for updating the Kafka Streams join
> >>> processors to use proper timestamp-based semantics in applications with
> >>> versioned stores:
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+Join+Processor+Semantics+for+Versioned+Stores
> >>>
> >>> To avoid compatibility concerns, I'd like to include the changes from
> >>> this KIP together with KIP-889
> >>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores>
> >>> (for introducing versioned stores) in the upcoming 3.5 release. I will
> >>> close the vote on the 3.5 KIP deadline, March 22, if there are no
> >>> objections before then.
> >>>
> >>> Thanks,
> >>> Victoria
>


Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #1747

2023-04-10 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 461700 lines...]
[2023-04-10T23:19:43.148Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 184 > AuthorizerTest > testDeleteAllAclOnPrefixedResource(String) > 
kafka.security.authorizer.AuthorizerTest.testDeleteAllAclOnPrefixedResource(String)[1]
 PASSED
[2023-04-10T23:19:43.148Z] 
[2023-04-10T23:19:43.148Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 184 > AuthorizerTest > testDeleteAllAclOnPrefixedResource(String) > 
kafka.security.authorizer.AuthorizerTest.testDeleteAllAclOnPrefixedResource(String)[2]
 STARTED
[2023-04-10T23:19:43.148Z] 
[2023-04-10T23:19:43.148Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 184 > AuthorizerTest > testDeleteAllAclOnPrefixedResource(String) > 
kafka.security.authorizer.AuthorizerTest.testDeleteAllAclOnPrefixedResource(String)[2]
 PASSED
[2023-04-10T23:19:43.148Z] 
[2023-04-10T23:19:43.148Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 183 > AddPartitionsTest > testMissingPartitionsInCreateTopics(String) 
> kafka.admin.AddPartitionsTest.testMissingPartitionsInCreateTopics(String)[1] 
PASSED
[2023-04-10T23:19:43.148Z] 
[2023-04-10T23:19:43.148Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 183 > AddPartitionsTest > testMissingPartitionsInCreateTopics(String) 
> kafka.admin.AddPartitionsTest.testMissingPartitionsInCreateTopics(String)[2] 
STARTED
[2023-04-10T23:19:43.148Z] 
[2023-04-10T23:19:43.148Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 183 > AddPartitionsTest > testMissingPartitionsInCreateTopics(String) 
> kafka.admin.AddPartitionsTest.testMissingPartitionsInCreateTopics(String)[2] 
PASSED
[2023-04-10T23:19:43.148Z] 
[2023-04-10T23:19:43.148Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 183 > ReassignPartitionsIntegrationTest > 
testAlterReassignmentThrottle(String) > 
kafka.admin.ReassignPartitionsIntegrationTest.testAlterReassignmentThrottle(String)[1]
 STARTED
[2023-04-10T23:19:43.148Z] 
[2023-04-10T23:19:43.148Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 183 > ReassignPartitionsIntegrationTest > 
testAlterReassignmentThrottle(String) > 
kafka.admin.ReassignPartitionsIntegrationTest.testAlterReassignmentThrottle(String)[1]
 PASSED
[2023-04-10T23:19:43.148Z] 
[2023-04-10T23:19:43.148Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 183 > ReassignPartitionsIntegrationTest > 
testAlterReassignmentThrottle(String) > 
kafka.admin.ReassignPartitionsIntegrationTest.testAlterReassignmentThrottle(String)[2]
 STARTED
[2023-04-10T23:19:43.148Z] 
[2023-04-10T23:19:43.148Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 183 > ReassignPartitionsIntegrationTest > 
testAlterReassignmentThrottle(String) > 
kafka.admin.ReassignPartitionsIntegrationTest.testAlterReassignmentThrottle(String)[2]
 PASSED
[2023-04-10T23:19:43.148Z] 
[2023-04-10T23:19:43.148Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 183 > ReassignPartitionsIntegrationTest > testCancellation(String) > 
kafka.admin.ReassignPartitionsIntegrationTest.testCancellation(String)[1] 
STARTED
[2023-04-10T23:19:43.148Z] 
[2023-04-10T23:19:43.148Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 183 > ReassignPartitionsIntegrationTest > testCancellation(String) > 
kafka.admin.ReassignPartitionsIntegrationTest.testCancellation(String)[1] PASSED
[2023-04-10T23:19:43.148Z] 
[2023-04-10T23:19:43.148Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 183 > ReassignPartitionsIntegrationTest > testCancellation(String) > 
kafka.admin.ReassignPartitionsIntegrationTest.testCancellation(String)[2] 
STARTED
[2023-04-10T23:19:49.820Z] 
[2023-04-10T23:19:49.820Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 183 > ReassignPartitionsIntegrationTest > testCancellation(String) > 
kafka.admin.ReassignPartitionsIntegrationTest.testCancellation(String)[2] PASSED
[2023-04-10T23:19:49.820Z] 
[2023-04-10T23:19:49.820Z] 2027 tests completed, 6 failed, 4 skipped
[2023-04-10T23:19:50.740Z] There were failing tests. See the report at: 
file:///home/jenkins/workspace/Kafka_kafka_trunk/core/build/reports/tests/integrationTest/index.html
[2023-04-10T23:20:22.322Z] 
[2023-04-10T23:20:22.322Z] > Task :streams:integrationTest
[2023-04-10T23:20:22.322Z] 
[2023-04-10T23:20:22.322Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 182 > FineGrainedAutoResetIntegrationTest > 
shouldOnlyReadRecordsWhereEarliestSpecifiedWithInvalidCommittedOffsets() PASSED
[2023-04-10T23:20:22.322Z] 
[2023-04-10T23:20:22.322Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 182 > FineGrainedAutoResetIntegrationTest > 
shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsetsWithDefaultGlobalAutoOffsetResetEarliest()
 STARTED
[2023-04-10T23:21:07.035Z] 
[2023-04

[VOTE] KIP-909: DNS Resolution Failures Should Not Fail the Client

2023-04-10 Thread Philip Nee
Hey everyone!

I'm starting a vote for KIP-909: DNS Resolution Failures Should Not Fail
the Client 

Please refer to the discussion thread here:
https://lists.apache.org/thread/st84zzwnq5m3pkzd1r7jk9lmqdt9m98s

Thanks!
P


Re: [DISCUSS] KIP-909: Allow clients to rebootstrap DNS lookup failure

2023-04-10 Thread Philip Nee
Thanks, everyone: I'm starting a vote today.  Here's the recap for some of
the questions:

John: I changed the proposal to throw a non-retriable exception after the
timeout elapses. I feel it might be necessary to poison the client after
retry expires, as it might indicate a real issue.
Ismael: The proposal is to add a configuration for the retry and it will
throw a non-retriable exception after the time expires.
Chris: Addressed some of the unclear points you mentioned; a new API won't be
introduced in this KIP. Maybe that's up for future discussion.
Jason: I'm proposing adding a timeout config and a bootstrap exception per
your suggestion.
Kirk: I'm proposing throwing a non-retriable exception in the network
client. See previous comment.
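
As a rough sketch of what the recap above would mean for a client application
(hedged: the config key "bootstrap.resolution.timeout.ms" and the exact
exception type are placeholders of mine, not names defined by the KIP), DNS
resolution of the bootstrap servers would be retried inside poll until a
timeout, after which a non-retriable exception surfaces:

{code:java}
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.StringDeserializer;

public class BootstrapRetrySketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Hypothetical config name standing in for the retry/timeout knob proposed in KIP-909.
        props.put("bootstrap.resolution.timeout.ms", "60000");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("demo-topic"));
            while (true) {
                // Under KIP-909, DNS resolution of the bootstrap servers would happen here
                // (in poll) rather than in the constructor, and be retried until the timeout.
                consumer.poll(Duration.ofMillis(500)).forEach(r -> System.out.println(r.value()));
            }
        } catch (KafkaException fatal) {
            // Stand-in for the non-retriable "bootstrap failed" exception the KIP proposes
            // once the timeout elapses; the exact type is defined by the KIP, not here.
            System.err.println("Bootstrap failed permanently: " + fatal.getMessage());
        }
    }
} {code}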


On Mon, Feb 27, 2023 at 9:36 AM Chris Egerton 
wrote:

> Hi Philip,
>
> Yeah,  "DNS resolution should occur..." seems like a better fit. 👍
>
> One other question I have is whether we should expose some kind of public
> API for performing preflight validation of the bootstrap URLs. If we change
> the behavior of a client configured with a silly typo (e.g.,
> "loclahost instead of localhost") from failing in the constructor to
> failing with a retriable exception, this might lead some client
> applications to handle that failure by, well, retrying. For reference, this
> is exactly what we do in Kafka Connect right now; see [1] and [2]. IMO it'd
> be nice to be able to opt into keeping the current behavior so that
> projects like Connect could still do preflight checks of the
> bootstrap.servers property for connectors before starting them, and report
> any issues by failing fast instead of continuously writing warning/error
> messages to their logs.
>
> I'm not sure about where this new API could go, but a few options might be:
>
> - Expose a public variant of the existing ClientUtils class
> - Add static methods to the ConsumerConfig, ProducerConfig, and
> AdminClientConfig classes
> - Add those same static methods to the KafkaConsumer, KafkaProducer, and
> KafkaAdminClient classes
>
> If this seems reasonable, we should probably also specify in the KIP that
> Kafka Connect will leverage this preflight validation logic before
> instantiating any Kafka clients for use by connectors or tasks, and
> continue to fail fast if there are typos in the bootstrap.servers property,
> or if temporary DNS resolution issues come up.
>
> [1] -
>
> https://github.com/apache/kafka/blob/5f9d01668cae64b2cacd7872d82964fa78862aaf/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L606
> [2] -
>
> https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L439
>
> Cheers,
>
> Chris
>
> On Fri, Feb 24, 2023 at 4:59 PM Philip Nee  wrote:
>
> > Hey Chris,
> >
> > Thanks for the quick response, and I apologize for the unclear wording
> > there, I guess "DNS lookup" would be a more appropriate wording here. So
> > what I meant there was, to delegate the DNS lookup in the constructor to
> > the network client poll, and it will happen on the very first poll.  I
> > guess the logic could look like this:
> >
> > - if the client has been bootstrapped, do nothing.
> > - Otherwise, perform DNS lookup, and acquire the bootstrap server
> address.
> >
> > Thanks for the comment there, I'll change up the wording.  Maybe revise
> it
> > as "DNS resolution should occur in the poll" ?
> >
> > P
> >
> > On Fri, Feb 24, 2023 at 1:47 PM Chris Egerton 
> > wrote:
> >
> > > Hi Philip,
> > >
> > > Thanks for the KIP!
> > >
> > > QQ: In the "Proposed Changes" section, the KIP states that
> "Bootstrapping
> > > should now occur in the poll method before attempting to update the
> > > metadata. This includes resolving the addresses and bootstrapping the
> > > metadata.". By "bootstrapping the metadata" do we mean actually
> > contacting
> > > the bootstrap servers, or just setting some internal state related to
> the
> > > current set of servers that can be contacted for metadata? I ask
> because
> > it
> > > seems like the language here implies the former, but if that's the
> case,
> > > this is already happening in poll (or at least, the first invocation of
> > > it), and if it's the latter, it's probably not necessary to mention in
> > the
> > > KIP since it doesn't really impact user-facing behavior. It also seems
> > like
> > > that detail might impact how intertwined this and KIP-899 are, though
> the
> > > similarity could still be superficial either way.
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> > > On Thu, Feb 23, 2023 at 9:21 PM Philip Nee 
> wrote:
> > >
> > > > Hey Ismael,
> > > >
> > > > Thanks for the feedback! The proposal is not to retry automatically
> but
> > > > relies on the user polling the NetworkClient (basically,
> consumer.poll)
> > > to
> > > > reattempt the bootstrap. If bootstrapping fails, a NetworkException
> > > > (retriable) will be thrown.
> > > >
> > > > Thanks!
> > > > P
> > > >
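
Picking up Chris's preflight-validation idea above: whatever shape the public
API eventually takes, a caller such as Connect could resolve the configured
bootstrap addresses up front and fail fast on typos, while leaving transient
DNS failures to the client's own retry path. The helper below is purely
hypothetical (no such Kafka API exists today); it is only a sketch of the idea.

{code:java}
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import org.apache.kafka.common.config.ConfigException;

// Hypothetical helper, not an existing Kafka API: a preflight check of
// bootstrap.servers entries for callers that want to fail fast.
public final class BootstrapPreflightSketch {

    public static void validateBootstrapServers(List<String> bootstrapServers) {
        for (String server : bootstrapServers) {
            int idx = server.lastIndexOf(':');
            if (idx <= 0 || idx == server.length() - 1) {
                throw new ConfigException("Malformed bootstrap server entry: " + server);
            }
            String host = server.substring(0, idx);
            try {
                InetAddress.getAllByName(host); // resolve eagerly; a typo fails here
            } catch (UnknownHostException e) {
                throw new ConfigException("Cannot resolve bootstrap host '" + host + "': " + e.getMessage());
            }
        }
    }

    public static void main(String[] args) {
        // Example: "loclahost" (the typo from the discussion) fails fast instead of retrying forever.
        validateBootstrapServers(List.of("loclahost:9092"));
    }
} {code}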

[jira] [Created] (KAFKA-14887) ZK session timeout can cause broker to shutdown

2023-04-10 Thread Ron Dagostino (Jira)
Ron Dagostino created KAFKA-14887:
-

 Summary: ZK session timeout can cause broker to shutdown
 Key: KAFKA-14887
 URL: https://issues.apache.org/jira/browse/KAFKA-14887
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 3.3.2, 3.3.1, 3.2.3, 3.2.2, 3.4.0, 3.2.1, 3.1.2, 3.0.2, 
3.3.0, 3.1.1, 3.2.0, 2.8.2, 3.0.1, 3.0.0, 2.8.1, 2.7.2, 3.1.0, 2.7.1, 2.8.0, 
2.7.0
Reporter: Ron Dagostino


We have the following code in FinalizedFeatureChangeListener.scala which will 
exit regardless of the type of exception that is thrown when trying to process 
feature changes:

case e: Exception => {
  error("Failed to process feature ZK node change event. The broker will eventually exit.", e)
  throw new FatalExitError(1)
}

The issue here is that this does not distinguish between exceptions caused by 
an inability to process a feature change and an exception caused by a ZooKeeper 
session timeout.  We want to shut the broker down for the former case, but we 
do NOT want to shut the broker down in the latter case; the ZooKeeper session 
will eventually be reestablished, and we can continue processing at that time.
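
For illustration, a Java-flavored sketch of the distinction described above
(the real handler is Scala; the session-expiry exception type shown is an
assumption, since Kafka's ZooKeeper client may surface its own wrapper, and
FatalExitError is modeled as a plain RuntimeException here):

{code:java}
import java.util.concurrent.Callable;
import org.apache.zookeeper.KeeperException.SessionExpiredException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch only: models FinalizedFeatureChangeListener's event handling.
public final class FeatureChangeHandlerSketch {
    private static final Logger log = LoggerFactory.getLogger(FeatureChangeHandlerSketch.class);

    void handleChange(Callable<Void> processFeatureChange) {
        try {
            processFeatureChange.call();
        } catch (SessionExpiredException e) {
            // Transient: the ZK session will be re-established, so do not exit.
            log.warn("ZK session expired while processing a feature change; will retry later", e);
        } catch (Exception e) {
            // Genuinely unprocessable change: keep the existing fatal-exit behavior.
            log.error("Failed to process feature ZK node change event. The broker will eventually exit.", e);
            throw new RuntimeException(e); // stand-in for FatalExitError(1)
        }
    }
} {code}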





--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #1746

2023-04-10 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 548332 lines...]
[2023-04-10T19:58:29.468Z] Gradle Test Run :connect:mirror:integrationTest > 
Gradle Test Executor 132 > MirrorConnectorsIntegrationTransactionsTest > 
testReplicateSourceDefault() PASSED
[2023-04-10T19:58:29.468Z] 
[2023-04-10T19:58:29.468Z] Gradle Test Run :connect:mirror:integrationTest > 
Gradle Test Executor 132 > MirrorConnectorsIntegrationTransactionsTest > 
testOffsetSyncsTopicsOnTarget() STARTED
[2023-04-10T19:58:37.902Z] 
[2023-04-10T19:58:37.902Z] > Task :connect:mirror:integrationTest
[2023-04-10T19:58:37.902Z] 
[2023-04-10T19:58:37.902Z] Gradle Test Run :connect:mirror:integrationTest > 
Gradle Test Executor 129 > MirrorConnectorsIntegrationExactlyOnceTest > 
testRestartReplication() PASSED
[2023-04-10T19:58:37.902Z] 
[2023-04-10T19:58:37.902Z] Gradle Test Run :connect:mirror:integrationTest > 
Gradle Test Executor 129 > MirrorConnectorsIntegrationExactlyOnceTest > 
testOneWayReplicationWithFrequentOffsetSyncs() STARTED
[2023-04-10T19:58:41.756Z] 
[2023-04-10T19:58:41.757Z] > Task :connect:runtime:integrationTest
[2023-04-10T19:58:41.757Z] 
[2023-04-10T19:58:41.757Z] Gradle Test Run :connect:runtime:integrationTest > 
Gradle Test Executor 147 > 
org.apache.kafka.connect.integration.TransformationIntegrationTest > 
testFilterOnTombstonesWithSinkConnector PASSED
[2023-04-10T19:58:41.757Z] 
[2023-04-10T19:58:41.757Z] Gradle Test Run :connect:runtime:integrationTest > 
Gradle Test Executor 147 > 
org.apache.kafka.connect.integration.TransformationIntegrationTest > 
testFilterOnTopicNameWithSinkConnector STARTED
[2023-04-10T19:59:18.073Z] 
[2023-04-10T19:59:18.073Z] Gradle Test Run :connect:runtime:integrationTest > 
Gradle Test Executor 147 > 
org.apache.kafka.connect.integration.TransformationIntegrationTest > 
testFilterOnTopicNameWithSinkConnector PASSED
[2023-04-10T19:59:29.361Z] 
[2023-04-10T19:59:29.361Z] Gradle Test Run :connect:runtime:integrationTest > 
Gradle Test Executor 187 > 
org.apache.kafka.connect.integration.OffsetsApiIntegrationTest > 
testGetSourceConnectorOffsetsCustomOffsetsTopic STARTED
[2023-04-10T19:59:51.249Z] 
[2023-04-10T19:59:51.249Z] > Task :connect:mirror:integrationTest
[2023-04-10T19:59:51.249Z] 
[2023-04-10T19:59:51.249Z] Gradle Test Run :connect:mirror:integrationTest > 
Gradle Test Executor 132 > MirrorConnectorsIntegrationTransactionsTest > 
testOffsetSyncsTopicsOnTarget() PASSED
[2023-04-10T19:59:51.249Z] 
[2023-04-10T19:59:51.249Z] Gradle Test Run :connect:mirror:integrationTest > 
Gradle Test Executor 132 > MirrorConnectorsIntegrationTransactionsTest > 
testReplicateTargetDefault() STARTED
[2023-04-10T20:00:39.248Z] 
[2023-04-10T20:00:39.248Z] > Task :connect:runtime:integrationTest
[2023-04-10T20:00:39.248Z] 
[2023-04-10T20:00:39.248Z] Gradle Test Run :connect:runtime:integrationTest > 
Gradle Test Executor 187 > 
org.apache.kafka.connect.integration.OffsetsApiIntegrationTest > 
testGetSourceConnectorOffsetsCustomOffsetsTopic PASSED
[2023-04-10T20:00:44.163Z] 
[2023-04-10T20:00:44.163Z] Gradle Test Run :connect:mirror:integrationTest > 
Gradle Test Executor 129 > MirrorConnectorsIntegrationExactlyOnceTest > 
testOneWayReplicationWithFrequentOffsetSyncs() PASSED
[2023-04-10T20:00:44.163Z] 
[2023-04-10T20:00:44.163Z] Gradle Test Run :connect:mirror:integrationTest > 
Gradle Test Executor 129 > MirrorConnectorsIntegrationTransactionsTest > 
testReplication() STARTED
[2023-04-10T20:00:55.998Z] 
[2023-04-10T20:00:55.998Z] 89 tests completed, 1 failed, 1 skipped
[2023-04-10T20:01:00.774Z] There were failing tests. See the report at: 
file:///home/jenkins/jenkins-agent/712657a4/workspace/Kafka_kafka_trunk@2/connect/runtime/build/reports/tests/integrationTest/index.html
[2023-04-10T20:01:31.458Z] 
[2023-04-10T20:01:31.458Z] > Task :connect:mirror:integrationTest
[2023-04-10T20:01:31.458Z] 
[2023-04-10T20:01:31.458Z] Gradle Test Run :connect:mirror:integrationTest > 
Gradle Test Executor 132 > MirrorConnectorsIntegrationTransactionsTest > 
testReplicateTargetDefault() PASSED
[2023-04-10T20:01:31.458Z] 
[2023-04-10T20:01:31.458Z] Gradle Test Run :connect:mirror:integrationTest > 
Gradle Test Executor 132 > MirrorConnectorsIntegrationTransactionsTest > 
testNoCheckpointsIfNoRecordsAreMirrored() STARTED
[2023-04-10T20:02:51.061Z] 
[2023-04-10T20:02:51.061Z] Gradle Test Run :connect:mirror:integrationTest > 
Gradle Test Executor 129 > MirrorConnectorsIntegrationTransactionsTest > 
testReplication() PASSED
[2023-04-10T20:02:51.061Z] 
[2023-04-10T20:02:51.061Z] Gradle Test Run :connect:mirror:integrationTest > 
Gradle Test Executor 129 > MirrorConnectorsIntegrationTransactionsTest > 
testReplicationWithEmptyPartition() STARTED
[2023-04-10T20:03:04.759Z] 
[2023-04-10T20:03:04.759Z] Gradle Test Run :connect:mirror:integrationTest > 
Gradle Test Executor 132 > MirrorConnectorsIntegrationTransactionsTest 

[jira] [Created] (KAFKA-14886) Broker request handler thread pool is full due to single request slowdown

2023-04-10 Thread Haoze Wu (Jira)
Haoze Wu created KAFKA-14886:


 Summary: Broker request handler thread pool is full due to single 
request slowdown
 Key: KAFKA-14886
 URL: https://issues.apache.org/jira/browse/KAFKA-14886
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 2.8.0
Reporter: Haoze Wu


In Kafka-2.8.0, we found that the number of data plane Kafka request handlers 
may quickly reach the limit when only one request is stuck. As a result, all 
other requests that require a data plane request handler will be stuck.

When there is a slowdown inside the storeOffsets function at line 777 due to an 
I/O operation, the thread holds the lock acquired at line 754.

 
{code:java}
  private def doCommitOffsets(group: GroupMetadata,
                              memberId: String,
                              groupInstanceId: Option[String],
                              generationId: Int,
                              offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                              responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = {
    group.inLock { // Line 754
      ..
      groupManager.storeOffsets() // Line 777
      ..
    }
  } {code}
Its call stack is:

 
{code:java}
kafka.coordinator.group.GroupMetadata,inLock,227
kafka.coordinator.group.GroupCoordinator,handleCommitOffsets,755
kafka.server.KafkaApis,handleOffsetCommitRequest,515
kafka.server.KafkaApis,handle,175
kafka.server.KafkaRequestHandler,run,74
java.lang.Thread,run,748 {code}
This happens when the broker is handling the commit-offset request from the 
consumer. When the slowdown mentioned above leaves the consumer without a 
response, the consumer automatically resends the request to the broker. Note 
that each request from the consumer is handled by a 
data-plane-kafka-request-handler thread. Therefore, another 
data-plane-kafka-request-handler thread will also be stuck at line 754 when 
handling the retried requests, because it tries to acquire the very same 
consumer group lock. The retries occur repeatedly, and none of them can 
succeed. As a result, the pool of data-plane-kafka-request-handler threads 
fills up. Note that this pool of threads is responsible for handling all such 
requests from all producers and consumers, so all producers and consumers 
would be affected.

However, the backoff mechanism might be able to mitigate this issue by reducing 
the number of requests in a short time and reserving more slots in the thread 
pool. Therefore, we increased the backoff config "retry.backoff.ms" to see if 
the issue disappears. Specifically, we increased the retry backoff from 100ms 
(the default) to 1000ms in the consumer's config. However, we found that the 
mentioned thread pool fills up again, because multiple heartbeat requests take 
up the slots of this thread pool. Handling of all those heartbeat requests is 
stuck on acquiring the same consumer group lock, which has already been 
acquired at line 754 as mentioned. Specifically, the heartbeat handling is 
stuck at GroupCoordinator.handleHeartbeat@624:
{code:java}
  def handleHeartbeat(groupId: String,
                      memberId: String,
                      groupInstanceId: Option[String],
                      generationId: Int,
                      responseCallback: Errors => Unit): Unit = {
..
      case Some(group) => group.inLock { // Line 624
..
      }
..
} {code}
Heartbeat requests are sent from the consumer at an interval of 3000ms (by 
default) and have no backoff mechanism, so the thread pool for 
data-plane-kafka-request-handler threads fills up soon after.

Fix: 

Instead of waiting for the lock, we can just try to acquire the lock (probably 
with a time limit). If the acquisition fails, the request can be discarded so 
that other requests (including the retry of the discarded one) can be 
processed. However, we feel this fix could affect the semantics of many 
operations. We would like to hear suggestions from the community.
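
A minimal sketch of the timed-acquisition idea (class and method names below
are made up; the real path is GroupCoordinator.handleCommitOffsets calling
GroupMetadata.inLock in Scala):

{code:java}
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Sketch of "try to acquire the lock with a time limit" instead of blocking.
public final class GroupLockSketch {
    private final ReentrantLock groupLock = new ReentrantLock();

    boolean handleCommit(Runnable storeOffsets, long timeoutMs) throws InterruptedException {
        if (!groupLock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
            // Lock is held by a stuck request: reject/discard this request instead of
            // parking the handler thread, so the request-handler pool does not fill up.
            return false;
        }
        try {
            storeOffsets.run();   // the potentially slow I/O (storeOffsets at line 777)
            return true;
        } finally {
            groupLock.unlock();
        }
    }
} {code}

If tryLock times out, the handler thread returns immediately and the caller can
answer the request with a retriable error rather than occupying a pool slot.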



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14885) Client can connect to broker and broker can not connect zookeeper

2023-04-10 Thread zou shengfu (Jira)
zou shengfu created KAFKA-14885:
---

 Summary: Client can connect to broker and broker can not connect 
zookeeper
 Key: KAFKA-14885
 URL: https://issues.apache.org/jira/browse/KAFKA-14885
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 3.3.2
Reporter: zou shengfu
Assignee: zou shengfu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)