Re: [DISCUSS] KIP-794: Strictly Uniform Sticky Partitioner

2021-11-05 Thread Artem Livshits
Hi Luke,

Thank you for your feedback.  I've updated the KIP with your suggestions.

1. Updated with a better example.
2. I removed the reference to ClassicDefaultPartitioner; it was probably
confusing.
3. The logic doesn't rely on checking batches; I've updated the proposal to
make it more explicit.
4. The primary issue (uneven distribution) is described in the linked JIRA;
I've also copied an example from the JIRA into the KIP.
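
To make the switching logic concrete, here is a minimal sketch of the idea
(assuming the proposed partitioner.sticky.batch.size config; the code below is
illustrative only, not the actual patch):

{code:java}
import java.util.concurrent.ThreadLocalRandom;

/**
 * Illustrative only: switch the sticky partition after roughly
 * partitioner.sticky.batch.size bytes have been produced to it, instead of
 * switching whenever a new batch happens to be created.
 */
public class UniformStickyPartitionChooser {
    private final int stickyBatchSize;   // e.g. 16384, analogous to batch.size
    private final int numPartitions;
    private int currentPartition = -1;
    private int bytesInCurrentPartition = 0;

    public UniformStickyPartitionChooser(int stickyBatchSize, int numPartitions) {
        this.stickyBatchSize = stickyBatchSize;
        this.numPartitions = numPartitions;
    }

    /** Returns the partition for the next record of the given size. */
    public synchronized int partitionFor(int recordSizeBytes) {
        if (currentPartition < 0 || bytesInCurrentPartition >= stickyBatchSize) {
            // A uniform random choice keeps the long-run byte distribution even
            // across partitions, independent of batching and in-flight state.
            currentPartition = ThreadLocalRandom.current().nextInt(numPartitions);
            bytesInCurrentPartition = 0;
        }
        bytesInCurrentPartition += recordSizeBytes;
        return currentPartition;
    }
}
{code}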

-Artem


On Thu, Nov 4, 2021 at 8:34 PM Luke Chen  wrote:

> Hi Artem,
> Thanks for the KIP! And thanks for reminding me to complete KIP-782, soon.
> :)
>
> Back to the KIP, I have some comments:
> 1. You proposed to have a new config: "partitioner.sticky.batch.size", but
> I can't see how we're going to use it to make the partitioner better.
> Please explain more in the KIP (an example, as in suggestion (4), would
> help).
> 2. In the "Proposed change" section, you use "ClassicDefaultPartitioner" in
> an example; is that referring to the current default sticky partitioner? I
> think it'd be better to give your proposed partitioner a different name, to
> distinguish it from the default one. (Although after implementation, we are
> going to just use the same name.)
> 3. So, if my understanding is correct, you're going to have a "batch"
> switch, and before the in-flight is full, it's disabled. Otherwise, we'll
> enable it. Is that right? Sorry, I don't see any advantage of having this
> batch switch. Could you explain more?
> 4. I think it would be clearer if you added a concrete, real example to the
> motivation section describing the issue we face with the current sticky
> partitioner, and then, in the proposed changes section, used the same
> example to describe in more detail how your approach fixes it.
>
> Thank you.
> Luke
>
> On Fri, Nov 5, 2021 at 1:38 AM Artem Livshits
>  wrote:
>
> > Hello,
> >
> > This is the discussion thread for
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner
> > .
> >
> > The proposal is a bug fix for
> > https://issues.apache.org/jira/browse/KAFKA-10888, but it does include a
> > client config change, therefore we have a KIP to discuss.
> >
> > -Artem
> >
>


Re: [DISCUSS] KIP-778 KRaft Upgrades

2021-11-05 Thread Jun Rao
Hi, David,

Thanks for the reply.

16. My first concern is that the KIP picks up meta.version inconsistently
during the deployment. If a new cluster is started, we pick up the highest
version. If we upgrade, we leave the feature version unchanged.
Intuitively, it seems that independent of how a cluster is deployed, we
should always pick the same feature version. I think we need to think
this through in this KIP. My second concern is that as a particular version
matures, it's inconvenient for a user to manually upgrade every feature
version. As long as we have a path to achieve that in the future, we don't
need to address that in this KIP.

21. "./kafka-features.sh delete": Deleting a feature seems a bit weird
since the logic is always there. Would it be better to use disable?

Jun

On Fri, Nov 5, 2021 at 8:11 AM David Arthur
 wrote:

> Colin and Jun, thanks for the additional comments!
>
> Colin:
>
> > We've been talking about having an automated RPC compatibility checker
>
> Do we have a way to mark fields in schemas as deprecated? It can stay in
> the RPC, it just complicates the logic a bit.
>
> > It would be nice if the active controller could validate that a majority
> of the quorum could use the proposed metadata.version. The active
> controller should have this information, right? If we don't have recent
> information  from a quorum of voters, we wouldn't be active.
>
> I believe we should have this information from the ApiVersionsResponse. It
> would be good to do this validation to avoid a situation where a
> quorum leader can't be elected due to unprocessable records.
>
> > Do we need delete as a command separate from downgrade?
>
> I think from an operator's perspective, it is nice to distinguish between
> changing a feature flag and unsetting it. It might be surprising to an
> operator to see the flag's version set to nothing when they requested the
> downgrade to version 0 (or less).
>
> > it seems like we should spell out that metadata.version begins at 1 in
> KRaft clusters
>
> I added this text:
>
> Introduce an IBP version to indicate the lowest software version that
> > supports *metadata.version*. Below this IBP, the *metadata.version* is
> > undefined and will not be examined. At or above this IBP, the
> > *metadata.version* must be *0* for ZooKeeper clusters and will be
> > initialized as *1* for KRaft clusters.
>
>
> > We probably also want an RPC implemented by both brokers and controllers
> that will reveal the min and max supported versions for each feature level
> supported by the server
>
> This is available in ApiVersionsResponse (we include the server's supported
> features as well as the cluster's finalized features)
>
> 
>
> Jun:
>
> 12. I've updated the KIP with AdminClient changes
>
> 14. You're right, it looks like I missed a few sections regarding snapshot
> generation. I've corrected it
>
> 16. This feels more like an enhancement to KIP-584. I agree it could be
> useful, but perhaps we could address it separately from KRaft upgrades?
>
> 20. Indeed snapshots are not strictly necessary during an upgrade, I've
> reworded this
>
>
> Thanks!
> David
>
>
> On Thu, Nov 4, 2021 at 6:51 PM Jun Rao  wrote:
>
> > Hi, David, Jose and Colin,
> >
> > Thanks for the reply. A few more comments.
> >
> > 12. It seems that we haven't updated the AdminClient accordingly?
> >
> > 14. "Metadata snapshot is generated and sent to the other inactive
> > controllers and to brokers". I thought we wanted each broker to generate
> > its own snapshot independently? If only the controller generates the
> > snapshot, how do we force other brokers to pick it up?
> >
> > 16. If a feature version is new, one may not want to enable it
> immediately
> > after the cluster is upgraded. However, if a feature version has been
> > stable, requiring every user to run a command to upgrade to that version
> > seems inconvenient. One way to improve this is for each feature to define
> > one version as the default. Then, when we upgrade a cluster, we will
> > automatically upgrade the feature to the default version. An admin could
> > use the tool to upgrade to a version higher than the default.
> >
> > 20. "The quorum controller can assist with this process by generating a
> > metadata snapshot after a metadata.version increase has been committed to
> > the metadata log. This snapshot will be a convenient way to let broker
> and
> > controller components rebuild their entire in-memory state following an
> > upgrade." The new version of the software could read both the new and the
> > old version. Is generating a new snapshot during upgrade needed?
> >
> > Jun
> >
> >
> > On Wed, Nov 3, 2021 at 5:42 PM Colin McCabe  wrote:
> >
> > > On Tue, Oct 12, 2021, at 10:34, Jun Rao wrote:
> > > > Hi, David,
> > > >
> > > > One more comment.
> > > >
> > > > 16. The main reason why KIP-584 requires finalizing a feature
> manually
> > is
> > > > that in the ZK world, the controller doesn't know all brokers in a
> > > clu

Re: [DISCUSS] KIP-791: Add Record Metadata to State Store Context

2021-11-05 Thread John Roesler
Thanks for the KIP, Patrick!

It looks like you addressed Guozhang's and Bruno's very good
feedback, and I like the result. The example especially helps
clarify how this property might be useful.

I'm in favor of this proposal.

Thanks,
-John

On Fri, 2021-11-05 at 12:03 +0100, Bruno Cadonna wrote:
> Hi Patrick,
> 
> Thank you for the KIP!
> 
> - Maybe some more details in the motivation would help to better 
> understand the background of the KIP. Currently, it is hard to judge 
> whether record metadata should be exposed or not. Can you maybe give an 
> example?
> 
> - Could you please replace RYW abbreviation with read-your-writes (at 
> least that is my guess about the meaning of RYW)?
> 
> Best,
> Bruno
> 
> 
> 
> On 03.11.21 22:43, Guozhang Wang wrote:
> > Thanks Patrick,
> > 
> > I looked at the KIP and it looks good to me overall. I think we need to
> > double check whether the record metadata reflects the "last processed
> > record" or the "currently processed record", where the latter may not have
> > been completely processed. In `ProcessorContext#recordMetadata` it returns
> > the latter, but that may not be the preferred case if you want to build
> > consistency reasoning on top of it.
> > 
> > Otherwise, LGTM.
> > 
> > 
> > Guozhang
> > 
> > On Wed, Nov 3, 2021 at 1:44 PM Patrick Stuedi 
> > wrote:
> > 
> > > Hi everyone,
> > > 
> > > I would like to start the discussion for KIP-791: Add Record Metadata to
> > > State Store Context.
> > > 
> > > The KIP can be found here:
> > > 
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-791:+Add+Record+Metadata+to+State+Store+Context
> > > 
> > > Any feedback will be highly appreciated.
> > > 
> > > Many thanks,
> > >   Patrick
> > > 
> > 
> > 




Re: New Kafka Consumer : unknown member id

2021-11-05 Thread Kafka Life
Hello Luke

I have built a new Kafka environment with Kafka 2.8.0.

A new consumer set up against this environment is throwing the error below,
while the old consumers for the same applications in the same 2.8.0
environment are working fine.

Could you please advise?

2021-11-02 12:25:24 DEBUG AbstractCoordinator:557 - [Consumer
clientId=, groupId=] Attempt to join group failed due to unknown
member id.

On Fri, Oct 29, 2021 at 7:36 AM Luke Chen  wrote:

> Hi,
> Which version of kafka client are you using?
> I can't find this error message in the source code.
> When googling this error message, it showed the error is in Kafka v0.9.
>
> Could you try to use v3.0.0 and see if that issue still exists?
>
> Thank you.
> Luke
>
> On Thu, Oct 28, 2021 at 11:15 PM Kafka Life 
> wrote:
>
> > Dear Kafka Experts
> >
> > We have set up a consumer group.id = YYY, but when trying to connect to
> > the Kafka instance I get the error message below. I am sure this consumer
> > group id does not exist in Kafka. We use the plaintext protocol to connect
> > to Kafka 2.8.0. Please suggest how to resolve this issue.
> >
> > DEBUG AbstractCoordinator:557 - [Consumer clientId=X,
> groupId=YYY]
> > Attempt to join group failed due to unknown member id.
> >
>


[jira] [Created] (KAFKA-13435) Group won't consume partitions added after static member restart

2021-11-05 Thread Ryan Leslie (Jira)
Ryan Leslie created KAFKA-13435:
---

 Summary: Group won't consume partitions added after static member 
restart
 Key: KAFKA-13435
 URL: https://issues.apache.org/jira/browse/KAFKA-13435
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 2.7.0
Reporter: Ryan Leslie


When using consumer groups with static membership, if the consumer marked as 
leader has restarted, then metadata changes such as a partition increase do not 
trigger the expected rebalance.

To reproduce this issue, simply:
 # Create a static consumer subscribed to a single topic
 # Close the consumer and create a new one with the same group instance id
 # Increase partitions for the topic
 # Observe that no rebalance occurs and the new partitions are not assigned

I have only tested this in 2.7, but it may apply to newer versions as well.
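
A minimal, self-contained sketch of the reproduction steps above (broker address, 
topic name, and instance id are placeholders):
{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class StaticLeaderRestartRepro {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "ryan_test");
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "instance-1"); // static membership
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // 1. Create a static consumer subscribed to a single topic and join the group.
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test-topic"));
        consumer.poll(Duration.ofSeconds(5));

        // 2. Close it and create a new one with the same group.instance.id.
        consumer.close();
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test-topic"));
        consumer.poll(Duration.ofSeconds(5));

        // 3. Increase partitions for the topic.
        Properties adminProps = new Properties();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(adminProps)) {
            admin.createPartitions(
                Collections.singletonMap("test-topic", NewPartitions.increaseTo(4))).all().get();
        }

        // 4. Keep polling and observe that the new partitions never show up in the assignment.
        for (int i = 0; i < 30; i++) {
            consumer.poll(Duration.ofSeconds(1));
            System.out.println("Assignment: " + consumer.assignment());
        }
        consumer.close();
    }
}
{code}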
h3. Analysis

In _ConsumerCoordinator_, one responsibility of the leader consumer is to track 
metadata and trigger a rebalance if there are changes such as new partitions 
added:

[https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L793]
{code:java}
if (assignmentSnapshot != null && 
!assignmentSnapshot.matches(metadataSnapshot)) {
...
requestRejoinIfNecessary(reason);
return true;
}
{code}
Note that _assignmentSnapshot_ is currently only set if the consumer is the 
leader:

[https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L353]
{code:java}
// Only the leader is responsible for monitoring for metadata changes (i.e. 
partition changes)
if (!isLeader)
assignmentSnapshot = null;
{code}
And _isLeader_ is only true after an assignment is performed during a rebalance:

[https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L634]

That is, when a consumer group forms, exactly one consumer in the group should 
have _isLeader == True_ and be responsible for triggering rebalances on 
metadata changes.

However, in the case of static membership, if the leader has been restarted and 
rejoined the group, the group essentially no longer has a current leader. Even 
though the metadata changes are fetched, no rebalance will be triggered. That 
is, _isLeader_ will be false for all members.

This issue does not resolve until after an actual group change that causes a 
proper rebalance. In order to safely make a partition increase when using 
static membership, consumers must be stopped and have timed out, or forcibly 
removed with _AdminClient.removeMembersFromConsumerGroup()_.
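
For example, a sketch of forcibly removing the restarted static member by its 
group.instance.id (group, instance id, and broker address are placeholders):
{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.MemberToRemove;
import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;

public class RemoveStaleStaticMember {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Removing the member by group.instance.id forces a proper rebalance, after
            // which a new leader is chosen and metadata changes are picked up again.
            admin.removeMembersFromConsumerGroup("ryan_test",
                new RemoveMembersFromConsumerGroupOptions(
                    Collections.singleton(new MemberToRemove("instance-1")))).all().get();
        }
    }
}
{code}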

Correcting this in the client probably also requires help from the broker. 
Currently, when a static consumer that is leader is restarted, the coordinator 
does recognize the change:

e.g. leader _bbfcb930-61a3-4d21-945c-85f4576490ff_ was restarted
{noformat}
[2021-11-04 13:53:13,487] INFO [GroupCoordinator 4]: Static member 
Some(1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0) of group ryan_test with 
unknown member id rejoins, assigning new member id 
353WV-1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-af88ecf2-
6ebf-47da-95ef-c54fef17ab74, while old member id 
1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-bbfcb930-61a3-4d21-945c-85f4576490ff
 will be removed. (
kafka.coordinator.group.GroupCoordinator){noformat}
However, it does not attempt to update the leader id since this isn't a new 
rebalance, and JOIN_GROUP will continue returning the now stale member id as 
leader:
{noformat}
2021-11-04 13:53:13,490 DEBUG o.a.k.c.c.i.AbstractCoordinator [Consumer 
instanceId=353WV-1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0, 
clientId=1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0, groupId=ryan_test] 
Received successful JoinGroup response: JoinGroupResponseData(throttleTimeMs=0, 
errorCode=0, generationId=40, protocolType='consumer', protocolName='range', 
leader='1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-bbfcb930-61a3-4d21-945c-85f4576490ff',
 
memberId='1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-af88ecf2-6ebf-47da-95ef-c54fef17ab74',
 members=[]){noformat}
This means that it's not easy for any particular restarted member to identify 
that it should consider itself leader and handle metadata changes.

There is reference to the difficulty of leader restarts in KAFKA-7728 but the 
focus seemed mainly on avoiding needless rebalances for static members. That 
goal was accomplished, but this issue seems to be a side effect of both not 
rebalancing AND not having the rejoined member re-claim its leadership status.

Also, I have not verified if it's strictly related or valid, but noticed this 
ticket has been opened too: K

[jira] [Created] (KAFKA-13434) Add a public API for AbstractCoordinator

2021-11-05 Thread Hector G (Jira)
Hector G created KAFKA-13434:


 Summary: Add a public API for AbstractCoordinator
 Key: KAFKA-13434
 URL: https://issues.apache.org/jira/browse/KAFKA-13434
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Reporter: Hector G
Assignee: Hector G


KIP-784: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-784%3A+Add+public+APIs+for+AbstractCoordinator]

The AbstractCoordinator should have a companion public interface that is part 
of Kafka's public API, so that backwards compatibility can be maintained in future 
versions of the client libraries.
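
A rough sketch of what such a companion interface could look like (the interface 
name and methods below are illustrative, not taken from KIP-784):
{code:java}
// Hypothetical shape only; see KIP-784 for the actual proposal.
public interface GroupCoordinatorClient extends AutoCloseable {

    /** Ensure the coordinator is known and that this member has joined the group. */
    void ensureActiveGroup();

    /** The protocol type of the group, e.g. "consumer" or "connect". */
    String protocolType();

    /** Leave the group cleanly so the remaining members can rebalance. */
    void leaveGroup(String reason);
}
{code}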



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-778 KRaft Upgrades

2021-11-05 Thread David Arthur
Colin and Jun, thanks for the additional comments!

Colin:

> We've been talking about having an automated RPC compatibility checker

Do we have a way to mark fields in schemas as deprecated? It can stay in
the RPC, it just complicates the logic a bit.

> It would be nice if the active controller could validate that a majority
of the quorum could use the proposed metadata.version. The active
controller should have this information, right? If we don't have recent
information  from a quorum of voters, we wouldn't be active.

I believe we should have this information from the ApiVersionsResponse. It
would be good to do this validation to avoid a situation where a
quorum leader can't be elected due to unprocessable records.

> Do we need delete as a command separate from downgrade?

I think from an operator's perspective, it is nice to distinguish between
changing a feature flag and unsetting it. It might be surprising to an
operator to see the flag's version set to nothing when they requested the
downgrade to version 0 (or less).

> it seems like we should spell out that metadata.version begins at 1 in
KRaft clusters

I added this text:

Introduce an IBP version to indicate the lowest software version that
> supports *metadata.version*. Below this IBP, the *metadata.version* is
> undefined and will not be examined. At or above this IBP, the
> *metadata.version* must be *0* for ZooKeeper clusters and will be
> initialized as *1* for KRaft clusters.


> We probably also want an RPC implemented by both brokers and controllers
that will reveal the min and max supported versions for each feature level
supported by the server

This is available in ApiVersionsResponse (we include the server's supported
features as well as the cluster's finalized features)
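
For reference, a small sketch of how a client can already read both sets via the
KIP-584 Admin API (broker address is a placeholder):

{code:java}
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.FeatureMetadata;
import org.apache.kafka.clients.admin.FinalizedVersionRange;
import org.apache.kafka.clients.admin.SupportedVersionRange;

public class ShowFeatureVersions {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // describeFeatures() reports the broker's supported version ranges and the
            // cluster-wide finalized versions (carried by ApiVersionsResponse).
            FeatureMetadata metadata = admin.describeFeatures().featureMetadata().get();
            for (Map.Entry<String, SupportedVersionRange> e : metadata.supportedFeatures().entrySet()) {
                System.out.printf("supported %s: %d..%d%n",
                    e.getKey(), e.getValue().minVersion(), e.getValue().maxVersion());
            }
            for (Map.Entry<String, FinalizedVersionRange> e : metadata.finalizedFeatures().entrySet()) {
                System.out.printf("finalized %s: %d..%d%n",
                    e.getKey(), e.getValue().minVersionLevel(), e.getValue().maxVersionLevel());
            }
        }
    }
}
{code}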



Jun:

12. I've updated the KIP with AdminClient changes

14. You're right, it looks like I missed a few sections regarding snapshot
generation. I've corrected it

16. This feels more like an enhancement to KIP-584. I agree it could be
useful, but perhaps we could address it separately from KRaft upgrades?

20. Indeed snapshots are not strictly necessary during an upgrade, I've
reworded this


Thanks!
David


On Thu, Nov 4, 2021 at 6:51 PM Jun Rao  wrote:

> Hi, David, Jose and Colin,
>
> Thanks for the reply. A few more comments.
>
> 12. It seems that we haven't updated the AdminClient accordingly?
>
> 14. "Metadata snapshot is generated and sent to the other inactive
> controllers and to brokers". I thought we wanted each broker to generate
> its own snapshot independently? If only the controller generates the
> snapshot, how do we force other brokers to pick it up?
>
> 16. If a feature version is new, one may not want to enable it immediately
> after the cluster is upgraded. However, if a feature version has been
> stable, requiring every user to run a command to upgrade to that version
> seems inconvenient. One way to improve this is for each feature to define
> one version as the default. Then, when we upgrade a cluster, we will
> automatically upgrade the feature to the default version. An admin could
> use the tool to upgrade to a version higher than the default.
>
> 20. "The quorum controller can assist with this process by generating a
> metadata snapshot after a metadata.version increase has been committed to
> the metadata log. This snapshot will be a convenient way to let broker and
> controller components rebuild their entire in-memory state following an
> upgrade." The new version of the software could read both the new and the
> old version. Is generating a new snapshot during upgrade needed?
>
> Jun
>
>
> On Wed, Nov 3, 2021 at 5:42 PM Colin McCabe  wrote:
>
> > On Tue, Oct 12, 2021, at 10:34, Jun Rao wrote:
> > > Hi, David,
> > >
> > > One more comment.
> > >
> > > 16. The main reason why KIP-584 requires finalizing a feature manually
> is
> > > that in the ZK world, the controller doesn't know all brokers in a
> > cluster.
> > > A broker temporarily down is not registered in ZK. in the KRaft world,
> > the
> > > controller keeps track of all brokers, including those that are
> > temporarily
> > > down. This makes it possible for the controller to automatically
> > finalize a
> > > feature---it's safe to do so when all brokers support that feature.
> This
> > > will make the upgrade process much simpler since no manual command is
> > > required to turn on a new feature. Have we considered this?
> > >
> > > Thanks,
> > >
> > > Jun
> >
> > Hi Jun,
> >
> > I guess David commented on this point already, but I'll comment as well.
> I
> > always had the perception that users viewed rolls as potentially risky
> and
> > were looking for ways to reduce the risk. Not enabling features right
> away
> > after installing new software seems like one way to do that. If we had a
> > feature to automatically upgrade during a roll, I'm not sure that I would
> > recommend that people use it, because if something fails, it makes it
> > harder to tell if the 

RE: Re: [VOTE] KIP-714: Client Metrics and Observability

2021-11-05 Thread Igor Buzatovic
+1

We also have a lot of clients using our central Kafka cluster, and it would
be great to have client metrics so we can provide end-to-end monitoring.

Igor Buzatović
Porsche Digital

On 2021/11/01 20:19:20 J Rivers wrote:
> +1
>
> Thank you for the KIP!
>
> Our organization runs kafka at large scale in a multi-tenant
configuration.
> We actually have many other enterprises connecting up to our system to
> retrieve stream data. These feeds vary greatly in volume and velocity. The
> peak rates are a multiplicative factor of the nominal.  There is extreme
> skew in our datasets in a number of ways.
>
> We don't have time to work with every new internal/external client to tune
> their feeds. They need to be able to take one of the many kafka clients
and
> go off to the races.
>
> Being able to retrieve client metrics would be invaluable here as it's
hard
> and time consuming to communicate out of the enterprise walls.
>
> This KIP is important to us to expand the use of our datasets internally
> and outside the borders of the enterprise. Our clients like the
performance
> and data safeties related to the kafka connection. The observability has
> been a problem...
>
> Jonathan Rivers
> jrivers...@gmail.com
>
>
>
>
> On Mon, Oct 18, 2021 at 11:56 PM Ryanne Dolan  wrote:
>
> > -1
> >
> > Ryanne
> >
> > On Mon, Oct 18, 2021, 4:30 AM Magnus Edenhill  wrote:
> >
> > > Hi all,
> > >
> > > I'd like to start a vote on KIP-714.
> > > https://cwiki.apache.org/confluence/x/2xRRCg
> > >
> > > Discussion thread:
> > > https://www.mail-archive.com/dev@kafka.apache.org/msg119000.html
> > >
> > > Thanks,
> > > Magnus
> > >
> >
>


Re: [DISCUSS] KIP-779: Allow Source Tasks to Handle Producer Exceptions

2021-11-05 Thread Knowles Atchison Jr
Good morning,

If there is no additional feedback, I am going to call a vote for this KIP
on Monday.
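
For anyone catching up on the thread, here is a rough sketch of the check
described in the quoted mail below (the accessor on RetryWithToleranceOperator
is a hypothetical name; this is not the actual PR):

{code:java}
// Sketch only: how the producer send callback in WorkerSourceTask might branch on
// the existing errors.tolerance setting. The toleranceType() accessor is a
// hypothetical name for the getter added by the PR; ToleranceType,
// SourceTask#commitRecord(record, null) and ConnectException are existing Connect APIs.
private void onSendFailed(SourceRecord record, Exception error) {
    if (retryWithToleranceOperator.toleranceType() == ToleranceType.ALL) {
        log.warn("Skipping record that could not be produced because errors.tolerance=all", error);
        task.commitRecord(record, null); // ack the skipped record back to the source task
    } else {
        // Default behavior (errors.tolerance=none): fail the task as today.
        throw new ConnectException("Unrecoverable exception from producer send callback", error);
    }
}
{code}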

Knowles

On Tue, Nov 2, 2021 at 10:00 AM Knowles Atchison Jr 
wrote:

> Third time's the charm.
>
> I've added a getter for the RetryWithToleranceOperator to get the
> ToleranceType. I've updated WorkerSourceTask to check this setting to see
> if it is ToleranceType.ALL.
>
> Setting "errors.tolerance" to "all" solves both problems:
>
> 1. Use an existing configuration
> 2. Moves the configuration back to the connector/task level instead of at
> the connect worker level.
>
> I've updated the KIP and PR.
>
> Additional thoughts and feedback are welcome.
>
> Knowles
>
> On Mon, Nov 1, 2021 at 2:00 AM Arjun Satish 
> wrote:
>
>> Looks really nice. Thanks for the changes. Couple of suggestions:
>>
>> 1. Can we reuse any of the existing configs, instead of introducing a new
>> one? I’m wondering if the error.tolerance configuration’s scope can be
>> increased to include produce errors as well. That’ll help us keep number
>> of
>> configs in check. Effectively, if error.tolerance is set to all, then the
>> behavior would be like how you describe the worker would ignore producer
>> errors.
>>
>> 2. If we do choose to have a new config, could you please call out the
>> possible values it can take in the kip?
>>
>> Thanks again!
>>
>> Best,
>>
>>
>> On Fri, Oct 29, 2021 at 9:53 AM Knowles Atchison Jr <
>> katchiso...@gmail.com>
>> wrote:
>>
>> > Arjun,
>> >
>> > Thank you for your feedback, I have updated the KIP.
>> >
>> > This solution is more elegant than my original proposal; however, after
>> > working on the implementation, we have now pushed the configuration from
>> > the connector/task itself back to the connect worker. All tasks running
>> on
>> > the worker would share this ignore producer exception configuration
>> flag.
>> > This works for my use cases where I cannot envision setting this for
>> only
>> > one type of connector we have, but this does take the choice out of the
>> > hands of the connector developer. I suppose that is for the best, in a
>> > vacuum only the worker should have a say in how it handles message
>> > production.
>> >
>> > Additional thoughts and feedback are welcome.
>> >
>> > Knowles
>> >
>> > On Thu, Oct 28, 2021 at 10:54 AM Arjun Satish 
>> > wrote:
>> >
>> > > Yes, that makes sense. And it fits in very nicely with the current
>> error
>> > > handling framework.
>> > >
>> > > On Thu, Oct 28, 2021 at 10:39 AM Knowles Atchison Jr <
>> > > katchiso...@gmail.com>
>> > > wrote:
>> > >
>> > > > That would work. I originally thought that it would be confusing to
>> > > > overload that function for a record that wasn't actually written, but
>> > > > looking at SourceTask more closely, in commitRecord(SourceRecord,
>> > > > RecordMetadata), the RecordMetadata is set to null in the event of a
>> > > > filtered transformation, so the framework is already doing this in a
>> > > > certain regard.
>> > > >
>> > > > Knowles
>> > > >
>> > > > On Thu, Oct 28, 2021 at 10:29 AM Arjun Satish <
>> arjun.sat...@gmail.com>
>> > > > wrote:
>> > > >
>> > > > > To ack the message back to the source system, we already have a
>> > > > > commitRecord method. Once the bad record is handled by skip/dlq,
>> we
>> > > could
>> > > > > just call commitRecord() on it?
>> > > > >
>> > > > > On Thu, Oct 28, 2021 at 9:35 AM Knowles Atchison Jr <
>> > > > katchiso...@gmail.com
>> > > > > >
>> > > > > wrote:
>> > > > >
>> > > > > > Hi Chris,
>> > > > > >
>> > > > > > Thank you for your reply!
>> > > > > >
>> > > > > > It is a clarity error regarding the javadoc. I am not
>> operationally
>> > > > > > familiar with all of the exceptions Kafka considers
>> non-retriable,
>> > > so I
>> > > > > > pulled the list from Callback.java:
>> > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://github.com/apache/kafka/blob/1afe2a5190e9c98e38c84dc793f4303ea51bc19b/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java#L35
>> > > > > > to be an illustrative example of the types of exceptions that
>> would
>> > > > kill
>> > > > > > the connector outright. Any exception thrown during the producer
>> > > write
>> > > > > will
>> > > > > > be passed to this handler. I will update the KIP/PR to be more
>> > clear
>> > > on
>> > > > > > this matter.
>> > > > > >
>> > > > > > You raise an excellent point, how should the framework protect
>> the
>> > > > > > connector or developer from themselves? If a connector enables
>> > > > > exactly-once
>> > > > > > semantics, it would make sense to me to have the task killed.
>> The
>> > > > > framework
>> > > > > > should enforce this type of misconfiguration that would break
>> the
>> > > > > internal
>> > > > > > semantics of KIP-618. WorkerSourceTask could check the
>> > configuration
>> > > > > before
>> > > > > > handing off the records and exception to this function, fail
>> > initial
>> > > > > > configuration check, or something of that nature.
>

Re: [DISCUSS] KIP-791: Add Record Metadata to State Store Context

2021-11-05 Thread Bruno Cadonna

Hi Patrick,

Thank you for the KIP!

- Maybe some more details in the motivation would help to better 
understand the background of the KIP. Currently, it is hard to judge 
whether record metadata should be exposed or not. Can you maybe give an 
example?


- Could you please replace RYW abbreviation with read-your-writes (at 
least that is my guess about the meaning of RYW)?


Best,
Bruno



On 03.11.21 22:43, Guozhang Wang wrote:

Thanks Patrick,

I looked at the KIP and it looks good to me overall. I think we need to
double check whether the record metadata reflects the "last processed
record" or the "currently processed record", where the latter may not have
been completely processed. In `ProcessorContext#recordMetadata` it returns
the latter, but that may not be the preferred case if you want to build
consistency reasoning on top of it.
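
To illustrate why that distinction matters, a small sketch of building a
read-your-writes check on top of the metadata (assuming the
StateStoreContext#recordMetadata() accessor proposed in the KIP; the wrapper
itself is illustrative):

{code:java}
import java.util.Optional;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.api.RecordMetadata;

// Sketch: remember the input-topic offset of the record behind each write, so a
// caller can check whether its own write has already been applied to the store.
public class PositionTrackingStore {
    private final StateStoreContext context; // recordMetadata() as proposed in KIP-791
    private volatile long lastAppliedOffset = -1L;

    public PositionTrackingStore(StateStoreContext context) {
        this.context = context;
    }

    public void onPut(byte[] key, byte[] value) {
        // This is only sound if the metadata refers to a record whose processing has
        // completed; metadata for the record currently being processed could claim
        // progress that has not actually been made yet.
        Optional<RecordMetadata> metadata = context.recordMetadata();
        metadata.ifPresent(m -> lastAppliedOffset = m.offset());
    }

    public boolean hasSeenAtLeast(long offset) {
        return lastAppliedOffset >= offset;
    }
}
{code}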

Otherwise, LGTM.


Guozhang

On Wed, Nov 3, 2021 at 1:44 PM Patrick Stuedi 
wrote:


Hi everyone,

I would like to start the discussion for KIP-791: Add Record Metadata to
State Store Context.

The KIP can be found here:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-791:+Add+Record+Metadata+to+State+Store+Context

Any feedback will be highly appreciated.

Many thanks,
  Patrick






Re: [VOTE] KIP-782: Expandable batch size in producer

2021-11-05 Thread Luke Chen
Hi Mickael,
Thanks for the good comments! Answering them below:

- When under load, the producer may allocate extra buffers. Are these
buffers ever released if the load drops?
--> This is a good point that I've never considered before. Yes, after
introducing "batch.max.size", we should release some buffers back to the
buffer pool. In this KIP, we'll keep at most "batch.size" bytes in the pool
and mark the rest of the memory as free to use. The reason we return at most
"batch.size" to the pool is that the semantics of "batch.size" is the
batch-full limit. In most cases, batch.size should be able to contain the
records to be sent within the linger.ms time.

- Do we really need batch.initial.size? It's not clear that having this
extra setting adds a lot of value.
--> I think "batch.initial.size" is important to achieve higher memory
usage. Now, I made the default value to 4KB, so after upgrading to the new
release, the producer memory usage will become better.
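
To summarize the proposed semantics, a rough sketch (the config names come from
the KIP; the accounting below is illustrative, not the actual accumulator code):

{code:java}
import java.util.ArrayList;
import java.util.List;

/**
 * Illustrative sketch of the proposed accumulation semantics:
 *  - buffers are allocated in batch.initial.size chunks and linked together,
 *  - a batch is considered "ready" to send once it reaches batch.size (soft limit),
 *  - it may keep growing until batch.max.size (hard limit), then a new batch starts.
 */
public class ExpandableBatchSketch {
    private final int initialSize; // batch.initial.size, e.g. 4 * 1024
    private final int readySize;   // batch.size, e.g. 16 * 1024
    private final int maxSize;     // batch.max.size, e.g. 256 * 1024
    private final List<byte[]> chunks = new ArrayList<>();
    private int usedBytes = 0;

    public ExpandableBatchSketch(int initialSize, int readySize, int maxSize) {
        this.initialSize = initialSize;
        this.readySize = readySize;
        this.maxSize = maxSize;
        chunks.add(new byte[initialSize]);
    }

    /** Returns false if the record does not fit; the caller then starts a new batch. */
    public boolean tryAppend(int recordSizeBytes) {
        if (usedBytes + recordSizeBytes > maxSize) {
            return false; // hard limit reached
        }
        while (usedBytes + recordSizeBytes > chunks.size() * initialSize) {
            chunks.add(new byte[initialSize]); // expand by linking another chunk
        }
        usedBytes += recordSizeBytes;
        return true;
    }

    /** A "ready" batch can be drained even before linger.ms expires. */
    public boolean isReadyToSend() {
        return usedBytes >= readySize;
    }
}
{code}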

I've updated the KIP.

Thank you.
Luke

On Wed, Nov 3, 2021 at 6:44 PM Mickael Maison 
wrote:

> Hi Luke,
>
> Thanks for the KIP. It looks like an interesting idea. I like the
> concept of dynamically adjusting settings to handle load. I wonder if
> other client settings could also benefit from a similar logic.
>
> Just a couple of questions:
> - When under load, the producer may allocate extra buffers. Are these
> buffers ever released if the load drops?
> - Do we really need batch.initial.size? It's not clear that having
> this extra setting adds a lot of value.
>
> Thanks,
> Mickael
>
> On Tue, Oct 26, 2021 at 11:12 AM Luke Chen  wrote:
> >
> > Thank you, Artem!
> >
> > @devs, welcome to vote for this KIP.
> > Key proposal:
> > 1. allocate multiple smaller initial-batch-size buffers in the producer,
> > and link them together when expanding, for better memory usage
> > 2. add a max batch size config to the producer, so when the producer rate
> > is suddenly high, we can still get high throughput with a batch size larger
> > than "batch.size" (and less than "batch.max.size", where "batch.size" is a
> > soft limit and "batch.max.size" is a hard limit)
> > Here's the updated KIP:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer
> >
> > And, any comments and feedback are welcome.
> >
> > Thank you.
> > Luke
> >
> > On Tue, Oct 26, 2021 at 6:35 AM Artem Livshits
> >  wrote:
> >
> > > Hi Luke,
> > >
> > > I've looked at the updated KIP-782, it looks good to me.
> > >
> > > -Artem
> > >
> > > On Sun, Oct 24, 2021 at 1:46 AM Luke Chen  wrote:
> > >
> > > > Hi Artem,
> > > > Thanks for your good suggestion again.
> > > > I've combined your idea into this KIP, and updated it.
> > > > Note, in the end, I still keep the "batch.initial.size" config
> (default
> > > is
> > > > 0, which means "batch.size" will be initial batch size) for better
> memory
> > > > conservation.
> > > >
> > > > Detailed description can be found here:
> > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer
> > > >
> > > > Let me know if you have other suggestions.
> > > >
> > > > Thank you.
> > > > Luke
> > > >
> > > > On Sat, Oct 23, 2021 at 10:50 AM Luke Chen 
> wrote:
> > > >
> > > >> Hi Artem,
> > > >> Thanks for the suggestion. Let me confirm my understanding is
> correct.
> > > >> So, what you suggest is that the "batch.size" is more like a "soft
> > > limit"
> > > >> batch size, and the "hard limit" is "batch.max.size". When reaching
> the
> > > >> batch.size of the buffer, it means the buffer is "ready" to be be
> sent.
> > > But
> > > >> before the linger.ms reached, if there are more data coming, we can
> > > >> still accumulate it into the same buffer, until it reached the
> > > >> "batch.max.size". After it reached the "batch.max.size", we'll
> create
> > > >> another batch for it.
> > > >>
> > > >> So after your suggestion, we won't need the "batch.initial.size",
> and we
> > > >> can use "batch.size" as the initial batch size. We list each
> > > "batch.size"
> > > >> together, until it reached "batch.max.size". Something like this:
> > > >>
> > > >> [image: image.png]
> > > >> Is my understanding correct?
> > > >> If so, that sounds good to me.
> > > >> If not, please kindly explain more to me.
> > > >>
> > > >> Thank you.
> > > >> Luke
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> On Sat, Oct 23, 2021 at 2:13 AM Artem Livshits
> > > >>  wrote:
> > > >>
> > > >>> Hi Luke,
> > > >>>
> > > >>> Nice suggestion.  It should optimize how memory is used with
> different
> > > >>> production rates, but I wonder if we can take this idea further and
> > > >>> improve
> > > >>> batching in general.
> > > >>>
> > > >>> Currently batch.size is used in two conditions:
> > > >>>
> > > >>> 1. When we append records to a batch in the accumulator, we create
> a
> > > new
> > > >>> batch if the current batch would exceed the batch.size.
> > > >>> 2. When we drain the batch from the accumulator, a batch becom