[jira] [Created] (KAFKA-13838) Improve the poll method of ConsumerNetworkClient

2022-04-19 Thread RivenSun (Jira)
RivenSun created KAFKA-13838:


 Summary: Improve the poll method of ConsumerNetworkClient
 Key: KAFKA-13838
 URL: https://issues.apache.org/jira/browse/KAFKA-13838
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Reporter: RivenSun
Assignee: RivenSun


On the Kafka client side, sending a ClientRequest is divided into two steps, 
described briefly below.

1. Selector.send(send) method
Kafka's underlying TCP connection channel ensures that data is sent to the 
network {*}sequentially{*}. KafkaChannel allows {*}only one send to be set at a 
time{*}, and the next InFlightRequest may be added only if the 
{color:#ff}queue.peekFirst().send.completed(){color} condition is met.
{code:java}
NetworkClient.isReady(node) ->
NetworkClient.canSendRequest(node) -> 
InFlightRequests.canSendMore(node){code}

2. Selector.poll(timeout)
Each time a send is set on a KafkaChannel, a {*}subsequent{*} 
Selector.poll(timeout) call is needed. See the comment on the 
Selector.send(send) method:
{code:java}
/**
 * Queue the given request for sending in the subsequent {@link #poll(long)} calls
 * @param send The request to send
 */
public void send(NetworkSend send) { {code}
A send can become *completed* *only after* the Selector.poll(timeout) method 
has been executed; for details, see the Selector.write(channel) method.
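
The interaction between the two steps can be illustrated with a toy model. This is only a sketch and not Kafka's Selector or KafkaChannel: ToyChannel, setSend, poll and sentCount are hypothetical names, and the model assumes that a single poll fully writes the pending send, whereas a real write may be partial.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model only: ToyChannel is a hypothetical stand-in, not Kafka's KafkaChannel.
class ToyChannel {
    private String pendingSend;                          // at most one send may be set
    private final Deque<String> wire = new ArrayDeque<>();

    // A new send is accepted only when no earlier send is still pending.
    public boolean canSendMore() {
        return pendingSend == null;
    }

    public void setSend(String send) {
        if (!canSendMore()) {
            throw new IllegalStateException("previous send has not completed");
        }
        pendingSend = send;
    }

    // Assumption: one poll writes the whole pending send to the wire.
    public void poll() {
        if (pendingSend != null) {
            wire.addLast(pendingSend);
            pendingSend = null;
        }
    }

    public int sentCount() {
        return wire.size();
    }

    public static void main(String[] args) {
        ToyChannel channel = new ToyChannel();
        channel.setSend("request-1");
        System.out.println("can send before poll: " + channel.canSendMore());
        channel.poll();
        System.out.println("can send after poll: " + channel.canSendMore());
    }
}
```

In this model a second setSend without an intervening poll is rejected, which is the reason every send needs a following poll.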

 

Now let's look at the ConsumerNetworkClient.poll(Timer timer, 
PollCondition pollCondition, boolean disableWakeup) method.
Three places in this method are involved in sending data:
{code:java}
long pollDelayMs = trySend(timer.currentTimeMs());
->
 client.poll(...)
->
trySend(timer.currentTimeMs());

{code}
There are two problems with this process:

1. After the second trySend(...) call, we should immediately call 
client.poll(0, timer.currentTimeMs()) so that every send that has been set is 
consumed by a following Selector.poll() call.

2. The while loop in the trySend(...) method can be removed.
Once client.send(request, now) has been executed for a node, that first request 
cannot become *completed* inside trySend(...), so later requests for the same 
node never satisfy the client.ready(node, now) condition.
The current code does break out on the second iteration of the loop, but that 
is still {*}one extra iteration of the loop{*}.
The code can be modified as follows:
{code:java}
long trySend(long now) {
    long pollDelayMs = maxPollTimeoutMs;

    // send any requests that can be sent now
    for (Node node : unsent.nodes()) {
        Iterator<ClientRequest> iterator = unsent.requestIterator(node);
        // at most one request per node is sent here, so no inner loop is needed
        if (iterator.hasNext()) {
            pollDelayMs = Math.min(pollDelayMs, client.pollDelayMs(node, now));
            if (client.ready(node, now)) {
                client.send(iterator.next(), now);
                iterator.remove();
            }
        }
    }
    return pollDelayMs;
}{code}
3. In addition, the unsent.clean() method, which is executed last, can be 
simplified to make the code easier to read:
{code:java}
public void clean() {
    // the lock protects removal from a concurrent put which could otherwise mutate the
    // queue after it has been removed from the map
    synchronized (unsent) {
        unsent.values().removeIf(ConcurrentLinkedQueue::isEmpty);
    }
} {code}
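
The removeIf form of clean() can be tried outside Kafka with a small standalone class. This is a sketch under assumptions: UnsentQueues, put, pollRequest and nodeCount are hypothetical names, and nodes and requests are plain strings rather than Node and ClientRequest.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Standalone illustration; UnsentQueues is a hypothetical name, not the Kafka class.
class UnsentQueues {
    private final Map<String, ConcurrentLinkedQueue<String>> unsent = new ConcurrentHashMap<>();

    public void put(String node, String request) {
        // same lock as clean(), so a queue is never mutated after it was removed
        synchronized (unsent) {
            unsent.computeIfAbsent(node, key -> new ConcurrentLinkedQueue<>()).add(request);
        }
    }

    // Removes and returns the next request for the node, or null if there is none.
    public String pollRequest(String node) {
        ConcurrentLinkedQueue<String> queue = unsent.get(node);
        return queue == null ? null : queue.poll();
    }

    // Drops every map entry whose queue is empty, in a single call.
    public void clean() {
        synchronized (unsent) {
            unsent.values().removeIf(ConcurrentLinkedQueue::isEmpty);
        }
    }

    public int nodeCount() {
        return unsent.size();
    }

    public static void main(String[] args) {
        UnsentQueues queues = new UnsentQueues();
        queues.put("node-a", "request-1");
        queues.put("node-b", "request-2");
        queues.pollRequest("node-a");   // node-a's queue is now empty
        queues.clean();
        System.out.println("nodes left: " + queues.nodeCount());
    }
}
```

Removing through the values() view also removes the corresponding map entries, so no explicit iterator is needed.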



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


Re: [DISCUSS] KIP-821: Connect Transforms support for nested structures

2022-04-19 Thread Chris Egerton
Hi Jorge,

Thank you for sticking through this. I have one small remark and one small
clarification; assuming you agree with me on them then I'm ready to vote on
the KIP.

1. InsertField: The "field.on.missing.parent" and "field.on.existing.field"
docs both mention a permitted value of "ingore"; this should be "ignore",
right?
2. InsertField: The examples are still missing the "field.style" property
from the configurations. They should all include the property
"transforms.smt1.field.style": "nested", correct?

Thanks again for working through this, and congratulations on a
well-written KIP!

Cheers,

Chris

On Tue, Apr 19, 2022 at 2:06 PM Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:

> Thank you, Chris! I apply these improvements to the KIP, let me know how it
> looks now.
>
> On Mon, 11 Apr 2022 at 23:43, Chris Egerton 
> wrote:
>
> > Hi Jorge,
> >
> > Wow, those examples are great! A few more remarks, but I think we're
> > getting close:
> >
> > 1. The examples differ across SMTs with the name of the newly-introduced
> > style property; some of them use "field.style", and some use
> > "fields.style". I think for consistency's sake we should stick with just
> > "field.style"; otherwise it could be painful for users to have to
> remember
> > which to use.
> >
>
> Great catch. Agree, I fixed the config names to `field.style`.
>
>
> >
> > 2. Some of the examples are off:
> > - TimestampConverter: the input in the second example ("when field names
> > include dots") doesn't contain a field with a dotted name
> > - ValueToKey: the config in the third example ("when field names include
> > dots") should probably use "parent..child.k2" as the
> > "transforms.smt1.fields" property
> >
>
> Fixed. Thanks!
>
>
> >
> > 3. RE changes to InsertField:
> > - The InsertField SMT should also come with the new "field.style"
> property
> > in order to preserve backwards compatibility, right? I don't see it
> > included in the example configs for that one, just want to make sure
> > - I don't know of any cases where we use snake_case for property names in
> > Kafka; we should probably use "on.missing.parent" and "on.existing.field"
> > as the new property names for InsertField.
> > - Why is the "on_existing_field" (or "on.existing.field") property only
> > applied when the field style is nested? Couldn't it be useful for
> > non-nested fields as well?
> >
>
> Great points! I have applied these suggestions to the KIP.
>
>
> >
> > Cheers,
> >
> > Chris
> >
> > On Sat, Apr 9, 2022 at 12:40 PM Jorge Esteban Quilcate Otoya <
> > quilcate.jo...@gmail.com> wrote:
> >
> > > Again, great feedback Chris. Much appreciated.
> > > Added my comments below:
> > >
> > > On Tue, 5 Apr 2022 at 20:22, Chris Egerton 
> > > wrote:
> > >
> > > > Hi Jorge,
> > > >
> > > > Looking good! I have a few comments left but all but one or two are
> > > minor.
> > > >
> > > > 1. The motivation section states "This KIP is aimed to include
> support
> > > for
> > > > nested structures on the existing SMTs... and to include the
> > abstractions
> > > > to reuse this in future SMTs". A good implementation of this KIP will
> > > > definitely isolate reusable logic into a separate abstraction that
> can
> > be
> > > > easily pulled in to the SMTs we want to add nested field support to,
> > but
> > > > unless we plan on making this kind of abstraction publicly available
> as
> > > > some kind of utility method or class that external SMT developers can
> > > > leverage, we can probably leave this part out as it's more of an
> > > > implementation detail.
> > > >
> > >
> > > Make sense, will leave this out of the KIP.
> > >
> > >
> > > >
> > > > 2. The Cast example is a little misleading, isn't it? It demonstrates
> > the
> > > > escape syntax for fields with dot literals in their names, but it
> > doesn't
> > > > demonstrate a way to actually use the Cast (or any other) SMT to
> > access a
> > > > nested field in a record, which is the whole point of the KIP. I like
> > the
> > > > example of escape syntax but we should probably also add one for
> nested
> > > > field access.
> > > >
> > >
> > > Agree. I have added examples to each SMT to be more clear about how it
> > > affects each function
> > > .
> > >
> > >
> > > >
> > > > 3. With the InsertField SMT, I'm wondering what the specific behavior
> > > will
> > > > be when some or all of the "middle layer" nested fields are missing.
> > For
> > > > > example, if I have a record with a value of { "k1": "v1" } and I apply
> > > > InsertField with topic.field = n1.n2.n3.topic, what will happen? Will
> > the
> > > > updated value become { "k1": "v1", "n1": { "n2": { "n3": "topic" }}},
> > > will
> > > > an exception be thrown, or something else? This seems analogous to
> the
> > > > command line mkdir command, which (at least on some operating
> systems)
> > > > fails by default if you try to create a new nested directory where
> > > anything
> > > > but the last element in the path doesn't exist, but 

[jira] [Created] (KAFKA-13837) Return error for Fetch requests from unrecognized followers

2022-04-19 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13837:
---

 Summary: Return error for Fetch requests from unrecognized 
followers
 Key: KAFKA-13837
 URL: https://issues.apache.org/jira/browse/KAFKA-13837
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson


If the leader of a partition receives a request from a replica which is not in 
the current replica set, we currently return an empty fetch response with no 
error. I think the rationale for this is that the leader may not have received 
the latest `LeaderAndIsr` update which adds the replica, so we just want the 
follower to retry. The problem with this is that if the `LeaderAndIsr` request 
never arrives, then there might not be an external indication of a problem. 
This probably was the only reasonable option before we added the leader epoch 
to the `Fetch` request API. Now that we have it, it would be clearer to return 
an `UNKNOWN_LEADER_EPOCH` error to indicate that the (replicaId, leaderEpoch) 
tuple is not recognized. 
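
A minimal sketch of the proposed check follows. It is not broker code: FetchValidator, FetchError and validate are hypothetical names, and the sketch only models the replica-set membership test, leaving out the leader epoch comparison and everything else a real fetch path does.

```java
import java.util.Set;

// Hypothetical sketch; FetchValidator and FetchError are not Kafka classes.
class FetchValidator {
    enum FetchError { NONE, UNKNOWN_LEADER_EPOCH }

    private final Set<Integer> currentReplicaIds;

    FetchValidator(Set<Integer> currentReplicaIds) {
        this.currentReplicaIds = currentReplicaIds;
    }

    // Proposed behavior: a follower outside the current replica set gets an
    // explicit error instead of an empty response with no error.
    FetchError validate(int replicaId) {
        if (!currentReplicaIds.contains(replicaId)) {
            return FetchError.UNKNOWN_LEADER_EPOCH;
        }
        return FetchError.NONE;
    }
}
```

With an explicit error the follower can still retry, but a `LeaderAndIsr` update that never arrives becomes visible instead of silent.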





Re: [DISCUSS] KIP-821: Connect Transforms support for nested structures

2022-04-19 Thread Jorge Esteban Quilcate Otoya
Thank you, Chris! I have applied these improvements to the KIP; let me know how
it looks now.

On Mon, 11 Apr 2022 at 23:43, Chris Egerton  wrote:

> Hi Jorge,
>
> Wow, those examples are great! A few more remarks, but I think we're
> getting close:
>
> 1. The examples differ across SMTs with the name of the newly-introduced
> style property; some of them use "field.style", and some use
> "fields.style". I think for consistency's sake we should stick with just
> "field.style"; otherwise it could be painful for users to have to remember
> which to use.
>

Great catch. Agree, I fixed the config names to `field.style`.


>
> 2. Some of the examples are off:
> - TimestampConverter: the input in the second example ("when field names
> include dots") doesn't contain a field with a dotted name
> - ValueToKey: the config in the third example ("when field names include
> dots") should probably use "parent..child.k2" as the
> "transforms.smt1.fields" property
>

Fixed. Thanks!


>
> 3. RE changes to InsertField:
> - The InsertField SMT should also come with the new "field.style" property
> in order to preserve backwards compatibility, right? I don't see it
> included in the example configs for that one, just want to make sure
> - I don't know of any cases where we use snake_case for property names in
> Kafka; we should probably use "on.missing.parent" and "on.existing.field"
> as the new property names for InsertField.
> - Why is the "on_existing_field" (or "on.existing.field") property only
> applied when the field style is nested? Couldn't it be useful for
> non-nested fields as well?
>

Great points! I have applied these suggestions to the KIP.
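
To make the two InsertField options from this thread concrete, here is a rough sketch over plain maps. It is not the Connect implementation: NestedInsert and insert are hypothetical names, records are modeled as Map<String, Object>, the path is passed already split into steps, and leaving a non-map parent untouched is an assumption.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch over plain maps; not the Connect InsertField implementation.
class NestedInsert {
    enum OnMissingParent { CREATE, IGNORE }
    enum OnExistingField { OVERWRITE, IGNORE }

    // Returns true if the value was written at the given path.
    @SuppressWarnings("unchecked")
    static boolean insert(Map<String, Object> root, List<String> path, Object value,
                          OnMissingParent onMissing, OnExistingField onExisting) {
        if (path.isEmpty()) {
            throw new IllegalArgumentException("path must not be empty");
        }
        Map<String, Object> current = root;
        for (String step : path.subList(0, path.size() - 1)) {
            Object child = current.get(step);
            if (child == null) {
                if (onMissing == OnMissingParent.IGNORE) {
                    return false;               // leave the record unchanged
                }
                child = new LinkedHashMap<String, Object>();
                current.put(step, child);       // create the missing parent
            } else if (!(child instanceof Map)) {
                return false;                   // assumption: non-map parents are left untouched
            }
            current = (Map<String, Object>) child;
        }
        String leaf = path.get(path.size() - 1);
        if (current.containsKey(leaf) && onExisting == OnExistingField.IGNORE) {
            return false;                       // keep the existing value
        }
        current.put(leaf, value);
        return true;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("k1", "v1");
        insert(record, Arrays.asList("n1", "n2", "n3"), "my-topic",
               OnMissingParent.CREATE, OnExistingField.OVERWRITE);
        System.out.println(record);
    }
}
```

With CREATE and OVERWRITE as defaults, the sketch follows the "just create everything" behavior; the IGNORE values turn the insert into a no-op.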


>
> Cheers,
>
> Chris
>
> On Sat, Apr 9, 2022 at 12:40 PM Jorge Esteban Quilcate Otoya <
> quilcate.jo...@gmail.com> wrote:
>
> > Again, great feedback Chris. Much appreciated.
> > Added my comments below:
> >
> > On Tue, 5 Apr 2022 at 20:22, Chris Egerton 
> > wrote:
> >
> > > Hi Jorge,
> > >
> > > Looking good! I have a few comments left but all but one or two are
> > minor.
> > >
> > > 1. The motivation section states "This KIP is aimed to include support
> > for
> > > nested structures on the existing SMTs... and to include the
> abstractions
> > > to reuse this in future SMTs". A good implementation of this KIP will
> > > definitely isolate reusable logic into a separate abstraction that can
> be
> > > easily pulled in to the SMTs we want to add nested field support to,
> but
> > > unless we plan on making this kind of abstraction publicly available as
> > > some kind of utility method or class that external SMT developers can
> > > leverage, we can probably leave this part out as it's more of an
> > > implementation detail.
> > >
> >
> > Make sense, will leave this out of the KIP.
> >
> >
> > >
> > > 2. The Cast example is a little misleading, isn't it? It demonstrates
> the
> > > escape syntax for fields with dot literals in their names, but it
> doesn't
> > > demonstrate a way to actually use the Cast (or any other) SMT to
> access a
> > > nested field in a record, which is the whole point of the KIP. I like
> the
> > > example of escape syntax but we should probably also add one for nested
> > > field access.
> > >
> >
> > Agree. I have added examples to each SMT to be more clear about how it
> > affects each function
> > .
> >
> >
> > >
> > > 3. With the InsertField SMT, I'm wondering what the specific behavior
> > will
> > > be when some or all of the "middle layer" nested fields are missing.
> For
> > > example, if I have a record with a value of { "k1": "v1" } and I apply
> > > InsertField with topic.field = n1.n2.n3.topic, what will happen? Will
> the
> > > updated value become { "k1": "v1", "n1": { "n2": { "n3": "topic" }}},
> > will
> > > an exception be thrown, or something else? This seems analogous to the
> > > command line mkdir command, which (at least on some operating systems)
> > > fails by default if you try to create a new nested directory where
> > anything
> > > but the last element in the path doesn't exist, but can be invoked
> with a
> > > flag that instructs it to go ahead and create all levels of nested
> > > directory instead. I'm leaning on the side of "just create everything"
> > but
> > > would be interested in your thoughts, and either way, we should
> probably
> > > make sure the intended behavior is well-defined before voting.
> > >
> >
> > This is an interesting case, thanks for catching this!
> > The default behavior I see is to create parents if they are missing and
> > overwrite fields
> > if they already exist.
> > I'm planning to include the following two flags if there is a need to
> > overwrite this behavior:
> > - `on_missing_parent` = (CREATE|IGNORE), default=CREATE
> > - `on_existing_field` = (OVERWRITE|IGNORE), default=OVERWRITE
> >
> >
> > >
> > > 4. Similarly, what will the behavior be if any of the field elements
> > > specified with InsertField already exist in the record value? Will we
> > just
> > > overwrite them? 

[jira] [Resolved] (KAFKA-13654) Extend KStream process with new Processor API

2022-04-19 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-13654.
--
Fix Version/s: 3.3.0
   Resolution: Fixed

> Extend KStream process with new Processor API
> -
>
> Key: KAFKA-13654
> URL: https://issues.apache.org/jira/browse/KAFKA-13654
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>  Labels: kafka-streams, kip-required, needs-kip, streams
> Fix For: 3.3.0
>
>
> Extending KStream#process to use latest Processor API adopted here: 
> https://issues.apache.org/jira/browse/KAFKA-8410
> This new API allows KStream#process to return a typed KStream, so results 
> from processors can be chained; it becomes a new way to transform records 
> with more control over what is forwarded.
> KIP: https://cwiki.apache.org/confluence/x/yKbkCw





[jira] [Created] (KAFKA-13836) Improve KRaft broker heartbeat logic

2022-04-19 Thread dengziming (Jira)
dengziming created KAFKA-13836:
--

 Summary: Improve KRaft broker heartbeat logic
 Key: KAFKA-13836
 URL: https://issues.apache.org/jira/browse/KAFKA-13836
 Project: Kafka
  Issue Type: Improvement
Reporter: dengziming


 # Don't advertise an offset to the controller until it has been published
 # Only unfence a broker when it has seen its own registration





Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #873

2022-04-19 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-827: Expose logdirs total and usable space via Kafka API

2022-04-19 Thread Divij Vaidya
I have a minor suggestion below but overall KIP looks good to me to start a
vote.

*Reg#6* Would you consider replacing UNKNOWN_SPACE with UNSUPPORTED?
UNSUPPORTED tells the user explicitly that the value is missing due to
client/server version mismatch whereas with UNKNOWN_SPACE, the user is left
wondering whether it is a problem with underlying storage not providing
space information or something else.
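
For reference, the size lookup and the overflow cap discussed earlier in this thread could look roughly like the sketch below. LogDirSpace, capOverflow and totalAndUsable are hypothetical names, and treating a negative result as an overflowed size is an assumption based on the JDK-8162520 report mentioned in the thread.

```java
import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper; LogDirSpace is not a Kafka class.
class LogDirSpace {
    // Assumption: a negative value means the real size overflowed a long
    // (see JDK-8162520), so it is reported as Long.MAX_VALUE instead.
    static long capOverflow(long bytes) {
        return bytes < 0 ? Long.MAX_VALUE : bytes;
    }

    // Returns { totalBytes, usableBytes } for the file store holding logDir.
    static long[] totalAndUsable(Path logDir) throws IOException {
        FileStore store = Files.getFileStore(logDir);
        return new long[] {
            capOverflow(store.getTotalSpace()),
            capOverflow(store.getUsableSpace())
        };
    }

    public static void main(String[] args) throws IOException {
        long[] space = totalAndUsable(Paths.get("."));
        System.out.println("total=" + space[0] + " usable=" + space[1]);
    }
}
```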

Divij Vaidya



On Fri, Apr 15, 2022 at 3:40 PM Mickael Maison 
wrote:

> Hi Luke,
>
> 7. I've updated the KIP to clarify these sizes are in bytes.
>
> Thanks,
> Mickael
>
> On Fri, Apr 15, 2022 at 12:16 PM Luke Chen  wrote:
> >
> > Hi Mickael,
> >
> > Thanks for the KIP!
> > This is a good improvement.
> >
> > (3) +1 for not adding the number of files in the directory. Counting the
> > file numbers should be slow.
> > (7) Could you make the fields clear in `DescribeLogDirsResponse`, to
> > mention the returned number is size in Byte (or not?)
> >
> > Thank you.
> > Luke
> >
> > On Fri, Apr 15, 2022 at 5:27 PM Mickael Maison  >
> > wrote:
> >
> > > Hi,
> > >
> > > Thanks for the feedback.
> > >
> > > 3. Yes that's right. Also the number of file descriptors is really not
> > > a property of log directories. Administrators typically tracked that
> > > count per process and for the whole operating system.
> > >
> > > 5. That's a good point, I've updated the KIP to mention sizes will be
> > > capped to Long.MAX_VALUE even if the actual storage is larger.
> > >
> > > 6. Brokers would never return UNKNOWN_SPACE. When new clients query
> > > older brokers via the admin API, the admin client will use
> > > UNKNOWN_SPACE to indicate these values weren't provided by brokers.
> > >
> > > Thanks,
> > > Mickael
> > >
> > > On Fri, Apr 8, 2022 at 5:00 PM Divij Vaidya 
> > > wrote:
> > > >
> > > > Thanks for replying. I still have a few lingering questions/comments.
> > > >
> > > > *Reg#1* Understood. I checked and the underlying system call is
> statvfs
> > > for
> > > > unix systems which should be ok to call here.
> > > > *Reg#2* Fair point. I checked again and yes, log.dir always means
> local
> > > > storage even when tiered storage is enabled.
> > > > *Reg#3* The rationale for adding these new (size) fields to the
> > > > `DescribeLogDirs` is to allow the administrator to monitor or perhaps
> > > take
> > > > automated action based on results. Doesn't monitoring the number of
> file
> > > > descriptors fall in the same category of use cases? I am assuming
> that we
> > > > want to add the size information in the API response because JVM
> makes it
> > > > possible to get this information in a platform agnostic manner which
> is
> > > not
> > > > true for open file descriptors, correct?
> > > > *Reg#4* Agree.
> > > > *New#5*: As an FYI, Java FileStore API breaks on large storage sizes.
> > > See:
> > > > https://bugs.openjdk.java.net/browse/JDK-8162520. ElasticSearch has
> been
> > > > hit by these limitations in the past. For JDK 11, you will probably
> have
> > > to
> > > > add defensive checks such as
> > > >
> > >
> https://github.com/opensearch-project/OpenSearch/blob/b74d71fb747cc2873d4c2ffae825944da4d06e1b/server/src/main/java/org/opensearch/monitor/fs/FsProbe.java#L148
> > > .
> > > > The documentation of the API mentioned in KIP will also be modified
> to
> > > > account for this edge case.
> > > > *New#6*: Can you please provide an example where the return for these
> > > APIs
> > > > would be UNKNOWN_SPACE? Doesn't JVM guarantee that this API will
> > > definitely
> > > > return results (else it throws an IOException)? I would propose that
> we
> > > get
> > > > rid of default since JVM guarantees that this would work on all
> > > platforms.
> > > > If it doesn't then it's a bug and should be uncovered via an
> exception.
> > > >
> > > > Also, I would like to volunteer to code review (of course, it would
> be
> > > > non-binding) your implementation once this KIP is approved.
> > > >
> > > > Regards,
> > > > Divij Vaidya
> > > >
> > > > On Fri, Apr 8, 2022 at 11:35 AM Mickael Maison <
> mickael.mai...@gmail.com
> > > >
> > > > wrote:
> > > >
> > > > > Hi Divij,
> > > > >
> > > > > Thanks for taking a look!
> > > > >
> > > > > 1. In order to retrieve the sizes, the plan is to use
> getTotalSpace()
> > > > > and getUsableSpace() from java.nio.file.FileStore. The
> implementations
> > > > > may vary depending on the filesystem but these calls typically
> don't
> > > > > depend on the size of storage but instead just return metadata the
> > > > > filesystem maintains.
> > > > > 2. I'm not an expert on KIP-405, so correct me if I'm wrong. As
> far as
> > > > > I understand brokers will still have local log dirs and remote
> volumes
> > > > > are not counted as log dirs. KIP-405 does not mention updating the
> > > > > DescribeLogDirs API. So I don't think this KIP needs to do anything
> > > > > special to be compatible with KIP-405. On the other hand, I wonder
> if
> > > > > KIP-405 should update 

Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #872

2022-04-19 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-813 Shared State Stores

2022-04-19 Thread Daan Gertis
Hey everyone!

Thank you for participating.

The KIP-813 vote has passed with:

binding +1s (John, Matthias, Bill)
non-binding +1s (Daan, Federico)

Cheers,
D.


From: John Roesler 
Date: Friday, 1 April 2022 at 15:54
To: dev@kafka.apache.org 
Subject: Re: [DISCUSS] KIP-813 Shared State Stores
Thanks for the replies, Daan,

That all sounds good to me. I think standbys will probably come naturally, but 
we should make sure the implementation includes an integration test to make 
sure. Anyway, I just wanted to make sure we were on the same page.

Thanks again,
John

On Fri, Apr 1, 2022, at 08:16, Daan Gertis wrote:
> Hey John,
>
>
>   *   1. Am I right in thinking that there’s no way to enforce that the
> stores are actually read-only? It seems like the StoreBuilder
> interface is too generic for that. If that’s true, I think it’s fine,
> but we should be sure the JavaDoc clearly states that other processors
> must not write into these stores (except for the one that feeds it).
>
> Yeah I couldn’t really find a way to limit it easily. We might be able
> to throw unsupported exceptions by wrapping the statestore, but that
> seems kind of brittle to do and feels a bit like a hack.
>
> Also, the function name clearly states it should be considered readonly.
>
>
>   *   2. Are you planning for these stores to get standbys as well? I
> would think so, otherwise the desired purpose of standbys (eliminating
> restoration latency during failover) would not be served.
>
> Yeah I think standbys should be applicable here as well. But we get
> that by implementing these readonly statestores as regular ones right?
>
> Cheers,
> D.
>
>
> From: John Roesler 
> Date: Friday, 1 April 2022 at 04:01
> To: dev@kafka.apache.org 
> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
> Hi Daan,
>
> Thanks for the KIP!
>
> I just got caught up on the discussion. I just have a few small
> questions, and then I will be ready to vote.
>
> 1. Am I right in thinking that there’s no way to enforce that the stores
> are actually read-only? It seems like the StoreBuilder interface
> is too generic for that. If that’s true, I think it’s fine, but we
> should be sure the JavaDoc clearly states that other processors must
> not write into these stores (except for the one that feeds it).
>
>  2. Are you planning for these stores to get standbys as well? I would
> think so, otherwise the desired purpose of standbys (eliminating
> restoration latency during failover) would not be served.
>
> Thanks,
> John
>
> On Mon, Mar 7, 2022, at 13:13, Matthias J. Sax wrote:
>> Thanks for updating the KIP. LGTM.
>>
>> I think we can start a vote.
>>
>>
>>>  I think this might provide issues if your processor is doing a projection 
>>> of the data.
>>
>> This is correct. It's a known issue:
>> https://issues.apache.org/jira/browse/KAFKA-7663
>>
>> Global-stores/KTables are designed to put the data into the store
>> _unmodified_.
>>
>>
>> -Matthias
>>
>> On 2/28/22 5:05 AM, Daan Gertis wrote:
>>> Updated the KIP to be more aligned with global state store function names.
>>>
>>> If I remember correctly during restore the processor will not be used 
>>> right? I think this might provide issues if your processor is doing a 
>>> projection of the data. Either way, I would not add that into this KIP 
>>> since it is a specific use-case pattern.
>>>
>>> Unless there is anything more to add or change, I would propose moving to a 
>>> vote?
>>>
>>> Cheers!
>>> D.
>>>
>>> From: Matthias J. Sax 
>>> Date: Friday, 18 February 2022 at 03:29
>>> To: dev@kafka.apache.org 
>>> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
>>> Thanks for updating the KIP!
>>>
>>> I am wondering if we would need two overloads of `addReadOnlyStateStore`
>>> one w/ and one w/o `TimestampExtractor` argument to effectively make it
>>> an "optional" parameter?
>>>
>>> Also wondering if we need to pass in a `String sourceName` and `String
>>> processorName` parameters (similar to `addGlobalStore()`?) instead of
>>> re-using the store name as currently proposed? -- In general I don't
>>> have a strong opinion either way, but it seems to introduce some API
>>> inconsistency if we don't follow the `addGlobalStore()` pattern?
>>>
>>>
>>>> Another thing we were confronted with was the restoring of state when the 
>>>> actual local storage is gone. For example, we host on K8s with ephemeral 
>>>> pods, so there is no persisted storage between pod restarts. However, the 
>>>> consumer group will already be at the latest offset, preventing 
>>>> previous data from being restored within the new pod’s statestore.
>>>
>>> We have already code in-place in the runtime to do the right thing for
>>> this case (ie, via DSL source-table changelog optimization). We can
>>> re-use this part. It's nothing we need to discuss on the KIP, but we can
>>> discuss on the PR later.
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 2/17/22 10:09 AM, Guozhang Wang wrote:
>>>> Hi Daan,
>>>>
>>>> I think for the read-only 

[jira] [Resolved] (KAFKA-13832) Flaky test TopicCommandIntegrationTest.testAlterAssignment

2022-04-19 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-13832.
---
Fix Version/s: 3.3.0
 Assignee: dengziming
   Resolution: Fixed

> Flaky test TopicCommandIntegrationTest.testAlterAssignment
> --
>
> Key: KAFKA-13832
> URL: https://issues.apache.org/jira/browse/KAFKA-13832
> Project: Kafka
>  Issue Type: Improvement
>  Components: unit tests
>Reporter: dengziming
>Assignee: dengziming
>Priority: Minor
> Fix For: 3.3.0
>
>






[jira] [Resolved] (KAFKA-13737) Flaky kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection

2022-04-19 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-13737.
---
Resolution: Fixed

Resolved with PR: https://github.com/apache/kafka/pull/11681

> Flaky kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection
> 
>
> Key: KAFKA-13737
> URL: https://issues.apache.org/jira/browse/KAFKA-13737
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: dengziming
>Priority: Blocker
> Fix For: 3.3.0
>
>
> Examples:
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11895/1/tests
> {code}
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node 
> assignment. Call: describeTopics
>   at 
> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
>   at 
> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
>   at 
> kafka.utils.TestUtils$.$anonfun$waitForLeaderToBecome$1(TestUtils.scala:1812)
>   at scala.util.Try$.apply(Try.scala:210)
>   at kafka.utils.TestUtils$.currentLeader$1(TestUtils.scala:1811)
>   at kafka.utils.TestUtils$.waitForLeaderToBecome(TestUtils.scala:1819)
>   at kafka.utils.TestUtils$.assertLeader(TestUtils.scala:1789)
>   at 
> kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection(LeaderElectionCommandTest.scala:172)
> {code}


