Re: [VOTE] KIP-953: partition method to be overloaded to accept headers as well.

2023-08-22 Thread Jack Tomy
Hi Ismael,

That would be totally different from the pattern currently being followed
in all the interfaces, for example the Serializer.
I personally don't favour that either. Let's see if the community has any
opinions on the same.

Hey everyone, please share your thoughts on using a DTO instead of separate
params for the interface.
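
To make the two options concrete, here is a rough sketch of both shapes (the signatures and the PartitionContext name are illustrative only, not the exact API from the KIP or from Ismael's suggestion):

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.header.Headers;

// Current style (what the KIP proposes): add one more overload whenever a new
// parameter such as headers is needed.
interface PartitionerWithHeaders {
    int partition(String topic, Object key, byte[] keyBytes, Object value,
                  byte[] valueBytes, Headers headers, Cluster cluster);
}

// DTO / record-like style (Ismael's suggestion): new fields can be added to the
// carrier object later without introducing new methods.
record PartitionContext(String topic, Object key, byte[] keyBytes,
                        Object value, byte[] valueBytes, Headers headers) { }

interface PartitionerWithContext {
    int partition(PartitionContext context, Cluster cluster);
}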

Thanks.

On Mon, Aug 21, 2023 at 8:06 PM Ismael Juma  wrote:

> Hi Jack,
>
> I mean a DTO. That means you can add additional parameters later without
> breaking compatibility. The current proposal would result in yet another
> method each time we need to add parameters.
>
> Ismael
>
> On Sun, Aug 20, 2023 at 4:53 AM Jack Tomy  wrote:
>
> > Hey Ismael,
> >
> > Are you suggesting to pass a param like a DTO, or are you suggesting to pass
> > the record object?
> >
> > I would also like to hear other devs' opinions on this as I personally
> > favour what is done currently.
> >
> > On Thu, Aug 17, 2023 at 9:34 AM Ismael Juma  wrote:
> >
> > > Hi,
> > >
> > > Thanks for the KIP. The problem outlined here is a great example why we
> > > should be using a record-like structure to pass the parameters to a
> > method
> > > like this. Then we can add more parameters without having to introduce
> > new
> > > methods. Have we considered this option?
> > >
> > > Ismael
> > >
> > > On Mon, Aug 7, 2023 at 5:26 AM Jack Tomy 
> wrote:
> > >
> > > > Hey everyone.
> > > >
> > > > I would like to call for a vote on KIP-953: partition method to be
> > > > overloaded to accept headers as well.
> > > >
> > > > KIP :
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263424937
> > > > Discussion thread :
> > > > https://lists.apache.org/thread/0f20kvfqkmhdqrwcb8vqgqn80szcrcdd
> > > >
> > > > Thanks
> > > > --
> > > > Best Regards
> > > > *Jack*
> > > >
> > >
> >
> >
> > --
> > Best Regards
> > *Jack*
> >
>


-- 
Best Regards
*Jack*


Re: [DISCUSS] KIP-939: Support Participation in 2PC

2023-08-22 Thread Roger Hoover
Artem,

Thanks for the reply.

If I understand correctly, Kafka does not support concurrent transactions
from the same producer (transactional id).  I think this means that
applications that want to support in-process concurrency (say thread-level
concurrency with row-level DB locking) would need to manage separate
transactional ids and producers per thread and then store txn state
accordingly.   The potential usability downsides I see are
1) managing a set of transactional ids for each application process that
scales up to its max concurrency.  Maybe not too bad, but a bit of a pain to
manage these ids inside each process and across all application processes.
2) creating a separate producer for each concurrency slot in the
application - this could create a lot more producers and resultant
connections to Kafka than the typical model of a single producer per
process.

Otherwise, it seems you're left with a single-threaded model per application
process?
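
To illustrate downsides 1) and 2), a minimal sketch of what a per-slot producer pool with stable transactional ids might look like (the id scheme and values below are made up for illustration, not prescribed by the KIP):

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

int maxConcurrency = 8;                        // illustrative value
String appInstanceId = "payments-service-1";   // illustrative; must be stable across restarts

List<KafkaProducer<byte[], byte[]>> producers = new ArrayList<>();
for (int slot = 0; slot < maxConcurrency; slot++) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // One stable transactional.id per concurrency slot.
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, appInstanceId + "-slot-" + slot);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.ByteArraySerializer");
    KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
    producer.initTransactions();   // fences any zombie from a previous incarnation of this slot
    producers.add(producer);
}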

Thanks,

Roger

On Tue, Aug 22, 2023 at 5:11 PM Artem Livshits
 wrote:

> Hi Roger, Arjun,
>
> Thank you for the questions.
> > It looks like the application must have stable transactional ids over
> time?
>
> The transactional id should uniquely identify a producer instance and needs
> to be stable across restarts.  If the transactional id is not stable
> across restarts, then zombie messages from a previous incarnation of the
> producer may violate atomicity.  If there are 2 producer instances
> concurrently producing data with the same transactional id, they are going
> to constantly fence each other and most likely make little or no progress.
>
> The name might be a little bit confusing as it may be mistaken for a
> transaction id / TID that uniquely identifies every transaction.  The name
> and the semantics were defined in the original exactly-once-semantics (EoS)
> proposal (
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> )
> and KIP-939 just builds on top of that.
>
> > I'm curious to understand what happens if the producer dies, and does not
> come up and recover the pending transaction within the transaction timeout
> interval.
>
> If the producer / application never comes back, the transaction will remain
> in prepared (a.k.a. "in-doubt") state until an operator forcefully
> terminates the transaction.  That's why a new ACL is defined in
> this proposal -- this functionality should only be provided to applications
> that implement proper recovery logic.
>
> -Artem
>
> On Tue, Aug 22, 2023 at 12:52 AM Arjun Satish 
> wrote:
>
> > Hello Artem,
> >
> > Thanks for the KIP.
> >
> > I have the same question as Roger on concurrent writes, and an additional
> > one on consumer behavior. Typically, transactions will timeout if not
> > committed within some time interval. With the proposed changes in this
> KIP,
> > consumers cannot consume past the ongoing transaction. I'm curious to
> > understand what happens if the producer dies, and does not come up and
> > recover the pending transaction within the transaction timeout interval.
> Or
> > are we saying that when used in this 2PC context, we should configure
> these
> > transaction timeouts to very large durations?
> >
> > Thanks in advance!
> >
> > Best,
> > Arjun
> >
> >
> > On Mon, Aug 21, 2023 at 1:06 PM Roger Hoover 
> > wrote:
> >
> > > Hi Artem,
> > >
> > > Thanks for writing this KIP.  Can you clarify the requirements a bit
> more
> > > for managing transaction state?  It looks like the application must
> have
> > > stable transactional ids over time?   What is the granularity of those
> > ids
> > > and producers?  Say the application is a multi-threaded Java web
> server,
> > > can/should all the concurrent threads share a transactional id and
> > > producer?  That doesn't seem right to me unless the application is
> using
> > > global DB locks that serialize all requests.  Instead, if the
> application
> > > uses row-level DB locks, there could be multiple, concurrent,
> independent
> > > txns happening in the same JVM, so it seems like the granularity of
> > > managing transactional ids and txn state needs to line up with the
> > > granularity of the DB locking.
> > >
> > > Does that make sense or am I misunderstanding?
> > >
> > > Thanks,
> > >
> > > Roger
> > >
> > > On Wed, Aug 16, 2023 at 11:40 PM Artem Livshits
> > >  wrote:
> > >
> > > > Hello,
> > > >
> > > > This is a discussion thread for
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC
> > > > .
> > > >
> > > > The KIP proposes extending Kafka transaction support (that already
> uses
> > > 2PC
> > > > under the hood) to enable atomicity of dual writes to Kafka and an
> > > external
> > > > database, and helps to fix a long standing Flink issue.
> > > >
> > > > An example of code that uses the dual write recipe with JDBC and
> should
> > > > work for most SQL databases is here
> > > > 

Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #2130

2023-08-22 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 304607 lines...]
Gradle Test Run :core:test > Gradle Test Executor 93 > ZkMigrationClientTest > 
testUpdateExistingPartitions() STARTED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZkMigrationClientTest > 
testUpdateExistingPartitions() PASSED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZkMigrationClientTest > 
testEmptyWrite() STARTED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZkMigrationClientTest > 
testEmptyWrite() PASSED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZkMigrationClientTest > 
testExistingKRaftControllerClaim() STARTED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZkMigrationClientTest > 
testExistingKRaftControllerClaim() PASSED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZkMigrationClientTest > 
testReadAndWriteProducerId() STARTED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZkMigrationClientTest > 
testReadAndWriteProducerId() PASSED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZkMigrationClientTest > 
testMigrateTopicConfigs() STARTED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZkMigrationClientTest > 
testMigrateTopicConfigs() PASSED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZkMigrationClientTest > 
testNonIncreasingKRaftEpoch() STARTED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZkMigrationClientTest > 
testNonIncreasingKRaftEpoch() PASSED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZkMigrationClientTest > 
testMigrateEmptyZk() STARTED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZkMigrationClientTest > 
testMigrateEmptyZk() PASSED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZkMigrationClientTest > 
testTopicAndBrokerConfigsMigrationWithSnapshots() STARTED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZkMigrationClientTest > 
testTopicAndBrokerConfigsMigrationWithSnapshots() PASSED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZkMigrationClientTest > 
testClaimAndReleaseExistingController() STARTED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZkMigrationClientTest > 
testClaimAndReleaseExistingController() PASSED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZkMigrationClientTest > 
testClaimAbsentController() STARTED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZkMigrationClientTest > 
testClaimAbsentController() PASSED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZkMigrationClientTest > 
testIdempotentCreateTopics() STARTED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZkMigrationClientTest > 
testIdempotentCreateTopics() PASSED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZkMigrationClientTest > 
testCreateNewTopic() STARTED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZkMigrationClientTest > 
testCreateNewTopic() PASSED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZkMigrationClientTest > 
testUpdateExistingTopicWithNewAndChangedPartitions() STARTED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZkMigrationClientTest > 
testUpdateExistingTopicWithNewAndChangedPartitions() PASSED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZooKeeperClientTest > 
testZNodeChangeHandlerForDataChange() STARTED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZooKeeperClientTest > 
testZNodeChangeHandlerForDataChange() PASSED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZooKeeperClientTest > 
testZooKeeperSessionStateMetric() STARTED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZooKeeperClientTest > 
testZooKeeperSessionStateMetric() PASSED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZooKeeperClientTest > 
testExceptionInBeforeInitializingSession() STARTED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZooKeeperClientTest > 
testExceptionInBeforeInitializingSession() PASSED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZooKeeperClientTest > 
testGetChildrenExistingZNode() STARTED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZooKeeperClientTest > 
testGetChildrenExistingZNode() PASSED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZooKeeperClientTest > 
testConnection() STARTED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZooKeeperClientTest > 
testConnection() PASSED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZooKeeperClientTest > 
testZNodeChangeHandlerForCreation() STARTED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZooKeeperClientTest > 
testZNodeChangeHandlerForCreation() PASSED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZooKeeperClientTest > 
testGetAclExistingZNode() STARTED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZooKeeperClientTest > 
testGetAclExistingZNode() PASSED

Gradle Test Run :core:test > Gradle Test 

[jira] [Created] (KAFKA-15393) MirrorMaker2 integration tests are shutting down uncleanly

2023-08-22 Thread Greg Harris (Jira)
Greg Harris created KAFKA-15393:
---

 Summary: MirrorMaker2 integration tests are shutting down uncleanly
 Key: KAFKA-15393
 URL: https://issues.apache.org/jira/browse/KAFKA-15393
 Project: Kafka
  Issue Type: Test
  Components: mirrormaker
Reporter: Greg Harris
Assignee: Greg Harris


The MirrorConnectorsBaseIntegrationTest and its derived test classes often 
shut down uncleanly. An unclean shutdown takes longer than a clean shutdown, 
because the shutdown must wait for timeouts to elapse.

It appears that during the teardown, the MirrorConnectorsBaseIntegrationTest 
deletes all of the topics on the EmbeddedKafkaCluster. This causes extremely 
poor behavior for the connect worker (which is unable to access internal 
topics) and for the connectors (which get stuck on their final offset commit 
refreshing metadata and then get cancelled).

The EmbeddedKafkaCluster is not reused for the next test, so cleaning the 
topics is unnecessary.
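
A rough sketch of that direction (the field and method names are illustrative, not the actual test code):

import org.junit.jupiter.api.AfterEach;

// Hypothetical teardown: stop the MM2/Connect workers and discard the embedded
// clusters without deleting topics first, avoiding the shutdown timeouts above.
@AfterEach
public void shutdownClusters() {
    primary.stop();   // "primary" and "backup" stand in for the embedded Connect clusters
    backup.stop();
    // no explicit topic deletion: the EmbeddedKafkaCluster is not reused anyway
}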



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-939: Support Participation in 2PC

2023-08-22 Thread Artem Livshits
Hi Roger, Arjun,

Thank you for the questions.
> It looks like the application must have stable transactional ids over
time?

The transactional id should uniquely identify a producer instance and needs
to be stable across restarts.  If the transactional id is not stable
across restarts, then zombie messages from a previous incarnation of the
producer may violate atomicity.  If there are 2 producer instances
concurrently producing data with the same transactional id, they are going
to constantly fence each other and most likely make little or no progress.

The name might be a little bit confusing as it may be mistaken for a
transaction id / TID that uniquely identifies every transaction.  The name
and the semantics were defined in the original exactly-once-semantics (EoS)
proposal (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging)
and KIP-939 just builds on top of that.

> I'm curious to understand what happens if the producer dies, and does not
come up and recover the pending transaction within the transaction timeout
interval.

If the producer / application never comes back, the transaction will remain
in prepared (a.k.a. "in-doubt") state until an operator forcefully
terminates the transaction.  That's why a new ACL is defined in
this proposal -- this functionality should only be provided to applications
that implement proper recovery logic.
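
For readers new to the thread, a very rough sketch of the dual-write flow the KIP targets; the prepare step and the database call below are placeholders rather than the KIP's final API:

// Illustrative only: prepareTransaction() stands in for the new 2PC "prepare"
// step proposed in KIP-939, and db.commitWithTxnState() for the application's
// own database commit; the rest is the existing transactional producer API.
producer.initTransactions();       // with 2PC enabled, a prepared txn would be kept rather than aborted
producer.beginTransaction();
producer.send(record);
Object preparedTxnState = producer.prepareTransaction();   // phase one: make the Kafka txn "in-doubt"
db.commitWithTxnState(preparedTxnState);                   // commit the DB txn, durably storing the Kafka txn state
producer.commitTransaction();      // phase two: complete the prepared Kafka transaction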

-Artem

On Tue, Aug 22, 2023 at 12:52 AM Arjun Satish 
wrote:

> Hello Artem,
>
> Thanks for the KIP.
>
> I have the same question as Roger on concurrent writes, and an additional
> one on consumer behavior. Typically, transactions will timeout if not
> committed within some time interval. With the proposed changes in this KIP,
> consumers cannot consume past the ongoing transaction. I'm curious to
> understand what happens if the producer dies, and does not come up and
> recover the pending transaction within the transaction timeout interval. Or
> are we saying that when used in this 2PC context, we should configure these
> transaction timeouts to very large durations?
>
> Thanks in advance!
>
> Best,
> Arjun
>
>
> On Mon, Aug 21, 2023 at 1:06 PM Roger Hoover 
> wrote:
>
> > Hi Artem,
> >
> > Thanks for writing this KIP.  Can you clarify the requirements a bit more
> > for managing transaction state?  It looks like the application must have
> > stable transactional ids over time?   What is the granularity of those
> ids
> > and producers?  Say the application is a multi-threaded Java web server,
> > can/should all the concurrent threads share a transactional id and
> > producer?  That doesn't seem right to me unless the application is using
> > global DB locks that serialize all requests.  Instead, if the application
> > uses row-level DB locks, there could be multiple, concurrent, independent
> > txns happening in the same JVM, so it seems like the granularity of managing
> > transactional ids and txn state needs to line up with the granularity of the
> > DB locking.
> >
> > Does that make sense or am I misunderstanding?
> >
> > Thanks,
> >
> > Roger
> >
> > On Wed, Aug 16, 2023 at 11:40 PM Artem Livshits
> >  wrote:
> >
> > > Hello,
> > >
> > > This is a discussion thread for
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC
> > > .
> > >
> > > The KIP proposes extending Kafka transaction support (that already uses
> > 2PC
> > > under the hood) to enable atomicity of dual writes to Kafka and an
> > external
> > > database, and helps to fix a long standing Flink issue.
> > >
> > > An example of code that uses the dual write recipe with JDBC and should
> > > work for most SQL databases is here
> > > https://github.com/apache/kafka/pull/14231.
> > >
> > > The FLIP for the sister fix in Flink is here
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710
> > >
> > > -Artem
> > >
> >
>


[jira] [Created] (KAFKA-15392) RestServer starts but does not stop ServletContextHandler

2023-08-22 Thread Greg Harris (Jira)
Greg Harris created KAFKA-15392:
---

 Summary: RestServer starts but does not stop ServletContextHandler
 Key: KAFKA-15392
 URL: https://issues.apache.org/jira/browse/KAFKA-15392
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Greg Harris
Assignee: Greg Harris


Due to the initialization order of the connect RestServer and Herder, the jetty 
Server is started before the ServletContextHandler instances are installed. 
This causes jetty to consider them "unmanaged" and thus it will not call the 
start() and stop() lifecycle methods on our behalf.

RestServer#initializeResources already explicitly calls start() for these 
unmanaged resources, but there is no accompanying stop() call, so the resources 
never enter the STOPPED state.

The jetty server has one more operation after stopping: destroy(), which 
asserts that resources are already stopped. If the jetty server is ever 
destroyed, this exception will be thrown:
java.lang.IllegalStateException: !STOPPED
at 
org.eclipse.jetty.server.handler.HandlerWrapper.destroy(HandlerWrapper.java:140)
at o.a.k.connect.runtime.rest.RestServer.stop(RestServer.java:361)
Fortunately, destroy() is currently only called when an error has already 
occurred, so this IllegalStateException is never thrown on happy-path 
execution. Instead, if RestServer shutdown encounters an error (such as 
exceeding the GRACEFUL_SHUTDOWN_TIMEOUT and timing out), the other error will be 
shadowed by the IllegalStateException.

Rather than only calling destroy() on failure and shadowing the error, 
destroy() should always be called and its errors reported separately.
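
A minimal sketch of that shape (jettyServer and log are stand-ins for the RestServer's fields, not the actual code):

try {
    jettyServer.stop();
} catch (Exception e) {
    log.error("Error shutting down the REST server", e);         // the primary shutdown error
} finally {
    try {
        jettyServer.destroy();                                    // always destroy, even after a failed stop()
    } catch (Exception e) {
        log.error("Error destroying REST server resources", e);  // reported separately, never shadows the stop() error
    }
}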



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #2129

2023-08-22 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 307524 lines...]
Gradle Test Run :core:test > Gradle Test Executor 90 > ZkMigrationClientTest > 
testUpdateExistingPartitions() STARTED

Gradle Test Run :core:test > Gradle Test Executor 90 > ZkMigrationClientTest > 
testUpdateExistingPartitions() PASSED

Gradle Test Run :core:test > Gradle Test Executor 90 > ZkMigrationClientTest > 
testEmptyWrite() STARTED

Gradle Test Run :core:test > Gradle Test Executor 90 > ZkMigrationClientTest > 
testEmptyWrite() PASSED

Gradle Test Run :core:test > Gradle Test Executor 90 > ZkMigrationClientTest > 
testExistingKRaftControllerClaim() STARTED

Gradle Test Run :core:test > Gradle Test Executor 90 > ZkMigrationClientTest > 
testExistingKRaftControllerClaim() PASSED

Gradle Test Run :core:test > Gradle Test Executor 90 > ZkMigrationClientTest > 
testReadAndWriteProducerId() STARTED

Gradle Test Run :core:test > Gradle Test Executor 90 > ZkMigrationClientTest > 
testReadAndWriteProducerId() PASSED

Gradle Test Run :core:test > Gradle Test Executor 90 > ZkMigrationClientTest > 
testMigrateTopicConfigs() STARTED

Gradle Test Run :core:test > Gradle Test Executor 90 > ZkMigrationClientTest > 
testMigrateTopicConfigs() PASSED

Gradle Test Run :core:test > Gradle Test Executor 90 > ZkMigrationClientTest > 
testNonIncreasingKRaftEpoch() STARTED

Gradle Test Run :core:test > Gradle Test Executor 90 > ZkMigrationClientTest > 
testNonIncreasingKRaftEpoch() PASSED

Gradle Test Run :core:test > Gradle Test Executor 90 > ZkMigrationClientTest > 
testMigrateEmptyZk() STARTED

Gradle Test Run :core:test > Gradle Test Executor 90 > ZkMigrationClientTest > 
testMigrateEmptyZk() PASSED

Gradle Test Run :core:test > Gradle Test Executor 90 > ZkMigrationClientTest > 
testTopicAndBrokerConfigsMigrationWithSnapshots() STARTED

Gradle Test Run :core:test > Gradle Test Executor 90 > ZkMigrationClientTest > 
testTopicAndBrokerConfigsMigrationWithSnapshots() PASSED

Gradle Test Run :core:test > Gradle Test Executor 90 > ZkMigrationClientTest > 
testClaimAndReleaseExistingController() STARTED

Gradle Test Run :core:test > Gradle Test Executor 90 > ZkMigrationClientTest > 
testClaimAndReleaseExistingController() PASSED

Gradle Test Run :core:test > Gradle Test Executor 90 > ZkMigrationClientTest > 
testClaimAbsentController() STARTED

Gradle Test Run :core:test > Gradle Test Executor 90 > ZkMigrationClientTest > 
testClaimAbsentController() PASSED

Gradle Test Run :core:test > Gradle Test Executor 90 > ZkMigrationClientTest > 
testIdempotentCreateTopics() STARTED

Gradle Test Run :core:test > Gradle Test Executor 90 > ZkMigrationClientTest > 
testIdempotentCreateTopics() PASSED

Gradle Test Run :core:test > Gradle Test Executor 90 > ZkMigrationClientTest > 
testCreateNewTopic() STARTED

Gradle Test Run :core:test > Gradle Test Executor 90 > ZkMigrationClientTest > 
testCreateNewTopic() PASSED

Gradle Test Run :core:test > Gradle Test Executor 90 > ZkMigrationClientTest > 
testUpdateExistingTopicWithNewAndChangedPartitions() STARTED

Gradle Test Run :core:test > Gradle Test Executor 90 > ZkMigrationClientTest > 
testUpdateExistingTopicWithNewAndChangedPartitions() PASSED

Gradle Test Run :core:test > Gradle Test Executor 90 > ZooKeeperClientTest > 
testZNodeChangeHandlerForDataChange() STARTED

Gradle Test Run :core:test > Gradle Test Executor 90 > ZooKeeperClientTest > 
testZNodeChangeHandlerForDataChange() PASSED

Gradle Test Run :core:test > Gradle Test Executor 90 > ZooKeeperClientTest > 
testZooKeeperSessionStateMetric() STARTED

Gradle Test Run :core:test > Gradle Test Executor 90 > ZooKeeperClientTest > 
testZooKeeperSessionStateMetric() PASSED

Gradle Test Run :core:test > Gradle Test Executor 90 > ZooKeeperClientTest > 
testExceptionInBeforeInitializingSession() STARTED

Gradle Test Run :core:test > Gradle Test Executor 90 > ZooKeeperClientTest > 
testExceptionInBeforeInitializingSession() PASSED

Gradle Test Run :core:test > Gradle Test Executor 90 > ZooKeeperClientTest > 
testGetChildrenExistingZNode() STARTED

Gradle Test Run :core:test > Gradle Test Executor 90 > ZooKeeperClientTest > 
testGetChildrenExistingZNode() PASSED

Gradle Test Run :core:test > Gradle Test Executor 90 > ZooKeeperClientTest > 
testConnection() STARTED

Gradle Test Run :core:test > Gradle Test Executor 90 > ZooKeeperClientTest > 
testConnection() PASSED

Gradle Test Run :core:test > Gradle Test Executor 90 > ZooKeeperClientTest > 
testZNodeChangeHandlerForCreation() STARTED

Gradle Test Run :core:test > Gradle Test Executor 90 > ZooKeeperClientTest > 
testZNodeChangeHandlerForCreation() PASSED

Gradle Test Run :core:test > Gradle Test Executor 90 > ZooKeeperClientTest > 
testGetAclExistingZNode() STARTED

Gradle Test Run :core:test > Gradle Test Executor 90 > ZooKeeperClientTest > 
testGetAclExistingZNode() PASSED

Gradle Test Run :core:test > Gradle Test 

[jira] [Resolved] (KAFKA-15213) Provide the exact offset to QuorumController.replay

2023-08-22 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-15213.
--
Fix Version/s: 3.6.0
   Resolution: Fixed

> Provide the exact offset to QuorumController.replay
> ---
>
> Key: KAFKA-15213
> URL: https://issues.apache.org/jira/browse/KAFKA-15213
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
> Fix For: 3.6.0
>
>
> Provide the exact offset to QuorumController.replay so that we can implement 
> metadata transactions. We need this so that we can know the offset where the 
> records will be applied before we apply them in QuorumControllers.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-910: Update Source offsets for Source Connectors without producing records

2023-08-22 Thread Yash Mayya
Hi Sagar,

Thanks for the updates and apologies for the delayed response.

> Hmm the question is how do you qualify a
> partition as stale or old? Let's say a connector
>  has implemented updateOffsets and for a certain
> partition for which no records are received then it
> will update its offsets. So technically that offset can't
> be termed as stale anymore

The "staleness" was not from the point of view of the offsets, but the
source partition itself. For instance, if a database source connector is
monitoring a number of tables (each modelled as a source partition) and
detects that a table has been dropped, it might be nice to allow the
connector to wipe the offset for that source partition. Similarly, a file
based source connector that is reading from multiple files in a directory
might want to wipe the offsets for a source file that has been deleted.
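
Purely for illustration, and not the signature actually proposed in the KIP, a connector-side hook of roughly this shape could emit a tombstone for a dropped table:

// Hypothetical shape for discussion only; KIP-910's real SourceTask::updateOffsets
// signature may differ. "droppedTables" is a connector-tracked set, also illustrative.
@Override
public Map<Map<String, Object>, Map<String, Object>> updateOffsets(
        Map<Map<String, Object>, Map<String, Object>> offsets) {
    Map<Map<String, Object>, Map<String, Object>> updated = new HashMap<>(offsets);
    for (Map<String, Object> sourcePartition : offsets.keySet()) {
        String table = (String) sourcePartition.get("table");
        if (droppedTables.contains(table)) {
            updated.put(sourcePartition, null);   // tombstone: wipe the offset for the dropped partition
        }
    }
    return updated;
}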

> Even though I can't think of a side effect at this
> point to disallow offset deletion via this method,
> my opinion is to use a proper mechanism like the
> ones introduced in KIP-875 to delete offsets. Moreover,
> if I also consider the option presented in point #2, for
> simplicity's sake it seems better to not add this feature at
> this point

The KIP-875 APIs would allow users / cluster administrators to manually
wipe offsets externally. However, for the cases that I've outlined above,
it would be additional toil for the operator and something that would be
more suitably done by the connector itself. Also, I'm not sure if I'm
missing something here, but I don't get why allowing tombstone offsets
would add any complexity here?

> I get the point now. I can't think of cases where
> updating offsets would be needed.

Given that we're disallowing updating offsets for source partitions whose
offsets are about to be committed (or removing such source partitions
altogether), I'm wondering what purpose does the "offsets" parameter in the
newly proposed SourceTask::updateOffsets method serve?

Thanks,
Yash

On Fri, Jul 28, 2023 at 1:41 PM Sagar  wrote:

> Hey Yash,
>
> Thanks for your comments.
>
> 1) Hmm the question is how do you qualify a partition as stale or old?
> Let's say a connector has implemented updateOffsets and for a certain
> partition for which no records are received then it will update its
> offsets. So technically that offset can't be termed as stale anymore. Even
> though I can't think of a side effect at this point to disallow offset
> deletion via this method, my opinion is to use a proper mechanism like the
> ones introduced in KIP-875 to delete offsets. Moreover, if I also consider
> the option presented in point #2, for simplicity's sake it seems better to
> not add this feature at this point. If we feel it's really needed and users
> are requesting it, we can add support for it later on.
>
> 2) I get the point now. I can't think of cases where updating offsets would
> be needed. As with point #1, we can always add it back if needed later on.
> For now, I have removed that part from the KIP.
>
> 3) Yes, because the offset commit happens on a different thread, ordering
> guarantees might be harder to ensure if we do it from the other thread. The
> current mechanism proposed, even though it gets invoked multiple times, keeps
> things simpler to reason about.
>
> Let me know how things look now. If it's all looking ok, I would go ahead
> and create a Vote thread for the same.
>
> Thanks!
> Sagar.
>
> On Tue, Jul 25, 2023 at 5:15 PM Yash Mayya  wrote:
>
> > Hi Sagar,
> >
> > Thanks for the updates. I had a few more follow up questions:
> >
> > > I have added that a better way of doing that would be
> > > via KIP-875. Also, I didn't want to include any mechanisms
> > > for users to meddle with the offsets topic. Allowing tombstone
> > > records via this method would be akin to publishing tombstone
> > > records directly to the offsets topic which is not recommended
> > > generally.
> >
> > KIP-875 would allow a way for cluster administrators and / or users to do
> > so manually externally whereas allowing tombstones in
> > SourceTask::updateOffsets would enable connectors to clean up offsets for
> > old / stale partitions without user intervention right? I'm not sure I
> > follow what you mean by "I didn't want to include any mechanisms for
> users
> > to meddle with the offsets topic" here? Furthermore, I'm not sure why
> > publishing tombstone records directly to the offsets topic would not be
> > recommended? Isn't that currently the only way to manually clean up
> offsets
> > for a source connector?
> >
> > > It could be useful in a scenario where the offset of a partition
> > > doesn't update for some period of time. In such cases, the
> > > connector can do some kind of state tracking and update the
> > > offsets after the time period elapses.
> >
> > I'm not sure I follow? In this case, won't the offsets argument passed
> > to SourceTask::updateOffsets *not *contain the source partition which
> > hasn't had an update for a 

Requesting permissions to contribute to Apache Kafka

2023-08-22 Thread Animesh Kumar
Hi Team,
Please provide access to contribute to Apache Kafka
JIRA id -- akanimesh7
Wiki Id -- akanimesh7
-- 
Animesh Kumar
8120004556


Re: [DISCUSS] KIP-970: Deprecate and remove Connect's redundant task configurations endpoint

2023-08-22 Thread Chris Egerton
Hi Yash,

Thanks for driving this, and for putting out a well-written KIP. LGTM!

Cheers,

Chris

On Tue, Aug 22, 2023 at 6:13 AM Yash Mayya  wrote:

> Hi all,
>
> I'd like to start a discussion thread for this KIP -
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-970%3A+Deprecate+and+remove+Connect%27s+redundant+task+configurations+endpoint
> .
>
> It proposes the deprecation and eventual removal of Kafka Connect's
> redundant task configurations endpoint.
>
> Thanks,
> Yash
>


Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #2128

2023-08-22 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 306231 lines...]

Gradle Test Run :core:test > Gradle Test Executor 93 > ZooKeeperClientTest > 
testConnectionViaNettyClient() PASSED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZooKeeperClientTest > 
testDeleteNonExistentZNode() STARTED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZooKeeperClientTest > 
testDeleteNonExistentZNode() PASSED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZooKeeperClientTest > 
testExistsExistingZNode() STARTED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZooKeeperClientTest > 
testExistsExistingZNode() PASSED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZooKeeperClientTest > 
testZooKeeperStateChangeRateMetrics() STARTED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZooKeeperClientTest > 
testZooKeeperStateChangeRateMetrics() PASSED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZooKeeperClientTest > 
testZNodeChangeHandlerForDeletion() STARTED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZooKeeperClientTest > 
testZNodeChangeHandlerForDeletion() PASSED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZooKeeperClientTest > 
testGetAclNonExistentZNode() STARTED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZooKeeperClientTest > 
testGetAclNonExistentZNode() PASSED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZooKeeperClientTest > 
testStateChangeHandlerForAuthFailure() STARTED

Gradle Test Run :core:test > Gradle Test Executor 93 > ZooKeeperClientTest > 
testStateChangeHandlerForAuthFailure() PASSED

Gradle Test Run :core:test > Gradle Test Executor 93 > 
AllocateProducerIdsRequestTest > testAllocateProducersIdSentToController() > 
[1] Type=Raft-Isolated, Name=testAllocateProducersIdSentToController, 
MetadataVersion=3.6-IV1, Security=PLAINTEXT STARTED

Gradle Test Run :core:test > Gradle Test Executor 95 > ZooKeeperClientTest > 
testConnectionLossRequestTermination() PASSED

Gradle Test Run :core:test > Gradle Test Executor 95 > ZooKeeperClientTest > 
testExistsNonExistentZNode() STARTED

Gradle Test Run :core:test > Gradle Test Executor 95 > ZooKeeperClientTest > 
testExistsNonExistentZNode() PASSED

Gradle Test Run :core:test > Gradle Test Executor 95 > ZooKeeperClientTest > 
testGetDataNonExistentZNode() STARTED

Gradle Test Run :core:test > Gradle Test Executor 95 > ZooKeeperClientTest > 
testGetDataNonExistentZNode() PASSED

Gradle Test Run :core:test > Gradle Test Executor 95 > ZooKeeperClientTest > 
testConnectionTimeout() STARTED

Gradle Test Run :core:test > Gradle Test Executor 93 > 
AllocateProducerIdsRequestTest > testAllocateProducersIdSentToController() > 
[1] Type=Raft-Isolated, Name=testAllocateProducersIdSentToController, 
MetadataVersion=3.6-IV1, Security=PLAINTEXT PASSED

Gradle Test Run :core:test > Gradle Test Executor 93 > 
AllocateProducerIdsRequestTest > testAllocateProducersIdSentToNonController() > 
[1] Type=Raft-Isolated, Name=testAllocateProducersIdSentToNonController, 
MetadataVersion=3.6-IV1, Security=PLAINTEXT STARTED

Gradle Test Run :core:test > Gradle Test Executor 95 > ZooKeeperClientTest > 
testConnectionTimeout() PASSED

Gradle Test Run :core:test > Gradle Test Executor 95 > ZooKeeperClientTest > 
testBlockOnRequestCompletionFromStateChangeHandler() STARTED

Gradle Test Run :core:test > Gradle Test Executor 95 > ZooKeeperClientTest > 
testBlockOnRequestCompletionFromStateChangeHandler() PASSED

Gradle Test Run :core:test > Gradle Test Executor 95 > ZooKeeperClientTest > 
testUnresolvableConnectString() STARTED

Gradle Test Run :core:test > Gradle Test Executor 95 > ZooKeeperClientTest > 
testUnresolvableConnectString() PASSED

Gradle Test Run :core:test > Gradle Test Executor 95 > ZooKeeperClientTest > 
testGetChildrenNonExistentZNode() STARTED

Gradle Test Run :core:test > Gradle Test Executor 95 > ZooKeeperClientTest > 
testGetChildrenNonExistentZNode() PASSED

Gradle Test Run :core:test > Gradle Test Executor 95 > ZooKeeperClientTest > 
testPipelinedGetData() STARTED

Gradle Test Run :core:test > Gradle Test Executor 95 > ZooKeeperClientTest > 
testPipelinedGetData() PASSED

Gradle Test Run :core:test > Gradle Test Executor 95 > ZooKeeperClientTest > 
testZNodeChildChangeHandlerForChildChange() STARTED

Gradle Test Run :core:test > Gradle Test Executor 95 > ZooKeeperClientTest > 
testZNodeChildChangeHandlerForChildChange() PASSED

Gradle Test Run :core:test > Gradle Test Executor 95 > ZooKeeperClientTest > 
testGetChildrenExistingZNodeWithChildren() STARTED

Gradle Test Run :core:test > Gradle Test Executor 95 > ZooKeeperClientTest > 
testGetChildrenExistingZNodeWithChildren() PASSED

Gradle Test Run :core:test > Gradle Test Executor 95 > ZooKeeperClientTest > 
testSetDataExistingZNode() STARTED

Gradle Test Run :core:test > Gradle Test Executor 95 > ZooKeeperClientTest 

[jira] [Created] (KAFKA-15391) Delete topic may lead to directory offline

2023-08-22 Thread Divij Vaidya (Jira)
Divij Vaidya created KAFKA-15391:


 Summary: Delete topic may lead to directory offline
 Key: KAFKA-15391
 URL: https://issues.apache.org/jira/browse/KAFKA-15391
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Divij Vaidya
 Fix For: 3.6.0


This is an edge case where the entire log directory is marked offline when we 
delete a topic. The symptoms of this scenario are characterised by the 
following logs:


{noformat}
[2023-08-14 09:22:12,600] ERROR Uncaught exception in scheduled task 
'flush-log' (org.apache.kafka.server.util.KafkaScheduler:152)  
org.apache.kafka.common.errors.KafkaStorageException: Error while flushing log 
for test-0 in dir /tmp/kafka-15093588566723278510 with offset 221 (exclusive) 
and recovery point 221 Caused by: java.nio.file.NoSuchFileException: 
/tmp/kafka-15093588566723278510/test-0{noformat}
The above log is followed by logs such as:
{noformat}
[2023-08-14 09:22:12,601] ERROR Uncaught exception in scheduled task 
'flush-log' 
(org.apache.kafka.server.util.KafkaScheduler:152)org.apache.kafka.common.errors.KafkaStorageException:
 The log dir /tmp/kafka-15093588566723278510 is already offline due to a 
previous IO exception.{noformat}

The below sequence of events demonstrates the scenario where this bug manifests:
1. On the broker, the partition lock is acquired and UnifiedLog.roll() is called, 
which schedules an async call to flushUptoOffsetExclusive(). The roll may be 
triggered by segment rotation time or size.
2. Admin client calls deleteTopic
3. On the broker, LogManager.asyncDelete() is called which will call 
UnifiedLog.renameDir()
4. The directory for the partition is successfully renamed with a "delete" 
suffix.
5. The async task scheduled in step 1 (flushUptoOffsetExclusive) starts 
executing. It tries to call localLog.flush() without acquiring a partition 
lock. 
6. LocalLog calls Utils.flushDir() which fails with an IOException.
7. On IOException, log directory is added to logDirFailureChannel
8. Any new interaction with this logDir fails and a log line is printed such as 
"The log dir $logDir is already offline due to a previous IO exception"
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [REVIEW REQUEST] Move ReassignPartitionsCommandArgsTest to java

2023-08-22 Thread Николай Ижиков
Hello.

Please join this simple review :)
We have a few steps left to completely rewrite ReassignPartitionsCommand in Java.

> On 17 Aug 2023, at 17:16, Николай Ижиков wrote:
> 
> Hello.
> 
> I’m working on [1].
> The goal of the ticket is to rewrite `ReassignPartitionsCommand` in Java.
> 
> The PR that moves the whole command is pretty big, so it makes sense to split it.
> I prepared the PR [2] that moves a single test 
> (ReassignPartitionsCommandArgsTest) to Java.
> 
> It is relatively small and simple (it touches only 3 files):
> 
> To review - https://github.com/apache/kafka/pull/14217
> Big PR  - https://github.com/apache/kafka/pull/13247
> 
> Please, review.
> 
> [1] https://issues.apache.org/jira/browse/KAFKA-14595
> [2] https://github.com/apache/kafka/pull/14217



[DISCUSS] KIP-970: Deprecate and remove Connect's redundant task configurations endpoint

2023-08-22 Thread Yash Mayya
Hi all,

I'd like to start a discussion thread for this KIP -
https://cwiki.apache.org/confluence/display/KAFKA/KIP-970%3A+Deprecate+and+remove+Connect%27s+redundant+task+configurations+endpoint
.

It proposes the deprecation and eventual removal of Kafka Connect's
redundant task configurations endpoint.

Thanks,
Yash


Re: [DISCUSS] KIP-939: Support Participation in 2PC

2023-08-22 Thread Arjun Satish
Hello Artem,

Thanks for the KIP.

I have the same question as Roger on concurrent writes, and an additional
one on consumer behavior. Typically, transactions will timeout if not
committed within some time interval. With the proposed changes in this KIP,
consumers cannot consume past the ongoing transaction. I'm curious to
understand what happens if the producer dies, and does not come up and
recover the pending transaction within the transaction timeout interval. Or
are we saying that when used in this 2PC context, we should configure these
transaction timeouts to very large durations?

Thanks in advance!

Best,
Arjun


On Mon, Aug 21, 2023 at 1:06 PM Roger Hoover  wrote:

> Hi Artem,
>
> Thanks for writing this KIP.  Can you clarify the requirements a bit more
> for managing transaction state?  It looks like the application must have
> stable transactional ids over time?   What is the granularity of those ids
> and producers?  Say the application is a multi-threaded Java web server,
> can/should all the concurrent threads share a transactional id and
> producer?  That doesn't seem right to me unless the application is using
> global DB locks that serialize all requests.  Instead, if the application
> uses row-level DB locks, there could be multiple, concurrent, independent
> txns happening in the same JVM, so it seems like the granularity of managing
> transactional ids and txn state needs to line up with the granularity of the
> DB locking.
>
> Does that make sense or am I misunderstanding?
>
> Thanks,
>
> Roger
>
> On Wed, Aug 16, 2023 at 11:40 PM Artem Livshits
>  wrote:
>
> > Hello,
> >
> > This is a discussion thread for
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC
> > .
> >
> > The KIP proposes extending Kafka transaction support (that already uses
> 2PC
> > under the hood) to enable atomicity of dual writes to Kafka and an
> external
> > database, and helps to fix a long standing Flink issue.
> >
> > An example of code that uses the dual write recipe with JDBC and should
> > work for most SQL databases is here
> > https://github.com/apache/kafka/pull/14231.
> >
> > The FLIP for the sister fix in Flink is here
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710
> >
> > -Artem
> >
>