Re: [DISCUSS] KIP-819: Merge multiple KStreams in one operation

2022-03-28 Thread Chris Egerton
Hi all,

Java permits the overload. Simple test class to demonstrate:

```
public class Test {
    private final String field;

    public Test(String field) {
        this.field = field;
    }

    public Test merge(Test that) {
        return new Test("Single-arg merge: " + this.field + ", " + that.field);
    }

    public Test merge(Test that, Test... those) {
        String newField = "Varargs merge: " + this.field + ", " + that.field;
        for (Test test : those) newField += ", " + test.field;
        return new Test(newField);
    }

    public static void main(String[] args) {
        Test t1 = new Test("t1"), t2 = new Test("t2"), t3 = new Test("t3");
        Test merge1 = t1.merge(t2), merge2 = t1.merge(t2, t3);
        System.out.println(merge1.field); // Single-arg merge: t1, t2
        System.out.println(merge2.field); // Varargs merge: t1, t2, t3
    }
}
```

There's a great StackOverflow writeup on the subject [1], which explains
that during method resolution, priority is given to methods whose
signatures match the argument list without taking boxing/unboxing or
varargs into consideration:

> The first phase performs overload resolution without permitting boxing or
> unboxing conversion, or the use of variable arity method invocation. If no
> applicable method is found during this phase then processing continues to
> the second phase.
> The second phase performs overload resolution while allowing boxing and
> unboxing, but still precludes the use of variable arity method invocation.
> If no applicable method is found during this phase then processing
> continues to the third phase.
> The third phase allows overloading to be combined with variable arity
> methods, boxing, and unboxing.

I'm curious whether it's worth keeping a variant that accepts a Named parameter?
It might be tricky to accommodate, since variadic arguments have to be last.
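To make the ordering constraint concrete, here is a throwaway plain-Java sketch (`Stream` and `Named` below are stand-ins, not the Kafka Streams types) of one hypothetical way out: moving the Named-style parameter ahead of the varargs.

```java
// Placeholder types standing in for the Kafka Streams API; not the real signatures.
public class NamedMergeSketch {
    static class Named {
        final String name;
        Named(String name) { this.name = name; }
    }

    static class Stream {
        final String id;
        Stream(String id) { this.id = id; }

        // Existing-style overloads:
        Stream merge(Stream that) { return new Stream(id + "+" + that.id); }
        Stream merge(Stream that, Stream... those) {
            StringBuilder sb = new StringBuilder(id).append("+").append(that.id);
            for (Stream s : those) sb.append("+").append(s.id);
            return new Stream(sb.toString());
        }

        // Hypothetical: Named moved to the front, because varargs must come last.
        Stream merge(Named named, Stream that, Stream... those) {
            return new Stream(named.name + ":" + merge(that, those).id);
        }
    }

    public static void main(String[] args) {
        Stream s1 = new Stream("s1"), s2 = new Stream("s2"), s3 = new Stream("s3");
        System.out.println(s1.merge(s2, s3).id);                 // s1+s2+s3
        System.out.println(s1.merge(new Named("m"), s2, s3).id); // m:s1+s2+s3
    }
}
```

Whether a leading Named parameter fits Kafka Streams conventions (Named is normally the last argument) is a separate question for the KIP; the sketch only shows that such an overload resolves unambiguously.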

[1] - https://stackoverflow.com/a/48850722

Cheers,

Chris

On Mon, Mar 28, 2022 at 11:46 PM Matthias J. Sax  wrote:

> I think Java does not allow having both overloads, because it would
> result in ambiguity?
>
> If you call `s1.merge(s2)` it's unclear which method you want to call.
>
>
> -Matthias
>
>
> On 3/28/22 7:20 AM, Nick Telford wrote:
> > Hi Matthias,
> >
> > How about instead of changing the signature of the existing method to
> > variadic, we simply add a new overload which takes variadic args:
> >
> > KStream<K, V> merge(KStream<K, V> first, KStream<K, V>... rest);
> >
> > That way, we maintain both source *and* binary compatibility for the
> > existing method, and we can enforce that there is always at least one
> > stream (argument) being merged.
> >
> > I'm fine dropping the static methods. As you said, this is mostly all
> just
> > syntax sugar anyway, but I do think allowing multiple streams to be
> merged
> > together is a benefit. My motivation was that we generate diagrams for
> our
> > Topologies, and having several binary merges becomes quite messy when a
> > single n-ary merge is what you're really modelling.
> >
> > Regards,
> >
> > Nick
> >
> > On Thu, 24 Mar 2022 at 21:24, Matthias J. Sax  wrote:
> >
> >> Thanks for proposing this KIP.
> >>
> >> I feel a little bit torn by the idea. In general, we try to keep the
> >> surface area small, and only add APIs that deliver (significant) value.
> >>
> >> It seems the current proposal is more or less about syntactic sugar,
> >> which can still be valuable, but I am not really sure about it.
> >>
> >> I am also wondering if we could use a variadic argument instead of a
> >> `Collection`:
> >>
> >>   KStream<K, V> merge(KStream<K, V>... streams);
> >>
> >> This way, we could just replace the existing method in a backward
> >> compatible way (well, source code compatible only) and thus not increase
> >> the surface area of the API while still achieving your goal?
> >>
> >> A `merge()` with zero arguments would just be a no-op (same as for using
> >> `Collection` I assume?).
> >>
> >>
> >> For adding the static methods: It seems not to be a common pattern to
> >> me? It might be better not to add them and leave it to users to write a
> >> small helper method themselves if they have such a pattern?
> >>
> >>
> >> -Matthias
> >>
> >>
> >>
> >> On 1/31/22 7:35 AM, Nick Telford wrote:
> >>> Hi everyone,
> >>>
> >>> I'd like to discuss KIP 819:
> >>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-819%3A+Merge+multiple+KStreams+in+one+operation
> >>>
> >>> This is a simple KIP that adds/modifies the KStream#merge API to enable
> >>> many streams to be merged in a single graph node.
> >>>
> >>> Regards,
> >>>
> >>> Nick Telford
> >>>
> >>
> >
>


Re: [DISCUSS] KIP-819: Merge multiple KStreams in one operation

2022-03-28 Thread Matthias J. Sax
I think Java does not allow having both overloads, because it would
result in ambiguity?


If you call `s1.merge(s2)` it's unclear which method you want to call.


-Matthias


On 3/28/22 7:20 AM, Nick Telford wrote:

Hi Matthias,

How about instead of changing the signature of the existing method to
variadic, we simply add a new overload which takes variadic args:

KStream<K, V> merge(KStream<K, V> first, KStream<K, V>... rest);

That way, we maintain both source *and* binary compatibility for the
existing method, and we can enforce that there is always at least one
stream (argument) being merged.

I'm fine dropping the static methods. As you said, this is mostly all just
syntax sugar anyway, but I do think allowing multiple streams to be merged
together is a benefit. My motivation was that we generate diagrams for our
Topologies, and having several binary merges becomes quite messy when a
single n-ary merge is what you're really modelling.

Regards,

Nick

On Thu, 24 Mar 2022 at 21:24, Matthias J. Sax  wrote:


Thanks for proposing this KIP.

I feel a little bit torn by the idea. In general, we try to keep the
surface area small, and only add APIs that deliver (significant) value.

It seems the current proposal is more or less about syntactic sugar,
which can still be valuable, but I am not really sure about it.

I am also wondering if we could use a variadic argument instead of a
`Collection`:

  KStream<K, V> merge(KStream<K, V>... streams);

This way, we could just replace the existing method in a backward
compatible way (well, source code compatible only) and thus not increase
the surface area of the API while still achieving your goal?

A `merge()` with zero arguments would just be a no-op (same as for using
`Collection` I assume?).


For adding the static methods: It seems not to be a common pattern to
me? It might be better not to add them and leave it to users to write a
small helper method themselves if they have such a pattern?


-Matthias



On 1/31/22 7:35 AM, Nick Telford wrote:

Hi everyone,

I'd like to discuss KIP 819:


https://cwiki.apache.org/confluence/display/KAFKA/KIP-819%3A+Merge+multiple+KStreams+in+one+operation


This is a simple KIP that adds/modifies the KStream#merge API to enable
many streams to be merged in a single graph node.

Regards,

Nick Telford







Jenkins build is back to stable : Kafka » Kafka Branch Builder » trunk #811

2022-03-28 Thread Apache Jenkins Server
See 




Subscribe to Kafka dev mailing list

2022-03-28 Thread ??????????
Subscribe to Kafka dev mailing list

[jira] [Created] (KAFKA-13774) AclAuthorizer should handle it a bit more gracefully if zookeeper.connect is null

2022-03-28 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-13774:


 Summary: AclAuthorizer should handle it a bit more gracefully if 
zookeeper.connect is null
 Key: KAFKA-13774
 URL: https://issues.apache.org/jira/browse/KAFKA-13774
 Project: Kafka
  Issue Type: Improvement
Reporter: Colin McCabe






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] KIP-821: Connect Transforms support for nested structures

2022-03-28 Thread Chris Egerton
Hi Jorge,

Thanks for addressing my comments; the KIP looks up-to-date and pretty
readable now, and the rejected alternatives section does a great job of
outlining the discussion so far and providing context for anyone else who
might want to join in.

1. Thoughts on choice of delimiter:
- I like the optimization for simple cases, but I think the new proposal is
a little too restrictive. What if there's a field whose name contains all
of the permitted options (currently just ".", ",", and "/")?
- If we expand the set of permitted delimiters to allow for any
single-character string, configuration complexity will increase and
readability may decrease
- Also worth pointing out that there is some convention for doubling a
delimiter character as an escape mechanism with formats like CSV [1]
- Overall I think we may be approaching the saturation point for productive
discussion on delimiter syntax so I don't want to spend too much more of
your time on it. I think the one point I'd like to leave for now is that it
shouldn't be impossible to use this new feature for any field name, no
matter how convoluted. It's fine if edge cases introduce difficulty (such
as less-readable configurations), but it's not fine if they can't be
addressed at all.
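As a concrete illustration of the CSV-style doubling convention mentioned above, here is a rough sketch of a splitter where a single "." separates path segments and a doubled ".." stands for a literal dot. This is just one possible escape scheme, not the KIP's proposed parser:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: splits "a.b" into [a, b], but treats a doubled dot as a
// literal dot, so "a..b" is the single field name "a.b".
public class FieldPathSketch {
    static List<String> split(String path) {
        List<String> parts = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        for (int i = 0; i < path.length(); i++) {
            char c = path.charAt(i);
            if (c == '.') {
                if (i + 1 < path.length() && path.charAt(i + 1) == '.') {
                    current.append('.'); // doubled dot -> literal dot in the field name
                    i++;                 // consume the second dot
                } else {
                    parts.add(current.toString()); // single dot -> segment boundary
                    current.setLength(0);
                }
            } else {
                current.append(c);
            }
        }
        parts.add(current.toString());
        return parts;
    }

    public static void main(String[] args) {
        System.out.println(split("nested.val"));  // [nested, val]
        System.out.println(split("this..field")); // [this.field]
    }
}
```

With doubling, every field name remains expressible: a literal dot just doubles, and no second delimiter character needs to be configured.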

2.
The configuration style where you define "transforms.field.style" in the
connector config, and then this applies to all SMTs for the connector, is
very interesting. However, it doesn't follow convention for existing SMTs.
Right now, if you want to configure an SMT, you define its name in the
connector config (for example, "transforms": "smt1"), and then define all
of the properties for that SMT in the connector config using a namespacing
mechanism specific to that SMT (for example, "transforms.smt1.prop1":
"val1"). That SMT then sees only the properties defined in that namespace,
with the prefix stripped (for example, "prop1": "val1") in its configure
[2] [3] method.
If we want to continue to follow this convention, then instead of
specifying "transforms.field.style" in a connector config, we would expect
users to configure "transforms.<name>.field.style", for each SMT that they
want to configure a field style for. This would require more work on the
part of the user, but would be simpler to reason about and easier to
implement.
If we want to explore an alternative where users can specify global
properties that apply to all transforms in a connector config, then the
semantics for this need to be defined in the KIP. This would have to
include whether this will apply only for the special case of the
"field.style" and possibly "field.separator" properties or if it would be
available more generally for other properties, whether it will apply only
for the SMTs outlined in the KIP or if the "field.style" and possibly
"field.separator" properties would also be passed into custom SMTs so that
they could choose to act on them if applicable, how edge cases like having
an SMT named "field" in your connector config would be handled, etc.
Either way, it might help to have an example in the KIP outlining how one
of the to-be-augmented SMTs can be configured with this new feature and a
before/after of how a record value would be transformed with that
configuration.

3. The docstring for the "transforms.field.style" property mentions that
the permitted values are "plain" and "nested", but then describes behavior
with a value of "root". Should that be "plain" instead?

4. The docstring for the "transforms.field.separator" property exclusively
mentions structs, but the feature is intended to work with maps as well.
Can we update it to reflect this?

References:

[1] - https://stackoverflow.com/a/17808731
[2] -
https://github.com/apache/kafka/blob/7243facb8d69a7252e6b9556b5eaee13e41bab7f/connect/api/src/main/java/org/apache/kafka/connect/transforms/Transformation.java#L30
[3] -
https://github.com/apache/kafka/blob/7243facb8d69a7252e6b9556b5eaee13e41bab7f/clients/src/main/java/org/apache/kafka/common/Configurable.java#L26-L29

Cheers,

Chris

On Mon, Mar 28, 2022 at 1:32 PM Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:

> Thanks, Chris!
>
> 1. I'd argue "this..field.child" could be harder to grasp than
> "this.field/child" + separator: "/".
> Even though this represents additional information, it follows a similar
> approach to the "Flatten#delimiter" configuration.
> I want to give the separator approach another try, so I have updated the
> KIP with the separator proposal, sticking to only 2 alternatives that
> should hopefully cover most scenarios.
>
> 2. Agree. KIP has been updated with this improvement.
>
> 3. You're right. I have updated this section accordingly.
>
> 4. Good catch! I've replaced it with `DropHeaders`.
>
> Looking forward to your feedback.
>
> Thanks,
> Jorge.
>
> On Wed, 9 Mar 2022 at 21:33, Chris Egerton 
> wrote:
>
> > Hi Jorge,
> >
> > Looking good! Got a few more thoughts.
> >
> > 1. Sorry to revisit this, but I think we may want to adopt a slightly
> > 

Jenkins build became unstable: Kafka » Kafka Branch Builder » trunk #810

2022-03-28 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-821: Connect Transforms support for nested structures

2022-03-28 Thread Jorge Esteban Quilcate Otoya
Thanks, Chris!

1. I'd argue "this..field.child" could be harder to grasp than
"this.field/child" + separator: "/".
Even though this represents additional information, it follows a similar
approach to the "Flatten#delimiter" configuration.
I want to give the separator approach another try, so I have updated the
KIP with the separator proposal, sticking to only 2 alternatives that
should hopefully cover most scenarios.

2. Agree. KIP has been updated with this improvement.

3. You're right. I have updated this section accordingly.

4. Good catch! I've replaced it with `DropHeaders`.

Looking forward to your feedback.

Thanks,
Jorge.

On Wed, 9 Mar 2022 at 21:33, Chris Egerton  wrote:

> Hi Jorge,
>
> Looking good! Got a few more thoughts.
>
> 1. Sorry to revisit this, but I think we may want to adopt a slightly
> different escape syntax style. Backslashes are great, but since they're
> already used by JSON, using them as an escape sequence in field notation
> would also lead to some pretty ugly connector configs. Anyone who's had to
> write regular expressions with backslashes in Java is probably already
> familiar with this: "this\\.is\\.not\\.very\\.readable". What do
> you think about using the dot character to escape itself? In other words,
> to access a single field named "this.field", instead of using the syntax
> "this\.field" (which in JSON would have to be expressed as "this\\.field"),
> we could use "this..field", and for a single field named "this\field",
> instead of using the syntax "this\\field" (or, in JSON, "this\\\\field"),
> we could use "this\field" (or, in JSON, "this\\field").
>
> 2. Could you flesh out the details on the new "field.style" property,
> including the type, default value, importance, and a preliminary docstring?
> See
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors#KIP618:ExactlyOnceSupportforSourceConnectors-Newproperties
> for an example.
>
> 3. Is the "Compatibility, Deprecation, and Migration Plan" section still
> accurate after the latest update? Seems like it's still written with the
> assumption that nested field syntax will be hardcoded or opt-in, which IIUC
> isn't the case anymore.
>
> 4. Nit: The "These SMTs do not require nested structure support" section
> mentions a "Drop" SMT. I think this may be referring to the Confluent Drop
> SMT, which isn't a part of Apache Kafka. Should we drop (heh) that SMT from
> the list? Or perhaps just replace it with "DropHeaders", which is currently
> missing from the list and shouldn't require any nested-field related
> updates?
>

> Cheers,
>
> Chris
>
> On Mon, Feb 28, 2022 at 2:12 PM Jorge Esteban Quilcate Otoya <
> quilcate.jo...@gmail.com> wrote:
>
> > Thank you, Chris! and sorry for the delayed response.
> >
> > Please, find my comments below:
> >
> > On Mon, 14 Feb 2022 at 17:34, Chris Egerton  >
> > wrote:
> >
> > > Hi Jorge,
> > >
> > > Thanks for the KIP! I'd love to see support for nested fields added to
> > the
> > > out-of-the-box SMTs provided with Connect. Here are my initial
> thoughts:
> > >
> > > 1. I agree that there's a case to be made for expanding HoistField
> with a
> > > new config property for identifying a nested, to-be-hoisted field, but
> > the
> > > example in the KIP doesn't really demonstrate why this would be
> > valuable. I
> > > think it'd be helpful to expand the example to add other fields in
> order
> > to
> > > show how adding nested field support enables users to hoist a nested
> > field
> > > without dropping other fields from the value. Maybe something like
> this:
> > >
> > > source = nested.val
> > > field = line
> > >
> > > value (before):
> > > {
> > > "nested": {
> > > "val": 42,
> > > "other val": 96
> > > }
> > > }
> > >
> > > value (after):
> > > {
> > > "nested": {
> > > "line": {
> > > "val": 42
> > > },
> > > "other val": 96
> > > }
> > > }
> > >
> > > 2. Nit: I think "source" is a little strange for the new HoistField
> > > property name. Maybe "hoisted" or "hoisted.field" would be more
> > > descriptive?
> > >
> > >
> > About 1. and 2.:
> > Agree. The example for this SMT is updated and have added the `hoisted`
> > configuration.
> >
> >
> > > 3. Is there a reasonable use case for expanding Flatten to be able to
> > > flatten specific fields? My understanding is that it's mostly useful
> for
> > > writing to systems like databases that don't support nested values and
> > > require everything to be a flat list of key-value pairs. Being able to
> > > flatten a nested field wouldn't provide any advantage for that use
> case.
> > > Are there other cases where it would?
> > >
> > > 4. I don't think we should unconditionally change the default delimiter
> > for
> > > Flatten. It's a backwards-incompatible, breaking 

Jenkins build is unstable: Kafka » Kafka Branch Builder » 3.2 #10

2022-03-28 Thread Apache Jenkins Server
See 




Jenkins build is back to stable : Kafka » Kafka Branch Builder » trunk #809

2022-03-28 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-660: Pluggable ReplicaPlacer

2022-03-28 Thread Ryanne Dolan
Wondering about InvalidReplicationFactorException. Why would an
implementation throw this? Given the information passed to the method,
seems like this could only be thrown if there were obviously invalid
arguments, like a negative number or zero. Can we just guarantee such
invalid arguments aren't passed in?
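If the framework validated arguments before invoking the pluggable placer, the guarantee Ryanne asks about could look roughly like this. Method and parameter names are made up for illustration; this is not the KIP's interface:

```java
// Sketch of framework-side validation so a pluggable placer never sees
// obviously invalid arguments. Names are illustrative, not the KIP's API.
public class PlacementValidation {
    static void validate(int replicationFactor, int partitionCount, int brokerCount) {
        if (replicationFactor <= 0)
            throw new IllegalArgumentException(
                "replicationFactor must be positive: " + replicationFactor);
        if (partitionCount <= 0)
            throw new IllegalArgumentException(
                "partitionCount must be positive: " + partitionCount);
        if (replicationFactor > brokerCount)
            throw new IllegalArgumentException(
                "replicationFactor " + replicationFactor
                    + " exceeds broker count " + brokerCount);
    }

    public static void main(String[] args) {
        validate(3, 10, 5); // fine, nothing thrown
        try {
            validate(0, 10, 5);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With checks like these centralized in the controller, the placer interface contract could state that arguments are always positive, making an implementation-thrown InvalidReplicationFactorException unnecessary for that case.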

Ryanne

On Sat, Mar 26, 2022, 8:51 AM Luke Chen  wrote:

> Hi Mickael,
>
> Thanks for the KIP!
> It's indeed a pain point for the Kafka admins.
>
> I have some comments:
> 1. Typo in motivation section: When administrators [when to] remove brokers
> from a cluster,
> 2. If different `replica.placer.class.name` configs are set in all
> controllers, I think only the config for  "active controller" will take
> effect, right?
> 3. Could you explain more about how the proposal fixes some scenarios you
> listed, ex: the new added broker case. How could we know the broker is new
> added? I guess it's by checking the broker load via some metrics
> dynamically, right?
>
>
> Thank you.
> Luke
>
> On Fri, Mar 18, 2022 at 10:30 AM Ryanne Dolan 
> wrote:
>
> > Thanks Mickael, this makes sense to me! I've been wanting something like
> > this in order to decommission a broker without new partitions getting
> > accidentally assigned to it.
> >
> > Ryanne
> >
> > On Thu, Mar 17, 2022, 5:56 AM Mickael Maison 
> > wrote:
> >
> > > Hi,
> > >
> > > I'd like to start a new discussion on KIP-660. I originally wrote this
> > > KIP in 2020 and the initial discussion
> > > (https://lists.apache.org/thread/xn7xyb74nyt281brto4x28r9rzxm4lp9)
> > > raised some concerns especially around KRaft (which did not exist at
> > > that time) and scalability.
> > >
> > > Since then, we got a new KRaft controller so I've been able to revisit
> > > this KIP. I kept the KIP number as it's essentially the same idea, but
> > > the proposal is significantly different:
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-660%3A+Pluggable+ReplicaPlacer
> > >
> > > Please take a look and let me know if you have any feedback.
> > >
> > > Thanks,
> > > Mickael
> > >
> >
>


[jira] [Resolved] (KAFKA-13600) Rebalances while streams is in degraded state can cause stores to be reassigned and restore from scratch

2022-03-28 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-13600.
---
Resolution: Fixed

> Rebalances while streams is in degraded state can cause stores to be 
> reassigned and restore from scratch
> 
>
> Key: KAFKA-13600
> URL: https://issues.apache.org/jira/browse/KAFKA-13600
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0, 2.8.1, 3.0.0
>Reporter: Tim Patterson
>Priority: Major
> Fix For: 3.2.0
>
>
> Consider this scenario:
>  # A node is lost from the cluster.
>  # A rebalance is kicked off with a new "target assignment" (i.e. the 
> rebalance is attempting to move a lot of tasks - see 
> https://issues.apache.org/jira/browse/KAFKA-10121).
>  # The kafka cluster is now a bit more sluggish from the increased load.
>  # A Rolling Deploy happens, triggering rebalances; during the rebalance, 
> processing continues but offsets can't be committed (or nodes are restarted 
> but fail to commit offsets).
>  # The most caught up nodes now aren't within `acceptableRecoveryLag` and so 
> the task is started in its "target assignment" location, restoring all state 
> from scratch and delaying further processing instead of using the "almost 
> caught up" node.
> We've hit this a few times and having lots of state (~25TB worth) and being 
> heavy users of IQ this is not ideal for us.
> While we can increase `acceptableRecoveryLag` to larger values to try to get 
> around this, that causes other issues (i.e. a warmup becoming active when it's 
> still quite far behind).
> The solution seems to be to balance "balanced assignment" with "most caught 
> up nodes".
> We've got a fork where we do just this and it's made a huge difference to the 
> reliability of our cluster.
> Our change is to simply use the most caught up node if the "target node" is 
> more than `acceptableRecoveryLag` behind.
> This gives up some of the load balancing type behaviour of the existing code 
> but in practice doesn't seem to matter too much.
> I guess maybe an algorithm that identified candidate nodes as those being 
> within `acceptableRecoveryLag` of the most caught up node might allow the 
> best of both worlds.
>  
> Our fork is
> [https://github.com/apache/kafka/compare/trunk...tim-patterson:fix_balance_uncaughtup?expand=1]
> (We also moved the capacity constraint code to happen after all the stateful 
> assignment to prioritise standby tasks over warmup tasks)
> Ideally we don't want to maintain a fork of kafka streams going forward so 
> are hoping to get a bit of discussion / agreement on the best way to handle 
> this.
> More than happy to contribute code/test different algo's in production system 
> or anything else to help with this issue
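The fork's selection rule described above — fall back to the most caught-up node when the target is more than `acceptableRecoveryLag` behind — can be sketched roughly as follows. Names and structure are illustrative, not the actual Streams assignor code:

```java
import java.util.Map;

// Rough sketch of the rule: keep the balanced "target" node unless it is
// more than acceptableRecoveryLag behind, in which case fall back to the
// most caught-up candidate.
public class CaughtUpSketch {
    static String chooseNode(String target, Map<String, Long> lagByNode,
                             long acceptableRecoveryLag) {
        // Target node is acceptably caught up: keep the balanced assignment.
        if (lagByNode.getOrDefault(target, Long.MAX_VALUE) <= acceptableRecoveryLag)
            return target;
        // Otherwise pick the node with the smallest restore lag.
        return lagByNode.entrySet().stream()
                .min(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey)
                .orElse(target);
    }

    public static void main(String[] args) {
        Map<String, Long> lags = Map.of("target", 50_000L, "standby", 100L);
        System.out.println(chooseNode("target", lags, 10_000L)); // standby
        System.out.println(chooseNode("target", lags, 60_000L)); // target
    }
}
```

The ticket's suggested refinement — treating every node within `acceptableRecoveryLag` of the most caught-up node as a candidate, then balancing among those — would slot in where the single `min` is taken here.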



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Reopened] (KAFKA-13736) Flaky kafka.network.SocketServerTest.closingChannelWithBufferedReceives

2022-03-28 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reopened KAFKA-13736:
---

> Flaky kafka.network.SocketServerTest.closingChannelWithBufferedReceives
> ---
>
> Key: KAFKA-13736
> URL: https://issues.apache.org/jira/browse/KAFKA-13736
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Priority: Blocker
> Fix For: 3.2.0
>
>
> Examples:
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11895/1/tests
> {code}
> java.lang.AssertionError: receiveRequest timed out
>   at 
> kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:140)
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1521)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1520)
>   at 
> kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1483)
>   at 
> kafka.network.SocketServerTest.closingChannelWithBufferedReceives(SocketServerTest.scala:1431)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] KIP-819: Merge multiple KStreams in one operation

2022-03-28 Thread Nick Telford
Hi Matthias,

How about instead of changing the signature of the existing method to
variadic, we simply add a new overload which takes variadic args:

KStream<K, V> merge(KStream<K, V> first, KStream<K, V>... rest);

That way, we maintain both source *and* binary compatibility for the
existing method, and we can enforce that there is always at least one
stream (argument) being merged.

I'm fine dropping the static methods. As you said, this is mostly all just
syntax sugar anyway, but I do think allowing multiple streams to be merged
together is a benefit. My motivation was that we generate diagrams for our
Topologies, and having several binary merges becomes quite messy when a
single n-ary merge is what you're really modelling.

Regards,

Nick

On Thu, 24 Mar 2022 at 21:24, Matthias J. Sax  wrote:

> Thanks for proposing this KIP.
>
> I feel a little bit torn by the idea. In general, we try to keep the
> surface area small, and only add APIs that deliver (significant) value.
>
> It seems the current proposal is more or less about syntactic sugar,
> which can still be valuable, but I am not really sure about it.
>
> I am also wondering if we could use a variadic argument instead of a
> `Collection`:
>
>  KStream<K, V> merge(KStream<K, V>... streams);
>
> This way, we could just replace the existing method in a backward
> compatible way (well, source code compatible only) and thus not increase
> the surface area of the API while still achieving your goal?
>
> A `merge()` with zero arguments would just be a no-op (same as for using
> `Collection` I assume?).
>
>
> For adding the static methods: It seems not to be a common pattern to
> me? It might be better not to add them and leave it to users to write a
> small helper method themselves if they have such a pattern?
>
>
> -Matthias
>
>
>
> On 1/31/22 7:35 AM, Nick Telford wrote:
> > Hi everyone,
> >
> > I'd like to discuss KIP 819:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-819%3A+Merge+multiple+KStreams+in+one+operation
> >
> > This is a simple KIP that adds/modifies the KStream#merge API to enable
> > many streams to be merged in a single graph node.
> >
> > Regards,
> >
> > Nick Telford
> >
>


Kafka Streams Issue

2022-03-28 Thread Daan Gertis
Hi All,

We are experiencing some weird behaviour with our interactive query service 
implementation.
This is the flow we’ve implemented:


  1.  kafkaStreams.queryMetadataForKey(store, key, serializer) returns for 
activeHost HostInfo{host='localhost', port=8562}, and standbyHosts [] for the 
store and partition where the key would reside. We are not interested in 
standby hosts. Luckily, we have an active host which we can call.
  2.  We make an HTTP call to host localhost:8562, asking for the key there.
  3.  Inside the 8562 host, we retrieve the store by calling 
kafkaStreams.store(parameters), using parameters with staleStores set to false.
  4.  We call kafkaStreams.state().equals(RUNNING) to make sure we’re in the 
RUNNING state.
  5.  Now we call store.get(key) in order to retrieve the key from the store, 
if it has been stored there.
  6.  The get method on our store implementation calls the 
storeProvider.stores(storeName, storeType) method to iterate over all the 
stores available on the host.
  7.  The storeProvider is a WrappingStoreProvider, which calls 
storeProvider.stores(storeQueryParameters) for each 
StreamThreadStateStoreProvider it wraps (just one in our case).
  8.  As the logic inside that stores method finds that the StreamThread is in 
the RUNNING state, it retrieves the tasks based on 
storeQueryParams.staleStoresEnabled() ? streamThread.allTasks().values() : 
streamThread.activeTasks(), which evaluates to false since we set staleStores 
to false in the params.
  9.  To our surprise, the streamThread.activeTasks() method returns an empty 
ArrayList, while the streamThread.allTasks().values() returns one StandbyTask 
for the store we’re looking for.
  10. As there appear to be no active tasks on this host for this store, we 
return the fabled “The state store, " + storeName + ", may have migrated to 
another instance.” InvalidStateStoreException.

This flow is quite tricky as the queryMetadataForKey returns an active host, 
which turns out to only have a standby task once queried.
I have executed the queryMetadataForKey method on the active host as well, once 
before calling kafkaStreams.store in step 3, and another time between step 4 
and 5. Each time the metadata returns the same, the host we’re on at that 
moment is the active host.

Could it be there is a difference between activeHost and activeTask?
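Not an answer to the activeHost/activeTask question, but a common mitigation for this window — metadata already points here while the local task is still a standby — is to retry the store lookup for a bounded time. A generic sketch, with IllegalStateException standing in for InvalidStateStoreException so it stays free of Kafka dependencies:

```java
import java.util.Optional;
import java.util.function.Supplier;

// Generic retry helper: the metadata says the key lives here, but the local
// task may not be active yet, so the store lookup can transiently throw.
public class StoreRetrySketch {
    static <T> Optional<T> queryWithRetry(Supplier<T> query, int attempts, long backoffMs)
            throws InterruptedException {
        for (int i = 0; i < attempts; i++) {
            try {
                return Optional.ofNullable(query.get());
            } catch (IllegalStateException e) {
                Thread.sleep(backoffMs); // store may still be migrating/restoring
            }
        }
        return Optional.empty(); // give up; caller can re-resolve metadata
    }

    public static void main(String[] args) throws InterruptedException {
        int[] calls = {0};
        // Simulated store that only becomes queryable on the third attempt.
        Supplier<String> flaky = () -> {
            if (++calls[0] < 3) throw new IllegalStateException("store not ready");
            return "value-for-key";
        };
        System.out.println(queryWithRetry(flaky, 5, 10L)); // Optional[value-for-key]
    }
}
```

Retrying doesn't explain the active/standby discrepancy, of course; it only papers over the window in which the two views disagree.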

For those also on the confluent community slack might recognize this message as 
it has been posted there by our CTO as well.

Cheers,
D.


[jira] [Created] (KAFKA-13773) Data loss after recovery from crash due to full hard disk

2022-03-28 Thread Tm Alkemade (Jira)
Tm Alkemade created KAFKA-13773:
---

 Summary: Data loss after recovery from crash due to full hard disk
 Key: KAFKA-13773
 URL: https://issues.apache.org/jira/browse/KAFKA-13773
 Project: Kafka
  Issue Type: Bug
  Components: log cleaner
Affects Versions: 2.8.1, 3.1.0
Reporter: Tm Alkemade
 Attachments: kafka-.zip, kafka-logfiles.zip

While doing some testing of Kafka on Kubernetes, the data disk for kafka filled 
up, which led to all 3 nodes crashing. I increased the disk size for all three 
nodes and started up kafka again (one by one, waiting for the previous node to 
become available before starting the next one). After a little while two out of 
three nodes had no data anymore.

According to the logs, the log cleaner kicked in and decided that the latest 
timestamp on those partitions was '0' (i.e. 1970-01-01), and that is older than 
the 2 week limit specified on the topic.

 
{code:java}
2022-03-28 12:17:19,740 INFO [LocalLog partition=audit-trail-0, 
dir=/var/lib/kafka/data-0/kafka-log1] Deleting segment files 
LogSegment(baseOffset=0, size=249689733, lastModifiedTime=1648460888636, 
largestRecordTimestamp=Some(0)) (kafka.log.LocalLog$) [kafka-scheduler-0]
2022-03-28 12:17:19,753 INFO Deleted log 
/var/lib/kafka/data-0/kafka-log1/audit-trail-0/.log.deleted.
 (kafka.log.LogSegment) [kafka-scheduler-0]
2022-03-28 12:17:19,754 INFO Deleted offset index 
/var/lib/kafka/data-0/kafka-log1/audit-trail-0/.index.deleted.
 (kafka.log.LogSegment) [kafka-scheduler-0]
2022-03-28 12:17:19,754 INFO Deleted time index 
/var/lib/kafka/data-0/kafka-log1/audit-trail-0/.timeindex.deleted.
 (kafka.log.LogSegment) [kafka-scheduler-0]{code}
Using kafka-dump-log.sh I was able to determine that the greatest timestamp in 
that file (before deletion) was actually 1648460888636 (2022-03-28, 09:48:08 
UTC, i.e. today). However, since this segment was the 'latest/current' segment, 
much of the file is empty. The code that determines the last entry 
(TimeIndex.lastEntryFromIndexFile) doesn't seem to account for this and just 
reads the last position in the file; because the file is mostly empty, it reads 
0 for that position.
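A sketch of why reading the last slot of a preallocated index yields 0 (this assumes the 12-byte time-index entry layout of an 8-byte timestamp followed by a 4-byte relative offset; the class and method names are illustrative, not Kafka's actual code):

```java
import java.nio.ByteBuffer;

// A preallocated, mostly-empty index file is zero-filled, so naively reading
// the entry at the last slot yields timestamp 0 (1970-01-01) instead of the
// real last written entry.
public class SparseIndexSketch {
    static final int ENTRY_SIZE = 12; // 8-byte timestamp + 4-byte relative offset

    static long lastEntryTimestamp(ByteBuffer indexFile) {
        int lastSlot = indexFile.capacity() / ENTRY_SIZE - 1;
        return indexFile.getLong(lastSlot * ENTRY_SIZE);
    }

    public static void main(String[] args) {
        // Preallocated index with room for 1024 entries, only one written.
        ByteBuffer index = ByteBuffer.allocate(1024 * ENTRY_SIZE); // zero-filled
        index.putLong(0, 1_648_460_888_636L); // real timestamp, slot 0
        index.putInt(8, 42);                  // relative offset, slot 0
        System.out.println(lastEntryTimestamp(index)); // 0, not 1648460888636
    }
}
```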

The cleaner code seems to account for this: judging by the scaladoc, 
UnifiedLog.deleteOldSegments is never supposed to delete the current segment. 
In this case, however, that check doesn't seem to do its job. Perhaps the 
detected highWatermark is wrong?

I've attached the logs and the zipped data directories (the data files are over 
3 GB in size when unzipped).

 

I've encountered this problem with both Kafka 2.8.1 and 3.1.0.

I've also tried changing min.insync.replicas to 2; the issue still occurs.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13736) Flaky kafka.network.SocketServerTest.closingChannelWithBufferedReceives

2022-03-28 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-13736.
---
Resolution: Fixed

> Flaky kafka.network.SocketServerTest.closingChannelWithBufferedReceives
> ---
>
> Key: KAFKA-13736
> URL: https://issues.apache.org/jira/browse/KAFKA-13736
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Priority: Blocker
> Fix For: 3.2.0
>
>
> Examples:
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11895/1/tests
> {code}
> java.lang.AssertionError: receiveRequest timed out
>   at 
> kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:140)
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1521)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1520)
>   at 
> kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1483)
>   at 
> kafka.network.SocketServerTest.closingChannelWithBufferedReceives(SocketServerTest.scala:1431)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> {code}





[jira] [Resolved] (KAFKA-13735) Flaky kafka.network.SocketServerTest.remoteCloseWithoutBufferedReceives

2022-03-28 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-13735.
---
Resolution: Fixed

> Flaky kafka.network.SocketServerTest.remoteCloseWithoutBufferedReceives
> ---
>
> Key: KAFKA-13735
> URL: https://issues.apache.org/jira/browse/KAFKA-13735
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Priority: Blocker
> Fix For: 3.2.0
>
>
> Examples:
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11705/13/tests
> {code}
> Stacktrace
> java.lang.IllegalStateException: Channel closed too early
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$5(SocketServerTest.scala:1511)
>   at scala.Option.getOrElse(Option.scala:201)
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1511)
>   at 
> kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1482)
>   at 
> kafka.network.SocketServerTest.remoteCloseWithoutBufferedReceives(SocketServerTest.scala:1393)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:568)
> {code}





Re: [VOTE] KIP-824 Allowing dumping segmentlogs limiting the batches in the output

2022-03-28 Thread Sergio Daniel Troiano
hi guys again,

How does this work now? After the vote, should I update or modify the KIP? 
Basically, is there any action I should take from my side?

Thanks!

On Sat, 26 Mar 2022 at 14:58, John Roesler  wrote:

> Thanks for the KIP, Sergio!
>
> I’m +1 (binding)
>
> Thanks,
> John
>
> On Sat, Mar 26, 2022, at 03:32, David Jacot wrote:
> > +1 (binding). Thanks for the KIP.
> >
> > Best,
> > David
> >
> > Le ven. 25 mars 2022 à 07:11, Luke Chen  a écrit :
> >
> >> Hi Sergio,
> >>
> >> Thanks for the KIP!
> >> +1(binding) from me.
> >>
> >> Thank you.
> >> Luke
> >>
> >> On Fri, Mar 25, 2022 at 1:40 PM Sergio Daniel Troiano
> >>  wrote:
> >>
> >> > Hi lads,
> >> >
> >> > I would like to start the vote on this:
> >> >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-824%3A+Allowing+dumping+segmentlogs+limiting+the+batches+in+the+output
> >> >
> >> > As extra information: we are already using the patch in our company,
> >> > and thanks to this patch and another script we wrote (maybe I will
> >> > start another KIP later), we have started saving plenty of money.
> >> >
> >> > Best regards.
> >> > Sergio Troiano
> >> >
> >>
>


Re: [DISCUSS] KIP-651 - Support PEM format for SSL certificates and private keys

2022-03-28 Thread Rajini Sivaram
Thanks, everyone. Good to know that Java 18 will have passwordless
keystores, so we can support this for other formats too in the future. It
looks like we are in agreement that it is reasonable to relax the password
requirement for PEM files now.

Regards,

Rajini

On Thu, Mar 24, 2022 at 2:41 PM David Jacot 
wrote:

> Hi all,
>
> Thanks Dejan for bringing this up. Relaxing this constraint seems
> reasonable to me. I guess we would have to relax it for the keystores
> at some point in the future as well (with Java 18).
>
> Let's wait a few days to see what others think about this.
>
> Best,
> David
>
> On Wed, Mar 23, 2022 at 8:46 PM Ismael Juma  wrote:
> >
> > Hi Rajini,
> >
> > On Mon, Mar 21, 2022 at 10:02 AM Rajini Sivaram  >
> > wrote:
> >
> > > For the background on the current implementation: We use Java's
> keystore
> > > loading for JKS/PKCS12 keystore files and these files require
> passwords. We
> > >
> >
> > In Java 18:
> >
> > "Passwordless keystores (a keystore with no password required to unlock
> it)
> > are useful when the keystore is stored in a secure location and is only
> > intended to store non-sensitive information, such as public X.509
> > certificates. With a passwordless PKCS12 keystore, certificates are not
> > encrypted and there is no Mac applied as an integrity check is not
> > necessary.
> >
> > Prior to this change, creating a passwordless PKCS12 keystore was
> > difficult, and required setting various security properties. Now, a
> > passwordless PKCS12 keystore can be created by simply specifying a null
> > password to the KeyStore::store(outStream, password) API. The keystore
> can
> > then be loaded with a null (or any) password with the KeyStore::load()
> API.
> >
> > Issue: JDK-8231107"
> >
> > https://seanjmullan.org/blog/2022/03/23/jdk18
> >
> > Ismael
>
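
A minimal sketch of the JDK 18 passwordless-PKCS12 flow quoted above (assumes 
a JDK 18+ runtime, per JDK-8231107; the class name and in-memory streams are 
illustrative):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.security.KeyStore;

public class PasswordlessPkcs12Sketch {
    public static void main(String[] args) throws Exception {
        KeyStore ks = KeyStore.getInstance("PKCS12");
        ks.load(null, null); // initialize an empty keystore

        // Storing with a null password produces a passwordless keystore:
        // certificates are not encrypted and no MAC is applied (JDK 18+).
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        ks.store(out, null);

        // It can then be loaded with a null password.
        KeyStore loaded = KeyStore.getInstance("PKCS12");
        loaded.load(new ByteArrayInputStream(out.toByteArray()), null);
        System.out.println(loaded.size()); // 0 entries in this empty example
    }
}
```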