Re: [DISCUSS] KIP-794: Strictly Uniform Sticky Partitioner
Hi Luke, Thank you for your feedback. I've updated the KIP with your suggestions. 1. Updated with a better example. 2. I removed the reference to ClassicDefaultPartitioner, it was probably confusing. 3. The logic doesn't rely on checking batches, I've updated the proposal to make it more explicit. 4. The primary issue (uneven distribution) is described in the linked jira, copied an example from jira into the KIP as well. -Artem On Thu, Nov 4, 2021 at 8:34 PM Luke Chen wrote: > Hi Artem, > Thanks for the KIP! And thanks for reminding me to complete KIP-782, soon. > :) > > Back to the KIP, I have some comments: > 1. You proposed to have a new config: "partitioner.sticky.batch.size", but > I can't see how we're going to use it to make the partitioner better. > Please explain more in KIP (with an example will be better as suggestion > (4)) > 2. In the "Proposed change" section, you take an example to use > "ClassicDefaultPartitioner", is that referring to the current default > sticky partitioner? I think it'd better you name your proposed partition > with a different name for distinguish between the default one and new one. > (Although after implementation, we are going to just use the same name) > 3. So, if my understanding is correct, you're going to have a "batch" > switch, and before the in-flight is full, it's disabled. Otherwise, we'll > enable it. Is that right? Sorry, I don't see any advantage of having this > batch switch. Could you explain more? > 4. I think it should be more clear if you can have a clear real example in > the motivation section, to describe what issue we faced using current > sticky partitioner. And in proposed changes section, using the same > example, to describe more detail about how you fix this issue with your > way. > > Thank you. 
> Luke > > On Fri, Nov 5, 2021 at 1:38 AM Artem Livshits > wrote: > > > Hello, > > > > This is the discussion thread for > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner > > . > > > > The proposal is a bug fix for > > https://issues.apache.org/jira/browse/KAFKA-10888, but it does include a > > client config change, therefore we have a KIP to discuss. > > > > -Artem > > >
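The distribution goal discussed in this thread (spread produced *bytes*, not batch counts, uniformly across partitions) can be sketched with a toy model: stick to one partition until roughly a configurable number of bytes has been produced to it, then move on. This is a hypothetical illustration of the idea only, not the KIP's actual implementation; the class and method names are invented.

```java
public class UniformStickyDemo {
    private final int stickyBatchSize;
    private final int numPartitions;
    private int currentPartition = 0;
    private int bytesInCurrentStint = 0;

    public UniformStickyDemo(int stickyBatchSize, int numPartitions) {
        this.stickyBatchSize = stickyBatchSize;
        this.numPartitions = numPartitions;
    }

    // Stick to one partition until ~stickyBatchSize bytes have gone to it,
    // then switch, so bytes (not batch counts) are spread uniformly.
    public int partition(int recordSizeBytes) {
        if (bytesInCurrentStint >= stickyBatchSize) {
            currentPartition = (currentPartition + 1) % numPartitions;
            bytesInCurrentStint = 0;
        }
        bytesInCurrentStint += recordSizeBytes;
        return currentPartition;
    }

    public static void main(String[] args) {
        UniformStickyDemo p = new UniformStickyDemo(16384, 3);
        long[] bytes = new long[3];
        for (int i = 0; i < 30_000; i++) {
            bytes[p.partition(100)] += 100;
        }
        // Each partition ends up with roughly the same number of bytes.
        System.out.println(java.util.Arrays.toString(bytes));
    }
}
```

Under this rule no partition can fall behind by more than one "stint" of bytes, which is the strict-uniformity property the KIP title refers to.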
Re: [DISCUSS] KIP-778 KRaft Upgrades
Hi, David, Thanks for the reply. 16. My first concern is that the KIP picks up meta.version inconsistently during the deployment. If a new cluster is started, we pick up the highest version. If we upgrade, we leave the feature version unchanged. Intuitively, it seems that independent of how a cluster is deployed, we should always pick the same feature version. I think we need to think this through in this KIP. My second concern is that as a particular version matures, it's inconvenient for a user to manually upgrade every feature version. As long as we have a path to achieve that in the future, we don't need to address that in this KIP. 21. "./kafka-features.sh delete": Deleting a feature seems a bit weird since the logic is always there. Would it be better to use disable? Jun On Fri, Nov 5, 2021 at 8:11 AM David Arthur wrote: > Colin and Jun, thanks for the additional comments! > > Colin: > > > We've been talking about having an automated RPC compatibility checker > > Do we have a way to mark fields in schemas as deprecated? It can stay in > the RPC, it just complicates the logic a bit. > > > It would be nice if the active controller could validate that a majority > of the quorum could use the proposed metadata.version. The active > controller should have this information, right? If we don't have recent > information from a quorum of voters, we wouldn't be active. > > I believe we should have this information from the ApiVersionsResponse. It > would be good to do this validation to avoid a situation where a > quorum leader can't be elected due to unprocessable records. > > > Do we need delete as a command separate from downgrade? > > I think from an operator's perspective, it is nice to distinguish between > changing a feature flag and unsetting it. It might be surprising to an > operator to see the flag's version set to nothing when they requested the > downgrade to version 0 (or less). 
> > > it seems like we should spell out that metadata.version begins at 1 in > KRaft clusters > > I added this text: > > Introduce an IBP version to indicate the lowest software version that > > supports *metadata.version*. Below this IBP, the *metadata.version* is > > undefined and will not be examined. At or above this IBP, the > > *metadata.version* must be *0* for ZooKeeper clusters and will be > > initialized as *1* for KRaft clusters. > > > > We probably also want an RPC implemented by both brokers and controllers > that will reveal the min and max supported versions for each feature level > supported by the server > > This is available in ApiVersionsResponse (we include the server's supported > features as well as the cluster's finalized features) > > > > Jun: > > 12. I've updated the KIP with AdminClient changes > > 14. You're right, it looks like I missed a few sections regarding snapshot > generation. I've corrected it > > 16. This feels more like an enhancement to KIP-584. I agree it could be > useful, but perhaps we could address it separately from KRaft upgrades? > > 20. Indeed snapshots are not strictly necessary during an upgrade, I've > reworded this > > > Thanks! > David > > > On Thu, Nov 4, 2021 at 6:51 PM Jun Rao wrote: > > > Hi, David, Jose and Colin, > > > > Thanks for the reply. A few more comments. > > > > 12. It seems that we haven't updated the AdminClient accordingly? > > > > 14. "Metadata snapshot is generated and sent to the other inactive > > controllers and to brokers". I thought we wanted each broker to generate > > its own snapshot independently? If only the controller generates the > > snapshot, how do we force other brokers to pick it up? > > > > 16. If a feature version is new, one may not want to enable it > immediately > > after the cluster is upgraded. However, if a feature version has been > > stable, requiring every user to run a command to upgrade to that version > > seems inconvenient. 
One way to improve this is for each feature to define > > one version as the default. Then, when we upgrade a cluster, we will > > automatically upgrade the feature to the default version. An admin could > > use the tool to upgrade to a version higher than the default. > > > > 20. "The quorum controller can assist with this process by generating a > > metadata snapshot after a metadata.version increase has been committed to > > the metadata log. This snapshot will be a convenient way to let broker > and > > controller components rebuild their entire in-memory state following an > > upgrade." The new version of the software could read both the new and the > > old version. Is generating a new snapshot during upgrade needed? > > > > Jun > > > > > > On Wed, Nov 3, 2021 at 5:42 PM Colin McCabe wrote: > > > > > On Tue, Oct 12, 2021, at 10:34, Jun Rao wrote: > > > > Hi, David, > > > > > > > > One more comment. > > > > > > > > 16. The main reason why KIP-584 requires finalizing a feature > manually > > is > > > > that in the ZK world, the controller doesn't know all brokers in a > > > clu
Re: [DISCUSS] KIP-791: Add Record Metadata to State Store Context
Thanks for the KIP, Patrick! It looks like you addressed Guozhang's and Bruno's very good feedback, and I like the result. The example especially helps clarify how this property might be useful. I'm in favor of this proposal. Thanks, -John On Fri, 2021-11-05 at 12:03 +0100, Bruno Cadonna wrote: > Hi Patrick, > > Thank you for the KIP! > > - Maybe some more details in the motivation would help to better > understand the background of the KIP. Currently, it is hard to judge > whether record metadata should be exposed or not. Can you maybe give an > example? > > - Could you please replace RYW abbreviation with read-your-writes (at > least that is my guess about the meaning of RYW)? > > Best, > Bruno > > > > On 03.11.21 22:43, Guozhang Wang wrote: > > Thanks Patrick, > > > > I looked at the KIP and it looks good to me overall. I think we need to > > double check whether the record metadata reflect the "last processed > > record" or the "currently processed record" where the latter may not have > > been completely processed. In `ProcessorContext#recordMetadata` it returns > > the latter, but that may not be the preferred case if you want to build the > > consistency reasoning on top of. > > > > Otherwise, LGTM. > > > > > > Guozhang > > > > On Wed, Nov 3, 2021 at 1:44 PM Patrick Stuedi > > wrote: > > > > > Hi everyone, > > > > > > I would like to start the discussion for KIP-791: Add Record Metadata to > > > State Store Context. > > > > > > The KIP can be found here: > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-791:+Add+Record+Metadata+to+State+Store+Context > > > > > > Any feedback will be highly appreciated. > > > > > > Many thanks, > > > Patrick > > > > > > >
Re: New Kafka Consumer : unknown member id
Hello Luke, I have built a new Kafka environment with Kafka 2.8.0. A new consumer set up against this environment is throwing the error below, while the old consumers for the same applications on the same 2.8.0 environment are working fine. Could you please advise? 2021-11-02 12:25:24 DEBUG AbstractCoordinator:557 - [Consumer clientId=, groupId=] Attempt to join group failed due to unknown member id. On Fri, Oct 29, 2021 at 7:36 AM Luke Chen wrote: > Hi, > Which version of kafka client are you using? > I can't find this error message in the source code. > When googling this error message, it showed the error is in Kafka v0.9. > > Could you try to use the V3.0.0 and see if that issue still exists? > > Thank you. > Luke > > On Thu, Oct 28, 2021 at 11:15 PM Kafka Life > wrote: > > > Dear Kafka Experts > > > > We have set up a group.id (consumer) = YYY > > But when we tried to connect to the kafka instance, I got this error message. I > > am sure this consumer (group id) does not exist in kafka. We use the plaintext > > protocol to connect to kafka 2.8.0. Please suggest how to resolve > this > > issue. > > > > DEBUG AbstractCoordinator:557 - [Consumer clientId=X, > groupId=YYY] > > Attempt to join group failed due to unknown member id. > >
[jira] [Created] (KAFKA-13435) Group won't consume partitions added after static member restart
Ryan Leslie created KAFKA-13435:
---
Summary: Group won't consume partitions added after static member restart
Key: KAFKA-13435
URL: https://issues.apache.org/jira/browse/KAFKA-13435
Project: Kafka
Issue Type: Bug
Components: consumer
Affects Versions: 2.7.0
Reporter: Ryan Leslie

When using consumer groups with static membership, if the consumer marked as leader has restarted, then metadata changes such as a partition increase do not trigger the expected rebalances. To reproduce this issue, simply:

# Create a static consumer subscribed to a single topic
# Close the consumer and create a new one with the same group instance id
# Increase partitions for the topic
# Observe that no rebalance occurs and the new partitions are not assigned

I have only tested this in 2.7, but it may apply to newer versions as well.

h3. Analysis

In _ConsumerCoordinator_, one responsibility of the leader consumer is to track metadata and trigger a rebalance if there are changes such as new partitions added:

[https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L793]

{code:java}
if (assignmentSnapshot != null && !assignmentSnapshot.matches(metadataSnapshot)) {
    ...
    requestRejoinIfNecessary(reason);
    return true;
}
{code}

Note that _assignmentSnapshot_ is currently only set if the consumer is the leader:

[https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L353]

{code:java}
// Only the leader is responsible for monitoring for metadata changes (i.e. partition changes)
if (!isLeader)
    assignmentSnapshot = null;
{code}

And _isLeader_ is only true after an assignment is performed during a rebalance:

[https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L634]

That is, when a consumer group forms, exactly one consumer in the group should have _isLeader == True_ and be responsible for triggering rebalances on metadata changes. However, in the case of static membership, if the leader has been restarted and rejoined the group, the group essentially no longer has a current leader. Even though the metadata changes are fetched, no rebalance will be triggered; that is, _isLeader_ will be false for all members. This issue does not resolve until after an actual group change that causes a proper rebalance. In order to safely make a partition increase when using static membership, consumers must be stopped and have timed out, or be forcibly removed with _AdminClient.removeMembersFromConsumerGroup()_.

Correcting this in the client probably also requires help from the broker. Currently, when a static consumer that is leader is restarted, the coordinator does recognize the change, e.g. leader _bbfcb930-61a3-4d21-945c-85f4576490ff_ was restarted:

{noformat}
[2021-11-04 13:53:13,487] INFO [GroupCoordinator 4]: Static member Some(1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0) of group ryan_test with unknown member id rejoins, assigning new member id 353WV-1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-af88ecf2-6ebf-47da-95ef-c54fef17ab74, while old member id 1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-bbfcb930-61a3-4d21-945c-85f4576490ff will be removed. (kafka.coordinator.group.GroupCoordinator)
{noformat}

However, it does not attempt to update the leader id since this isn't a new rebalance, and JOIN_GROUP will continue returning the now stale member id as leader:

{noformat}
2021-11-04 13:53:13,490 DEBUG o.a.k.c.c.i.AbstractCoordinator [Consumer instanceId=353WV-1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0, clientId=1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0, groupId=ryan_test] Received successful JoinGroup response: JoinGroupResponseData(throttleTimeMs=0, errorCode=0, generationId=40, protocolType='consumer', protocolName='range', leader='1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-bbfcb930-61a3-4d21-945c-85f4576490ff', memberId='1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-af88ecf2-6ebf-47da-95ef-c54fef17ab74', members=[])
{noformat}

This means that it's not easy for any particular restarted member to identify that it should consider itself leader and handle metadata changes. There is reference to the difficulty of leader restarts in KAFKA-7728, but the focus there seemed to be mainly on avoiding needless rebalances for static members. That goal was accomplished, but this issue seems to be a side effect of both not rebalancing AND not having the rejoined member re-claim its leadership status. Also, I have not verified whether it's strictly related or valid, but noticed this ticket has been opened too: K
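The failure mode described in this ticket can be reduced to a toy model showing why no live member requests a rejoin after the static leader restarts. This is an illustrative sketch only, not the real ConsumerCoordinator logic; the names and ids are made up.

```java
import java.util.List;

public class LeaderCheckDemo {
    // Toy version of the leader-only check: only the member whose id matches
    // the group's recorded leader id reacts to a partition-count change.
    static boolean needsRejoin(String memberId, String groupLeaderId,
                               int snapshotPartitions, int currentPartitions) {
        boolean isLeader = memberId.equals(groupLeaderId);
        // Non-leaders null out their snapshot, so they never act on the change.
        return isLeader && snapshotPartitions != currentPartitions;
    }

    public static void main(String[] args) {
        // After the static leader restarts, the broker keeps returning the OLD
        // member id as leader, which no live member has.
        String staleLeaderId = "member-bbfcb930";
        List<String> liveMembers = List.of("member-af88ecf2", "member-other");
        boolean anyRejoin = false;
        for (String m : liveMembers) {
            anyRejoin |= needsRejoin(m, staleLeaderId, 8, 16); // partitions grew 8 -> 16
        }
        System.out.println("rebalance requested: " + anyRejoin); // false: new partitions go unassigned
    }
}
```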
[jira] [Created] (KAFKA-13434) Add a public API for AbstractCoordinator
Hector G created KAFKA-13434:
Summary: Add a public API for AbstractCoordinator
Key: KAFKA-13434
URL: https://issues.apache.org/jira/browse/KAFKA-13434
Project: Kafka
Issue Type: Improvement
Components: clients
Reporter: Hector G
Assignee: Hector G

KIP-784: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-784%3A+Add+public+APIs+for+AbstractCoordinator]

The AbstractCoordinator should have a companion public interface that is part of Kafka's public API, so backwards compatibility can be maintained in future versions of the client libraries.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
Re: [DISCUSS] KIP-778 KRaft Upgrades
Colin and Jun, thanks for the additional comments! Colin: > We've been talking about having an automated RPC compatibility checker Do we have a way to mark fields in schemas as deprecated? It can stay in the RPC, it just complicates the logic a bit. > It would be nice if the active controller could validate that a majority of the quorum could use the proposed metadata.version. The active controller should have this information, right? If we don't have recent information from a quorum of voters, we wouldn't be active. I believe we should have this information from the ApiVersionsResponse. It would be good to do this validation to avoid a situation where a quorum leader can't be elected due to unprocessable records. > Do we need delete as a command separate from downgrade? I think from an operator's perspective, it is nice to distinguish between changing a feature flag and unsetting it. It might be surprising to an operator to see the flag's version set to nothing when they requested the downgrade to version 0 (or less). > it seems like we should spell out that metadata.version begins at 1 in KRaft clusters I added this text: Introduce an IBP version to indicate the lowest software version that > supports *metadata.version*. Below this IBP, the *metadata.version* is > undefined and will not be examined. At or above this IBP, the > *metadata.version* must be *0* for ZooKeeper clusters and will be > initialized as *1* for KRaft clusters. > We probably also want an RPC implemented by both brokers and controllers that will reveal the min and max supported versions for each feature level supported by the server This is available in ApiVersionsResponse (we include the server's supported features as well as the cluster's finalized features) Jun: 12. I've updated the KIP with AdminClient changes 14. You're right, it looks like I missed a few sections regarding snapshot generation. I've corrected it 16. This feels more like an enhancement to KIP-584. 
I agree it could be useful, but perhaps we could address it separately from KRaft upgrades? 20. Indeed snapshots are not strictly necessary during an upgrade, I've reworded this Thanks! David On Thu, Nov 4, 2021 at 6:51 PM Jun Rao wrote: > Hi, David, Jose and Colin, > > Thanks for the reply. A few more comments. > > 12. It seems that we haven't updated the AdminClient accordingly? > > 14. "Metadata snapshot is generated and sent to the other inactive > controllers and to brokers". I thought we wanted each broker to generate > its own snapshot independently? If only the controller generates the > snapshot, how do we force other brokers to pick it up? > > 16. If a feature version is new, one may not want to enable it immediately > after the cluster is upgraded. However, if a feature version has been > stable, requiring every user to run a command to upgrade to that version > seems inconvenient. One way to improve this is for each feature to define > one version as the default. Then, when we upgrade a cluster, we will > automatically upgrade the feature to the default version. An admin could > use the tool to upgrade to a version higher than the default. > > 20. "The quorum controller can assist with this process by generating a > metadata snapshot after a metadata.version increase has been committed to > the metadata log. This snapshot will be a convenient way to let broker and > controller components rebuild their entire in-memory state following an > upgrade." The new version of the software could read both the new and the > old version. Is generating a new snapshot during upgrade needed? > > Jun > > > On Wed, Nov 3, 2021 at 5:42 PM Colin McCabe wrote: > > > On Tue, Oct 12, 2021, at 10:34, Jun Rao wrote: > > > Hi, David, > > > > > > One more comment. > > > > > > 16. The main reason why KIP-584 requires finalizing a feature manually > is > > > that in the ZK world, the controller doesn't know all brokers in a > > cluster. 
> > > A broker temporarily down is not registered in ZK. in the KRaft world, > > the > > > controller keeps track of all brokers, including those that are > > temporarily > > > down. This makes it possible for the controller to automatically > > finalize a > > > feature---it's safe to do so when all brokers support that feature. > This > > > will make the upgrade process much simpler since no manual command is > > > required to turn on a new feature. Have we considered this? > > > > > > Thanks, > > > > > > Jun > > > > Hi Jun, > > > > I guess David commented on this point already, but I'll comment as well. > I > > always had the perception that users viewed rolls as potentially risky > and > > were looking for ways to reduce the risk. Not enabling features right > away > > after installing new software seems like one way to do that. If we had a > > feature to automatically upgrade during a roll, I'm not sure that I would > > recommend that people use it, because if something fails, it makes it > > harder to tell if the
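Jun's "default version" suggestion earlier in this thread (fresh clusters finalize a feature at its default level, upgraded clusters keep their existing level, operators can move above the default explicitly) can be sketched as a small resolution rule. This is a hypothetical sketch of the proposal under discussion, not anything in the KIP or in Kafka; the names are invented.

```java
public class FeatureVersionDemo {
    // Toy resolution rule: a fresh cluster finalizes the feature's default
    // level; an upgraded cluster keeps whatever was already finalized; either
    // way, never finalize beyond what the whole quorum supports.
    static int resolve(Integer previouslyFinalized, int defaultVersion, int maxSupportedByQuorum) {
        int target = (previouslyFinalized != null) ? previouslyFinalized : defaultVersion;
        return Math.min(target, maxSupportedByQuorum);
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, 3, 5)); // new cluster: default level 3
        System.out.println(resolve(2, 3, 5));    // upgraded cluster: stays at 2
        System.out.println(resolve(null, 7, 5)); // capped by quorum support: 5
    }
}
```

The quorum cap in the last case corresponds to the validation discussed above: the active controller should not finalize a metadata.version that a majority of voters cannot process.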
RE: Re: [VOTE] KIP-714: Client Metrics and Observability
+1 We also have a lot of clients using our central Kafka cluster, and it would be great to have client metrics so we can provide end-to-end monitoring. Igor Buzatović Porsche Digital On 2021/11/01 20:19:20 J Rivers wrote: > +1 > > Thank you for the KIP! > > Our organization runs kafka at large scale in a multi-tenant configuration. > We actually have many other enterprises connecting up to our system to > retrieve stream data. These feeds vary greatly in volume and velocity. The > peak rates are a multiplicative factor of the nominal. There is extreme > skew in our datasets in a number of ways. > > We don't have time to work with every new internal/external client to tune > their feeds. They need to be able to take one of the many kafka clients and > go off to the races. > > Being able to retrieve client metrics would be invaluable here as it's hard > and time consuming to communicate out of the enterprise walls. > > This KIP is important to us to expand the use of our datasets internally > and outside the borders of the enterprise. Our clients like the performance > and data safeties related to the kafka connection. The observability has > been a problem... > > Jonathan Rivers > jrivers...@gmail.com > > > > > On Mon, Oct 18, 2021 at 11:56 PM Ryanne Dolan wrote: > > > -1 > > > > Ryanne > > > > On Mon, Oct 18, 2021, 4:30 AM Magnus Edenhill wrote: > > > > > Hi all, > > > > > > I'd like to start a vote on KIP-714. > > > https://cwiki.apache.org/confluence/x/2xRRCg > > > > > > Discussion thread: > > > https://www.mail-archive.com/dev@kafka.apache.org/msg119000.html > > > > > > Thanks, > > > Magnus > > > > > >
Re: [DISCUSS] KIP-779: Allow Source Tasks to Handle Producer Exceptions
Good morning, If there is no additional feedback, I am going to call a vote for this KIP on Monday. Knowles On Tue, Nov 2, 2021 at 10:00 AM Knowles Atchison Jr wrote: > Third time's the charm. > > I've added a getter for the RetryWithToleranceOperator to get the > ToleranceType. I've updated WorkerSourceTask to check this setting to see > if it is ToleranceType.ALL. > > Setting "errors.tolerance" to "all" solves both problems: > > 1. Use an existing configuration > 2. Moves the configuration back to the connector/task level instead of at > the connect worker level. > > I've updated the KIP and PR. > > Additional thoughts and feedback are welcome. > > Knowles > > On Mon, Nov 1, 2021 at 2:00 AM Arjun Satish > wrote: > >> Looks really nice. Thanks for the changes. Couple of suggestions: >> >> 1. Can we reuse any of the existing configs, instead of introducing a new >> one? I’m wondering if the error.tolerance configuration’s scope can be >> increased to include produce errors as well. That’ll help us keep number >> of >> configs in check. Effectively, if error.tolerance is set to all, then the >> behavior would be like how you describe the worker would ignore producer >> errors. >> >> 2. If we do choose to have a new config, could you please call out the >> possible values it can take in the kip? >> >> Thanks again! >> >> Best, >> >> >> On Fri, Oct 29, 2021 at 9:53 AM Knowles Atchison Jr < >> katchiso...@gmail.com> >> wrote: >> >> > Arjun, >> > >> > Thank you for your feedback, I have updated the KIP. >> > >> > This solution is more elegant than my original proposal; however, after >> > working on the implementation, we have now pushed the configuration from >> > the connector/task itself back to the connect worker. All tasks running >> on >> > the worker would share this ignore producer exception configuration >> flag. 
>> > This works for my use cases where I cannot envision setting this for >> only >> > one type of connector we have, but this does take the choice out of the >> > hands of the connector developer. I suppose that is for the best, in a >> > vacuum only the worker should have a say in how it handles message >> > production. >> > >> > Additional thoughts and feedback are welcome. >> > >> > Knowles >> > >> > On Thu, Oct 28, 2021 at 10:54 AM Arjun Satish >> > wrote: >> > >> > > Yes, that makes sense. And it fits in very nicely with the current >> error >> > > handling framework. >> > > >> > > On Thu, Oct 28, 2021 at 10:39 AM Knowles Atchison Jr < >> > > katchiso...@gmail.com> >> > > wrote: >> > > >> > > > That would work. I originally thought that it would be confusing to >> > > > overload that function when a Record that wasn't actually written, >> but >> > > > looking at SourceTask more closely, in commitRecord(SourceRecord, >> > > > RecordMetadata), the RecordMetadata is set to null in the event of a >> > > > filtered transformation so the framework is already doing this in a >> > > certain >> > > > regard. >> > > > >> > > > Knowles >> > > > >> > > > On Thu, Oct 28, 2021 at 10:29 AM Arjun Satish < >> arjun.sat...@gmail.com> >> > > > wrote: >> > > > >> > > > > To ack the message back to the source system, we already have a >> > > > > commitRecord method. Once the bad record is handled by skip/dlq, >> we >> > > could >> > > > > just call commitRecord() on it? >> > > > > >> > > > > On Thu, Oct 28, 2021 at 9:35 AM Knowles Atchison Jr < >> > > > katchiso...@gmail.com >> > > > > > >> > > > > wrote: >> > > > > >> > > > > > Hi Chris, >> > > > > > >> > > > > > Thank you for your reply! >> > > > > > >> > > > > > It is a clarity error regarding the javadoc. 
I am not >> operationally >> > > > > > familiar with all of the exceptions Kafka considers >> non-retriable, >> > > so I >> > > > > > pulled the list from Callback.java: >> > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > >> https://github.com/apache/kafka/blob/1afe2a5190e9c98e38c84dc793f4303ea51bc19b/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java#L35 >> > > > > > to be an illustrative example of the types of exceptions that >> would >> > > > kill >> > > > > > the connector outright. Any exception thrown during the producer >> > > write >> > > > > will >> > > > > > be passed to this handler. I will update the KIP/PR to be more >> > clear >> > > on >> > > > > > this matter. >> > > > > > >> > > > > > You raise an excellent point, how should the framework protect >> the >> > > > > > connector or developer from themselves? If a connector enables >> > > > > exactly-once >> > > > > > semantics, it would make sense to me to have the task killed. >> The >> > > > > framework >> > > > > > should enforce this type of misconfiguration that would break >> the >> > > > > internal >> > > > > > semantics of KIP-618. WorkerSourceTask could check the >> > configuration >> > > > > before >> > > > > > handing off the records and exception to this function, fail >> > initial >> > > > > > configuration check, or something of that nature. >
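The behavior converged on in this thread (reuse errors.tolerance: with "all", a non-retriable produce failure is skipped and the record can still be acked via commitRecord(); otherwise the task fails) can be sketched as follows. This is an illustrative sketch only, not the actual WorkerSourceTask code from the KIP's PR.

```java
public class ProducerErrorDemo {
    enum ToleranceType { NONE, ALL }

    // Decide what happens when the producer callback reports a non-retriable
    // send failure for a source record.
    static String onSendFailure(ToleranceType tolerance) {
        if (tolerance == ToleranceType.ALL) {
            // Skip the record (optionally routing it to a DLQ); the task keeps
            // running and commitRecord() can still ack the source system.
            return "skip";
        }
        // Default behavior: surface the exception and fail the task.
        return "fail-task";
    }

    public static void main(String[] args) {
        System.out.println(onSendFailure(ToleranceType.ALL));  // skip
        System.out.println(onSendFailure(ToleranceType.NONE)); // fail-task
    }
}
```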
Re: [DISCUSS] KIP-791: Add Record Metadata to State Store Context
Hi Patrick, Thank you for the KIP! - Maybe some more details in the motivation would help to better understand the background of the KIP. Currently, it is hard to judge whether record metadata should be exposed or not. Can you maybe give an example? - Could you please replace RYW abbreviation with read-your-writes (at least that is my guess about the meaning of RYW)? Best, Bruno On 03.11.21 22:43, Guozhang Wang wrote: Thanks Patrick, I looked at the KIP and it looks good to me overall. I think we need to double check whether the record metadata reflect the "last processed record" or the "currently processed record" where the latter may not have been completely processed. In `ProcessorContext#recordMetadata` it returns the latter, but that may not be the preferred case if you want to build the consistency reasoning on top of. Otherwise, LGTM. Guozhang On Wed, Nov 3, 2021 at 1:44 PM Patrick Stuedi wrote: Hi everyone, I would like to start the discussion for KIP-791: Add Record Metadata to State Store Context. The KIP can be found here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-791:+Add+Record+Metadata+to+State+Store+Context Any feedback will be highly appreciated. Many thanks, Patrick
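To make the read-your-writes (RYW) motivation in this thread concrete: if a state store can see the metadata (partition, offset) of the record it is applying, it can track its applied position, and a reader can check whether the store has caught up to a write it observed. The sketch below is a hypothetical illustration of that idea only; it is not the KIP-791 API, and all names are invented.

```java
import java.util.HashMap;
import java.util.Map;

public class RywPositionDemo {
    // Per input partition, the offset of the last record the store has
    // fully applied (the kind of metadata the store context would expose).
    private final Map<Integer, Long> appliedOffsets = new HashMap<>();

    void onRecordProcessed(int partition, long offset) {
        appliedOffsets.put(partition, offset);
    }

    // Read-your-writes check: has the store applied everything up to the
    // offset the writer observed for that partition?
    boolean caughtUpTo(int partition, long requiredOffset) {
        return appliedOffsets.getOrDefault(partition, -1L) >= requiredOffset;
    }

    public static void main(String[] args) {
        RywPositionDemo store = new RywPositionDemo();
        store.onRecordProcessed(0, 41L);
        System.out.println(store.caughtUpTo(0, 42L)); // false: offset 42 not applied yet
        store.onRecordProcessed(0, 42L);
        System.out.println(store.caughtUpTo(0, 42L)); // true
    }
}
```

Guozhang's point above maps directly onto this sketch: the position must be updated from the *last fully processed* record, not the record currently being processed, or the check can report "caught up" too early.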
Re: [VOTE] KIP-782: Expandable batch size in producer
Hi Mickael, Thanks for the good comments! Answering them below: - When under load, the producer may allocate extra buffers. Are these buffers ever released if the load drops? --> This is a good point that I've never considered before. Yes, after introducing the "batch.max.size", we should release some buffer out of the buffer pools. In this KIP, we'll only keep maximum "batch.size" into pool, and mark the rest of memory as free to use. The reason we keep maximum "batch.size" back to pool is because the semantic of "batch.size" is the batch full limit. In most cases, the batch.size should be able to contain the records to be sent within linger.ms time. - Do we really need batch.initial.size? It's not clear that having this extra setting adds a lot of value. --> I think "batch.initial.size" is important to achieve higher memory usage. Now, I made the default value to 4KB, so after upgrading to the new release, the producer memory usage will become better. I've updated the KIP. Thank you. Luke On Wed, Nov 3, 2021 at 6:44 PM Mickael Maison wrote: > Hi Luke, > > Thanks for the KIP. It looks like an interesting idea. I like the > concept of dynamically adjusting settings to handle load. I wonder if > other client settings could also benefit from a similar logic. > > Just a couple of questions: > - When under load, the producer may allocate extra buffers. Are these > buffers ever released if the load drops? > - Do we really need batch.initial.size? It's not clear that having > this extra setting adds a lot of value. > > Thanks, > Mickael > > On Tue, Oct 26, 2021 at 11:12 AM Luke Chen wrote: > > > > Thank you, Artem! > > > > @devs, welcome to vote for this KIP. > > Key proposal: > > 1. allocate multiple smaller initial batch size buffer in producer, and > > list them together when expansion for better memory usage > > 2. 
add a max batch size config in producer, so when producer rate is > > suddenly high, we can still have high throughput with batch size larger > > than "batch.size" (and less than "batch.max.size", where "batch.size" is > > soft limit and "batch.max.size" is hard limit) > > Here's the updated KIP: > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer > > > > And, any comments and feedback are welcome. > > > > Thank you. > > Luke > > > > On Tue, Oct 26, 2021 at 6:35 AM Artem Livshits > > wrote: > > > > > Hi Luke, > > > > > > I've looked at the updated KIP-782, it looks good to me. > > > > > > -Artem > > > > > > On Sun, Oct 24, 2021 at 1:46 AM Luke Chen wrote: > > > > > > > Hi Artem, > > > > Thanks for your good suggestion again. > > > > I've combined your idea into this KIP, and updated it. > > > > Note, in the end, I still keep the "batch.initial.size" config > (default > > > is > > > > 0, which means "batch.size" will be initial batch size) for better > memory > > > > conservation. > > > > > > > > Detailed description can be found here: > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer > > > > > > > > Let me know if you have other suggestions. > > > > > > > > Thank you. > > > > Luke > > > > > > > > On Sat, Oct 23, 2021 at 10:50 AM Luke Chen > wrote: > > > > > > > >> Hi Artem, > > > >> Thanks for the suggestion. Let me confirm my understanding is > correct. > > > >> So, what you suggest is that the "batch.size" is more like a "soft > > > limit" > > > >> batch size, and the "hard limit" is "batch.max.size". When reaching > the > > > >> batch.size of the buffer, it means the buffer is "ready" to be be > sent. > > > But > > > >> before the linger.ms reached, if there are more data coming, we can > > > >> still accumulate it into the same buffer, until it reached the > > > >> "batch.max.size". 
After it reached the "batch.max.size", we'll > create > > > >> another batch for it. > > > >> > > > >> So after your suggestion, we won't need the "batch.initial.size", > and we > > > >> can use "batch.size" as the initial batch size. We list each > > > "batch.size" > > > >> together, until it reached "batch.max.size". Something like this: > > > >> > > > >> [image: image.png] > > > >> Is my understanding correct? > > > >> If so, that sounds good to me. > > > >> If not, please kindly explain more to me. > > > >> > > > >> Thank you. > > > >> Luke > > > >> > > > >> > > > >> > > > >> > > > >> On Sat, Oct 23, 2021 at 2:13 AM Artem Livshits > > > >> wrote: > > > >> > > > >>> Hi Luke, > > > >>> > > > >>> Nice suggestion. It should optimize how memory is used with > different > > > >>> production rates, but I wonder if we can take this idea further and > > > >>> improve > > > >>> batching in general. > > > >>> > > > >>> Currently batch.size is used in two conditions: > > > >>> > > > >>> 1. When we append records to a batch in the accumulator, we create > a > > > new > > > >>> batch if the current batch would exceed the batch.size. > > > >>> 2. When we drain the batch from the accumulator, a batch becom
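The soft-limit/hard-limit semantics Luke and Artem converge on above (a batch at "batch.size" is ready to send, but until it is drained it can keep absorbing records up to "batch.max.size", after which a new batch is created) can be sketched with a toy accumulator. This is an illustrative model of the KIP's idea under stated assumptions, not the real RecordAccumulator; the names are invented.

```java
public class BatchLimitDemo {
    final int batchSize;     // soft limit: batch is "ready" to be sent
    final int batchMaxSize;  // hard limit: a batch never exceeds this
    int currentBatchBytes = 0;
    int batchesCreated = 1;

    BatchLimitDemo(int batchSize, int batchMaxSize) {
        this.batchSize = batchSize;
        this.batchMaxSize = batchMaxSize;
    }

    // A batch past batch.size is ready, but until it is drained it can
    // still absorb more records.
    boolean currentBatchReady() {
        return currentBatchBytes >= batchSize;
    }

    void append(int recordBytes) {
        if (currentBatchBytes + recordBytes > batchMaxSize) {
            batchesCreated++;          // hard limit hit: start a new batch
            currentBatchBytes = 0;
        }
        currentBatchBytes += recordBytes;
    }

    public static void main(String[] args) {
        BatchLimitDemo acc = new BatchLimitDemo(16_384, 65_536);
        for (int i = 0; i < 100; i++) acc.append(1_000); // sudden 100 KB burst
        System.out.println(acc.batchesCreated);          // 2: the hard limit split the burst once
        System.out.println(acc.currentBatchReady());     // true
    }
}
```

With only the soft limit, the same burst would have been split into seven batches of ~16 KB each; the hard limit lets a sudden rate increase keep its throughput with fewer, larger batches.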