[jira] [Created] (KAFKA-12789) Remove Stale comments for meta response handling logic

2021-05-14 Thread HaiyuanZhao (Jira)
HaiyuanZhao created KAFKA-12789:
---

 Summary: Remove Stale comments for meta response handling logic
 Key: KAFKA-12789
 URL: https://issues.apache.org/jira/browse/KAFKA-12789
 Project: Kafka
  Issue Type: Improvement
Reporter: HaiyuanZhao
Assignee: HaiyuanZhao






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #128

2021-05-14 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-709: Extend OffsetFetch requests to accept multiple group ids

2021-05-14 Thread Sanjana Kaundinya
Hi Everyone,
I’ve begun working on this KIP now and found that another class will need 
public changes. I have updated the KIP to reflect this, so I just wanted to 
update the dev list as well. You can find the updated KIP here: 
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=173084258
Thanks,
Sanjana
On Jan 27, 2021, 4:18 AM -0800, Thomas Scott , wrote:
> Hi Magnus,
>
> Thanks for the review, I've added // MOVED and an explanation as requested.
>
> Thanks
>
> Tom
>
>
> On Wed, Jan 27, 2021 at 12:05 PM Magnus Edenhill  wrote:
>
> > Hey Thomas,
> >
> > I'm late to the game.
> >
> > It looks like the "top level" ErrorCode moved from the top-level to the
> > Group array, which makes sense,
> > but it would be good if it was marked as // MOVED in the KIP and also a
> > note that top level errors that
> > are unrelated to the group will be returned as per-group errors.
> >
> >
> > Regards,
> > Magnus
> >
> >
> > Den tis 26 jan. 2021 kl 15:42 skrev Thomas Scott :
> >
> > > Thanks David I've updated it.
> > >
> > > On Tue, Jan 26, 2021 at 1:55 PM David Jacot  wrote:
> > >
> > > > Great. That answers my question!
> > > >
> > > > Thomas, I suggest adding a Related/Future Work section in the
> > > > KIP to link KIP-699 more explicitly.
> > > >
> > > > Thanks,
> > > > David
> > > >
> > > > On Tue, Jan 26, 2021 at 1:30 PM Thomas Scott  wrote:
> > > >
> > > > > Hi Mickael/David,
> > > > >
> > > > > I feel like the combination of these 2 KIPs gives the complete
> > > solution
> > > > > but they can be implemented independently. I have added a description
> > > and
> > > > > links to KIP-699 to KIP-709 to this effect.
> > > > >
> > > > > Thanks
> > > > >
> > > > > Tom
> > > > >
> > > > >
> > > > > On Tue, Jan 26, 2021 at 11:44 AM Mickael Maison <
> > > > mickael.mai...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Thomas,
> > > > > > Thanks, the KIP looks good.
> > > > > >
> > > > > > David,
> > > > > > I started working on exactly that a few weeks ago:
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-699%3A+FindCoordinators
> > > > > > I hope to complete my draft and start a discussion later on this
> > > week.
> > > > > >
> > > > > > Thanks
> > > > > >
> > > > > > On Tue, Jan 26, 2021 at 10:06 AM David Jacot 
> > > > > wrote:
> > > > > > >
> > > > > > > Hi Thomas,
> > > > > > >
> > > > > > > Thanks for the KIP. Overall, the KIP looks good to me.
> > > > > > >
> > > > > > > I have only one question: The FindCoordinator API only supports
> > > > > > > resolving one group id at the time. If we want to get the offsets
> > > for
> > > > > > > say N groups, that means that we have to first issue N
> > > > FindCoordinator
> > > > > > > requests, wait for the responses, group by coordinators, and then
> > > > > > > send a OffsetFetch request per coordinator. I wonder if we should
> > > > > > > also extend the FindCoordinator API to support resolving multiple
> > > > > > > groups as well. This would make the implementation in the admin
> > > > > > > client a bit easier and would ensure that we can handle multiple
> > > > > > > groups end-to-end. Have you thought about this?
> > > > > > >
> > > > > > > Best,
> > > > > > > David
> > > > > > >
> > > > > > > On Tue, Jan 26, 2021 at 10:13 AM Rajini Sivaram <
> > > > > rajinisiva...@gmail.com
> > > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Thomas,
> > > > > > > >
> > > > > > > > Thanks for the KIP, this is a useful addition for admin use
> > > cases.
> > > > It
> > > > > > may
> > > > > > > > be worth starting the voting thread soon if we want to get this
> > > > into
> > > > > > 2.8.0.
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > >
> > > > > > > > Rajini
> > > > > > > >
> > > > > > > >
> > > > > > > > On Mon, Jan 25, 2021 at 1:52 PM Thomas Scott  > >
> > > > > wrote:
> > > > > > > >
> > > > > > > > > Thanks Ismael, that's a lot better. I've updated the KIP with
> > > > this
> > > > > > > > > behaviour instead.
> > > > > > > > >
> > > > > > > > > On Mon, Jan 25, 2021 at 11:42 AM Ismael Juma <
> > > ism...@juma.me.uk>
> > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Thanks for the KIP, Thomas. One question below:
> > > > > > > > > >
> > > > > > > > > > Should an Admin client with this new functionality be used
> > > > > against
> > > > > > an
> > > > > > > > old
> > > > > > > > > > > broker that cannot handle these requests then the methods
> > > > will
> > > > > > throw
> > > > > > > > > > > UnsupportedVersionException as per the usual pattern.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Did we consider automatically falling back to the single
> > > group
> > > > id
> > > > > > > > request
> > > > > > > > > > if the more efficient one is not supported?
> > > > > > > > > >
> > > > > > > > > > Ismael
> > > > > > > > > >
> > > > > > > > > > On Mon, Jan 25, 2021 at 3:34 AM Thomas Scott <
> > > t...@confluent.io
> > > > >
> > > > > > wrote:
> > > > > > > 

Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #127

2021-05-14 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-12788) Improve KRaft replica placement

2021-05-14 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-12788:


 Summary: Improve KRaft replica placement
 Key: KAFKA-12788
 URL: https://issues.apache.org/jira/browse/KAFKA-12788
 Project: Kafka
  Issue Type: Improvement
Reporter: Colin McCabe
Assignee: Colin McCabe


Implement the existing Kafka replica placement algorithm for KRaft.
This also means implementing rack awareness. Previously, we just chose
replicas randomly in a non-rack-aware fashion. Also, allow replicas to
be placed on fenced brokers if there are no other choices. This was
specified in KIP-631 but previously not implemented.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12471) Implement createPartitions in KIP-500 mode

2021-05-14 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-12471.
--
Fix Version/s: 3.0.0
   Resolution: Fixed

> Implement createPartitions in KIP-500 mode
> --
>
> Key: KAFKA-12471
> URL: https://issues.apache.org/jira/browse/KAFKA-12471
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
>  Labels: kip-500
> Fix For: 3.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12334) Add the KIP-631 metadata shell

2021-05-14 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-12334.
--
Fix Version/s: 2.8.0
   Resolution: Fixed

Added in 2.8

> Add the KIP-631 metadata shell
> --
>
> Key: KAFKA-12334
> URL: https://issues.apache.org/jira/browse/KAFKA-12334
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
>  Labels: kip-500
> Fix For: 2.8.0
>
>
> Add a metadata shell that can interactively examine the metadata stored in a KIP-500 cluster.
> It can read the metadata from the controllers directly, by connecting to
> them, or from a metadata snapshot on disk. In the former case, the
> quorum voters must be specified by passing the --controllers flag; in
> the latter case, the snapshot file should be specified via --snapshot.
> The metadata tool works by replaying the log and storing the state into
> in-memory nodes. These nodes are presented in a fashion similar to
> filesystem directories.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12275) KIP-500: Remove controllerOnly restriction from the DecommissionBroker API

2021-05-14 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-12275.
--
Fix Version/s: 2.8.0
 Assignee: Colin McCabe
   Resolution: Fixed

DecommissionBroker was replaced by UnregisterBroker, and this is handled on 
brokers, not just controllers.

> KIP-500: Remove controllerOnly restriction from the DecommissionBroker API
> --
>
> Key: KAFKA-12275
> URL: https://issues.apache.org/jira/browse/KAFKA-12275
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Alok Nikhil
>Assignee: Colin McCabe
>Priority: Minor
>  Labels: kip-500
> Fix For: 2.8.0
>
>
> We need to fix this to forward the request to the active controller.
> Considering this is a KIP-500 only API and we don't have an implementation 
> yet, we are terminating the connection instead for now.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12787) Configure and integrate controller snapshot with the RaftClient

2021-05-14 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-12787:
--

 Summary: Configure and integrate controller snapshot with the 
RaftClient
 Key: KAFKA-12787
 URL: https://issues.apache.org/jira/browse/KAFKA-12787
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jose Armando Garcia Sancio
Assignee: Jose Armando Garcia Sancio






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12754) TaskMetadata endOffsets does not update when the offsets are read

2021-05-14 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-12754.

Fix Version/s: 3.0.0
   Resolution: Fixed

> TaskMetadata endOffsets does not update when the offsets are read
> -
>
> Key: KAFKA-12754
> URL: https://issues.apache.org/jira/browse/KAFKA-12754
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Walker Carlson
>Assignee: Walker Carlson
>Priority: Major
> Fix For: 3.0.0
>
>
> The high water mark in StreamTask is not updated optimally. Also, it would be 
> good for the metadata offsets to have an initial value of -1 instead of an 
> empty map, so that the set of TopicPartitions won't change.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #126

2021-05-14 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-731: Record Rate Limiting for Kafka Connect

2021-05-14 Thread Ryanne Dolan
Hey y'all, I've expanded the scope of this KIP slightly to include a
pluggable interface, RateLimiter.

After implementing this a few different ways, it's clear that the
configuration story is actually simpler with a pluggable model.
Out-of-the-box, we have just two configuration properties to tweak:
record.rate.limit and record.batch.rate.limit (subj to change ofc). These
are provided by built-in RecordRateLimiter and RecordBatchRateLimiter impls.

From there, additional custom RateLimiters can be enabled with whatever
configuration they need. This is essentially the same pattern taken with
MetricsReporters and others.
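For readers following along, here is a rough sketch of what such a pluggable
limiter could look like. This is a hedged illustration only: the interface
name and method signatures below are assumptions for this discussion, not the
API proposed in KIP-731.

// Hedged sketch only; names and signatures are illustrative assumptions.
import java.util.Map;

public interface RateLimiter extends AutoCloseable {
    // Connect-plugin-style configuration hook, e.g. record.rate.limit for
    // the built-in record limiter.
    void configure(Map<String, ?> configs);

    // Inform the limiter that a batch of records was just processed.
    void accumulate(int recordCount);

    // How long the task should pause before processing the next batch (ms).
    long throttleTimeMs();

    @Override
    void close();
}

Custom implementations would then be enabled and configured through connector
properties, in the same way MetricsReporter plugins are today.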

I had originally envisioned that the set of built-in limits would expand
over time, eg individual put/poll/commit/flush limits. However, these can
all be throttled adequately with the proposed API by limiting overall
record and batch thruput.

Please let me know what you think. The voting thread is open.

Ryanne

On Fri, Apr 9, 2021, 1:41 PM Ryanne Dolan  wrote:

> Hey y'all, I'd like to draw your attention to a new KIP:
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-731%3A+Record+Rate+Limiting+for+Kafka+Connect
>
> Lemme know what you think. Thanks!
>
> Ryanne
>


Re: [VOTE] KIP-740: Use TaskId instead of String for the taskId field in TaskMetadata

2021-05-14 Thread John Roesler
Thanks for these updates, Sophie,

Unfortunately, I have some minor suggestions:

1. "Topic Group" is a vestigial term from the early days of
the codebase. We call a "topic group" a "subtopology" in the
public interface now (although "topic group" is still used
internally in some places). For user-facing consistency, we
should also use "subtopologyId" in your proposal.

2. I'm wondering if it's really necessary to introduce this
interface at all. I think your motivation is to be able to
get the subtopologyId and partition via TaskMetadata, right?
Why not just add those methods to TaskMetadata? Stepping
back, the concept of metadata about an identifier is a bit
elaborate.
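To make the alternative concrete, a rough sketch of what adding the accessors
directly to TaskMetadata might look like (sketched as an interface purely for
brevity; the method names are illustrative, not settled, and the existing
members are elided):

// Hedged sketch of the alternative discussed above; names are illustrative.
public interface TaskMetadata {
    String taskId();        // existing accessor, today a String

    int subtopologyId();    // proposed: the subtopology ("topic group") index
    int partition();        // proposed: the input partition of the task

    // ... plus existing members such as topicPartitions()
}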

Sorry for thrashing what you were hoping would be a quick,
uncontroversial KIP.

Thanks for your consideration,
John

On Thu, 2021-05-13 at 19:35 -0700, Sophie Blee-Goldman
wrote:
> One last update: we will not actually remove the existing
> o.a.k.streams.processor.TaskId class, but only
> deprecate it, along with any methods that returned it (ie the getters on
> ProcessorContext and StateStoreContext)
> 
> Internally, everything will still be converted to use the new internal
> TaskId class, and public TaskIdMetadata interface,
> where appropriate.
> 
> 
> 
> On Thu, May 13, 2021 at 6:42 PM Sophie Blee-Goldman 
> wrote:
> 
> > Thanks all. I updated the KIP slightly since there is some ambiguity
> > around whether the existing TaskId class is
> > currently part of the public API or not. To settle the matter, I have
> > introduced a new public TaskId interface that
> > exposes the metadata, and moved the existing TaskId class to the internals
> > package. The KIP  has been
> > updated
> > with the proposed API changes.
> > 
> > @Guozhang Wang  : I decided to leave this new
> > TaskId interface in o.a.k.streams.processor since that's where the
> > TaskMetadata class is, along with the other related metadata classes (eg
> > ThreadMetadata). I do agree it makes
> > more sense for them to be under o.a.k.streams, but I'd rather leave them
> > together for now.
> > 
> > Please let me know if there are any concerns, or you want to redact your
> > vote :)
> > 
> > -Sophie
> > 
> > On Thu, May 13, 2021 at 3:11 PM Guozhang Wang  wrote:
> > 
> > > +1
> > > 
> > > On a hindsight, maybe TaskId should not really be in
> > > `org.apache.kafka.streams.processor` but rather just in `o.a.k.streams`,
> > > but maybe not worth pulling it up now :)
> > > 
> > > Guozhang
> > > 
> > > On Thu, May 13, 2021 at 1:58 PM Walker Carlson
> > >  wrote:
> > > 
> > > > +1 from me! (non-binding)
> > > > 
> > > > Walker
> > > > 
> > > > On Thu, May 13, 2021 at 1:53 PM Sophie Blee-Goldman
> > > >  wrote:
> > > > 
> > > > > Hey all,
> > > > > 
> > > > > I'm just going to take this KIP straight to a vote since it should be
> > > a
> > > > > trivial and uncontroversial change. Of course please raise any
> > > concerns
> > > > > should they come up, and I can take things to a DISCUSS thread.
> > > > > 
> > > > > The KIP is a simple change to move from String to TaskId for the
> > > taskID
> > > > > field of TaskMetadata.
> > > > > 
> > > > > KIP-740: Use TaskId instead of String for the taskId field in
> > > > TaskMetadata
> > > > > <
> > > > > 
> > > > 
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-740%3A+Use+TaskId+instead+of+String+for+the+taskId+field+in+TaskMetadata
> > > > > > 
> > > > > 
> > > > > Cheers,
> > > > > Sophie
> > > > > 
> > > > 
> > > 
> > > 
> > > --
> > > -- Guozhang
> > > 
> > 




Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-05-14 Thread Chris Egerton
Hi Tom,

Really interesting turn this has taken! Responses inline.

> I'm not quite sure I follow what you mean here. Can you explain? AFAICT it
> doesn't apply in a cluster where all tasks are using fencible producers,
> but maybe I'm missing something.

Imagine this (unlikely but possible) scenario:

1. A connector is created with N tasks and uses the
fencible-but-not-transactional producer
2. Task N (as in, the task with the highest ID) is allocated to a worker
that then becomes a zombie
3. The connector is reconfigured to use N-1 tasks, and this takes effect on
all non-zombie workers in the cluster
4. The connector is reconfigured to enable full-on exactly-once support
(i.e., use of the transactional producer)

At this point, we would need to know to fence out task N that's running on
the zombie worker. This is what is accomplished in the current design with
the task count records in the config topic; even if the number of tasks in
a connector is decreased, the leader would be aware of the old, higher task
count for that connector, and know to fence out that many tasks.

I was only noting this for completeness' sake; there's nothing about this
requirement that renders your proposal impossible or even significantly
more difficult. We'd just have to make sure to do the task count record
bookkeeping for connectors regardless of whether they're exactly-once or
not, so that if a connector has exactly-once switched on without a cluster
roll in the middle, we'd know exactly how many tasks to fence out before
bringing up that first round of transactional producers.

> That will be the case for the new transactional cluster anyway.

That's true, but we do go from three static ACLs (write/describe on a fixed
transactional ID, and idempotent write on a fixed cluster) to a dynamic
collection of ACLs. In especially large organizations where the people that
administrate Connect clusters aren't necessarily the same as the people
that create and manage connectors this might cause some friction. Still,
since there are benefits to all users (regardless of requirements for
exactly-once delivery guarantees) in the form of fencible producers that
would, in many if not all circumstances, reduce duplicate writes, it's not
out of the question to argue for this change.

I also toyed with the question of "If we're going to require these new ACLs
unconditionally, what's stopping us from just enabling fully-fledged
exactly-once source support by default?". It'd be pretty straightforward to
include zombie fencing for free with this change, for example. The only
remaining blocker seems to be that the connector needs direct write and
read access to the offsets topic that it uses.

Ultimately I think it may behoove us to err on the side of reducing the
breaking changes here for now and saving them for 4.0 (or some later major
release), but would be interested in thoughts from you and others.

> Guozhang also has a (possible) use case for a fencing-only producer (
> https://issues.apache.org/jira/browse/KAFKA-12693), and as he points out
> there, you should be able to get these semantics today by calling
> initTransactions() and then just using the producer as normal (no
> beginTransaction()/commitTransaction()/abortTransaction()).

I tested this locally and was not met with success; transactional producers
do a check right now to ensure that any calls to "KafkaProducer::send"
occur within a transaction (see
https://github.com/apache/kafka/blob/29c55fdbbc331bbede17908ccc878953a1b15d87/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L957-L959
and
https://github.com/apache/kafka/blob/29c55fdbbc331bbede17908ccc878953a1b15d87/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L450-L451).
Not a blocker, just noting that we'd have to do some legwork to make this
workable with the producer API.
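For reference, a minimal sketch of the fencing-only usage pattern being
discussed (the topic, key, value, and transactional.id are placeholders; as
noted above, this currently fails with an IllegalStateException because a
transactional producer requires send() to happen inside a transaction):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class FencingOnlyProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "connector-task-0"); // placeholder id
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // initTransactions() bumps the epoch, fencing older producers with this id.
            producer.initTransactions();
            // No beginTransaction() here -- that is the point of the fencing-only
            // pattern. Today this send() throws IllegalStateException, which is
            // the legwork referred to above.
            producer.send(new ProducerRecord<>("some-topic", "key", "value"));
        }
    }
}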

> In light of that (and assuming you buy these arguments), I wonder how much
> extra effort it would be to do for EOS-enabled clusters as part of this
> KIP?

The extra effort wouldn't be negligible (expansion of the producer API,
more complexity in task count record logic, more thorough upgrade notes),
but ultimately I wouldn't object to the proposal because of the extra work
involved. What it really comes down to IMO is how aggressive we're willing
to be with the breaking changes we make for users. If a good argument can
be made for introducing new ACL requirements for every single connector
running on 3.0 and beyond, then I'd be happy to fold this into the KIP in
exchange for the ability to configure exactly-once support on per-connector
basis.

Really enjoying the fresh perspective you're bringing here, especially with
regards to the transactional producer internals and Kafka Streams use cases!

Cheers,

Chris

On Fri, May 14, 2021 at 10:07 AM Tom Bentley  wrote:

> Hi Chris,
>
> Thanks for the reply.
>
> "required"/"requested" sounds good to me. Likewise the pre-flight check and
> "PUT /{connectorTy

Re: [DISCUSS] KIP-739: Block Less on KafkaProducer#send

2021-05-14 Thread Ryanne Dolan
Makes sense!

Ryanne

On Fri, May 14, 2021, 9:39 AM Nakamura  wrote:

> Hey Ryanne,
>
> I see what you're saying about serde blocking, but I think we should
> consider it out of scope for this patch.  Right now we've nailed down a
> couple of use cases where we can unambiguously say, "I can make progress
> now" or "I cannot make progress now", which makes it possible to offload to
> a different thread only if we are unable to make progress.  Extending this
> to CPU work like serde would mean always offloading, which would be a
> really big performance change.  It might be worth exploring anyway, but I'd
> rather keep this patch focused on improving ergonomics, rather than
> muddying the waters with evaluating performance very deeply.
>
> I think if we really do want to support serde or interceptors that do IO on
> the send path (which seems like an anti-pattern to me), we should consider
> making that a separate KIP, and probably also consider changing the API to
> use Futures (or CompletionStages).  But I would rather avoid scope creep,
> so that we have a better chance of fixing this part of the problem.
>
> Yes, I think some exceptions will move to being async instead of sync.
> They'll still be surfaced in the Future, so I'm not so confident that it
> would be that big a shock to users though.
>
> Best,
> Moses
>
> On Thu, May 13, 2021 at 7:44 PM Ryanne Dolan 
> wrote:
>
> > re serialization, my concern is that serialization often accounts for a
> lot
> > of the cycles spent before returning the future. It's not blocking per
> se,
> > but it's the same effect from the caller's perspective.
> >
> > Moreover, serde impls often block themselves, e.g. when fetching schemas
> > from a registry. I suppose it's also possible to block in Interceptors
> > (e.g. writing audit events or metrics), which happens before serdes iiuc.
> > So any blocking in either of those plugins would block the send unless we
> > queue first.
> >
> > So I think we want to queue first and do everything off-thread when using
> > the new API, whatever that looks like. I just want to make sure we don't
> do
> > that for clients that wouldn't expect it.
> >
> > Another consideration is exception handling. If we queue right away,
> we'll
> > defer some exceptions that currently are thrown to the caller (before the
> > future is returned). In the new API, the send() wouldn't throw any
> > exceptions, and instead the future would fail. I think that might mean
> that
> > a new method signature is required.
> >
> > Ryanne
> >
> >
> > On Thu, May 13, 2021, 2:57 PM Nakamura  wrote:
> >
> > > Hey Ryanne,
> > >
> > > I agree we should add an additional constructor (or else an additional
> > > overload in KafkaProducer#send, but the new constructor would be easier
> > to
> > > understand) if we're targeting the "user provides the thread" approach.
> > >
> > > From looking at the code, I think we can keep record serialization on
> the
> > > user thread, if we consider that an important part of the semantics of
> > this
> > > method.  It doesn't seem like serialization depends on knowing the
> > cluster,
> > > I think it's incidental that it comes after the first "blocking"
> segment
> > in
> > > the method.
> > >
> > > Best,
> > > Moses
> > >
> > > On Thu, May 13, 2021 at 2:38 PM Ryanne Dolan 
> > > wrote:
> > >
> > > > Hey Moses, I like the direction here. My thinking is that a single
> > > > additional work queue, s.t. send() can enqueue and return, seems like
> > the
> > > > lightest touch. However, I don't think we can trivially process that
> > > queue
> > > > in an internal thread pool without subtly changing behavior for some
> > > users.
> > > > For example, users will often run send() in multiple threads in order
> > to
> > > > serialize faster, but that wouldn't work quite the same if there were
> > an
> > > > internal thread pool.
> > > >
> > > > For this reason I'm thinking we need to make sure any such changes
> are
> > > > opt-in. Maybe a new constructor with an additional ThreadFactory
> > > parameter.
> > > > That would at least clearly indicate that work will happen
> off-thread,
> > > and
> > > > would require opt-in for the new behavior.
> > > >
> > > > Under the hood, this ThreadFactory could be used to create the worker
> > > > thread that processes queued sends, which could fan-out to
> per-partition
> > > > threads from there.
> > > >
> > > > So then you'd have two ways to send: the existing way, where serde
> and
> > > > interceptors and whatnot are executed on the calling thread, and the
> > new
> > > > way, which returns right away and uses an internal Executor. As you
> > point
> > > > out, the semantics would be identical in either case, and it would be
> > > very
> > > > easy for clients to switch.
> > > >
> > > > Ryanne
> > > >
> > > > On Thu, May 13, 2021, 9:00 AM Nakamura  wrote:
> > > >
> > > > > Hey Folks,
> > > > > I just posted a new proposal
> > > > > <
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/conflu

Re: [DISCUSS] KIP-739: Block Less on KafkaProducer#send

2021-05-14 Thread Nakamura
Hey Ryanne,

I see what you're saying about serde blocking, but I think we should
consider it out of scope for this patch.  Right now we've nailed down a
couple of use cases where we can unambiguously say, "I can make progress
now" or "I cannot make progress now", which makes it possible to offload to
a different thread only if we are unable to make progress.  Extending this
to CPU work like serde would mean always offloading, which would be a
really big performance change.  It might be worth exploring anyway, but I'd
rather keep this patch focused on improving ergonomics, rather than
muddying the waters with evaluating performance very deeply.

I think if we really do want to support serde or interceptors that do IO on
the send path (which seems like an anti-pattern to me), we should consider
making that a separate KIP, and probably also consider changing the API to
use Futures (or CompletionStages).  But I would rather avoid scope creep,
so that we have a better chance of fixing this part of the problem.

Yes, I think some exceptions will move to being async instead of sync.
They'll still be surfaced in the Future, so I'm not so confident that it
would be that big a shock to users though.
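To illustrate the caller-side difference (this is an assumption about how
callers would adapt, not an API change proposed in the KIP; the topic, value,
and handleSendFailure helper below are placeholders):

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class AsyncErrorHandlingSketch {
    // Errors that send() throws synchronously today would instead surface
    // when the returned Future is inspected.
    static void sendAndHandle(Producer<String, String> producer) {
        Future<RecordMetadata> future =
            producer.send(new ProducerRecord<>("some-topic", "value")); // placeholders
        try {
            future.get();                     // asynchronous failures surface here...
        } catch (ExecutionException e) {
            handleSendFailure(e.getCause());  // ...as the ExecutionException's cause
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    static void handleSendFailure(Throwable cause) {
        cause.printStackTrace();              // placeholder error handling
    }
}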

Best,
Moses

On Thu, May 13, 2021 at 7:44 PM Ryanne Dolan  wrote:

> re serialization, my concern is that serialization often accounts for a lot
> of the cycles spent before returning the future. It's not blocking per se,
> but it's the same effect from the caller's perspective.
>
> Moreover, serde impls often block themselves, e.g. when fetching schemas
> from a registry. I suppose it's also possible to block in Interceptors
> (e.g. writing audit events or metrics), which happens before serdes iiuc.
> So any blocking in either of those plugins would block the send unless we
> queue first.
>
> So I think we want to queue first and do everything off-thread when using
> the new API, whatever that looks like. I just want to make sure we don't do
> that for clients that wouldn't expect it.
>
> Another consideration is exception handling. If we queue right away, we'll
> defer some exceptions that currently are thrown to the caller (before the
> future is returned). In the new API, the send() wouldn't throw any
> exceptions, and instead the future would fail. I think that might mean that
> a new method signature is required.
>
> Ryanne
>
>
> On Thu, May 13, 2021, 2:57 PM Nakamura  wrote:
>
> > Hey Ryanne,
> >
> > I agree we should add an additional constructor (or else an additional
> > overload in KafkaProducer#send, but the new constructor would be easier
> to
> > understand) if we're targeting the "user provides the thread" approach.
> >
> > From looking at the code, I think we can keep record serialization on the
> > user thread, if we consider that an important part of the semantics of
> this
> > method.  It doesn't seem like serialization depends on knowing the
> cluster,
> > I think it's incidental that it comes after the first "blocking" segment
> in
> > the method.
> >
> > Best,
> > Moses
> >
> > On Thu, May 13, 2021 at 2:38 PM Ryanne Dolan 
> > wrote:
> >
> > > Hey Moses, I like the direction here. My thinking is that a single
> > > additional work queue, s.t. send() can enqueue and return, seems like
> the
> > > lightest touch. However, I don't think we can trivially process that
> > queue
> > > in an internal thread pool without subtly changing behavior for some
> > users.
> > > For example, users will often run send() in multiple threads in order
> to
> > > serialize faster, but that wouldn't work quite the same if there were
> an
> > > internal thread pool.
> > >
> > > For this reason I'm thinking we need to make sure any such changes are
> > > opt-in. Maybe a new constructor with an additional ThreadFactory
> > parameter.
> > > That would at least clearly indicate that work will happen off-thread,
> > and
> > > would require opt-in for the new behavior.
> > >
> > > Under the hood, this ThreadFactory could be used to create the worker
> > > thread that processes queued sends, which could fan-out to per-partition
> > > threads from there.
> > >
> > > So then you'd have two ways to send: the existing way, where serde and
> > > interceptors and whatnot are executed on the calling thread, and the
> new
> > > way, which returns right away and uses an internal Executor. As you
> point
> > > out, the semantics would be identical in either case, and it would be
> > very
> > > easy for clients to switch.
> > >
> > > Ryanne
> > >
> > > On Thu, May 13, 2021, 9:00 AM Nakamura  wrote:
> > >
> > > > Hey Folks,
> > > > I just posted a new proposal
> > > > <
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306446
> > > > >
> > > > in the wiki.  I think we have an opportunity to improve the
> > > > KafkaProducer#send user experience.  It would certainly make our
> lives
> > > > easier.  Please take a look!  There are two subproblems that I could
> > use
> > > > guidance on, so I would apprec

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-05-14 Thread Tom Bentley
Hi Chris,

Thanks for the reply.

"required"/"requested" sounds good to me. Likewise the pre-flight check and
"PUT /{connectorType}/config/validate".

> The other half is we'd still need to
> track the number of tasks for that connector that would need to be fenced
> out if/when exactly-once for it were switched on.
>

I'm not quite sure I follow what you mean here. Can you explain? AFAICT it
doesn't apply in a cluster where all tasks are using fencible producers,
but maybe I'm missing something.

> If we had the
> intermediate producer you describe at our disposal, and it were in use by
> every running source task for a given connector, we could probably enable
> users to toggle exactly-once on a per-connector basis, but it would also
> require new ACLs for all connectors.
>

That will be the case for the new transactional cluster anyway.

I think there is value to supporting connectors that don't use full-blown
transactions in an exactly-once cluster, because the overhead in a fencing
producer should be similar to an idempotent producer (which IIRC is about
3% above a non-idempotent producer). That's because we only need to make a
single InitProducerIdRequest, and thereafter the epoch check is tiny.

If that's right, then many people would be able to use a single cluster
for both exactly-once and non-exactly-once connectors (i.e. it would get
rid of the performance cost of running a non-EOS connector in an
exactly-once cluster). Only people who cared about the ~3% would need to
run "old-style" clusters using unfenced producers.

Guozhang also has a (possible) use case for a fencing-only producer (
https://issues.apache.org/jira/browse/KAFKA-12693), and as he points out
there, you should be able to get these semantics today by calling
initTransactions() and then just using the producer as normal (no
beginTransaction()/commitTransaction()/abortTransaction()).

In light of that (and assuming you buy these arguments), I wonder how much
extra effort it would be to do for EOS-enabled clusters as part of this
KIP?

Thanks again,

Tom

On Fri, May 14, 2021 at 2:14 AM Chris Egerton 
wrote:

> Hi Tom,
>
> I'm fine with an implicit mapping of connector-provided null to
> user-exposed UNKNOWN, if the design continues down that overall path.
>
> Allowing users to assert that a connector should support exactly-once
> sounds reasonable; it's similar to the pre-flight checks we already do for
> connector configurations such as invoking "Connector::validate" and
> ensuring that all of the referenced SMTs, Predicates, and Converter classes
> are present on the worker. In fact, I wonder if that's how we could
> implement it--as a preflight check. That way, Connector and Task instances
> won't even have the chance to fail; if the user states a requirement for
> exactly-once support but their connector configuration doesn't meet that
> requirement, we can fail the connector creation/reconfiguration request
> before even writing the new config to the config topic. We could also add
> this support to the "PUT /{connectorType}/config/validate" endpoint so that
> users could test exactly-once support for various configurations without
> having to actually create or reconfigure a connector. We could still fail
> tasks on startup if something slipped by (possibly due to connector
> upgrade) but it'd make the UX a bit smoother in most cases to fail faster.
>
> Since a possible use of the property is to allow future users to control
> exactly-once support on a per-connector basis, I wonder whether a binary
> property is sufficient here. Even if a connector doesn't support
> exactly-once, there could still be benefits to using a transactional
> producer with rounds of zombie fencing; for example, preventing duplicate
> task instances from producing data, which could be leveraged to provide
> at-most-once delivery guarantees. In that case, we'd want a way to signal
> to Connect that the framework should do everything it does to provide
> exactly-once source support, but not make the assertion on the connector
> config, and we'd end up providing three possibilities to users: required,
> best-effort, and disabled. It sounds like right now what we're proposing is
> that we expose only the first two and don't allow users to actually disable
> exactly-once support on a per-connector basis, but want to leave room for
> the third option in the future. With that in mind, "required/not_required"
> might not be the best fit. Perhaps "required"/"requested" for now, with
> "disabled" as the value that could be implemented later?
>
> RE: "Is the problem here simply that the zombie fencing provided by the
> producer is only available when using transactions, and therefore having a
> non-transactional producer in the cluster poses a risk of a zombie not
> being fenced?"--that's half of it. The other half is we'd still need to
> track the number of tasks for that connector that would need to be fenced
> out if/when exactly-once for it were switched on. 

[jira] [Created] (KAFKA-12786) Getting SslTransportLayerTest error

2021-05-14 Thread Sibelle (Jira)
Sibelle created KAFKA-12786:
---

 Summary: Getting SslTransportLayerTest error 
 Key: KAFKA-12786
 URL: https://issues.apache.org/jira/browse/KAFKA-12786
 Project: Kafka
  Issue Type: Bug
  Components: unit tests
 Environment: Ubuntu 20.04
Reporter: Sibelle
 Attachments: Error.png

SaslAuthenticatorTest > testRepeatedValidSaslPlainOverSsl() PASSED
org.apache.kafka.common.network.SslTransportLayerTest.testUnsupportedTLSVersion(Args)[1]
 failed, log available in 
/kafka/clients/build/reports/testOutput/org.apache.kafka.common.network.SslTransportLayerTest.testUnsupportedTLSVersion(Args)[1].test.stdout

SslTransportLayerTest > [1] tlsProtocol=TLSv1.2, useInlinePem=false FAILED
org.opentest4j.AssertionFailedError: Condition not met within timeout 
15000. Metric not updated failed-authentication-total expected:<1.0> but 
was:<0.0> ==> expected:  but was: 
at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40)
at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:193)
at 
org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:320)
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:368)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:317)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:301)
at 
org.apache.kafka.common.network.NioEchoServer.waitForMetrics(NioEchoServer.java:196)
at 
org.apache.kafka.common.network.NioEchoServer.verifyAuthenticationMetrics(NioEchoServer.java:155)
at 
org.apache.kafka.common.network.SslTransportLayerTest.testUnsupportedTLSVersion(SslTransportLayerTest.java:644)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12784) ssl kafka failed

2021-05-14 Thread Trofimov (Jira)
Trofimov created KAFKA-12784:


 Summary: ssl kafka failed
 Key: KAFKA-12784
 URL: https://issues.apache.org/jira/browse/KAFKA-12784
 Project: Kafka
  Issue Type: Task
  Components: config, consumer, KafkaConnect
Affects Versions: 2.8.0
Reporter: Trofimov
 Fix For: 2.8.0


*Kafka version:* kafka_2.13-2.8.0

I have a problem with SSL in Kafka. I can't figure out how 
ssl.endpoint.identification.algorithm works, because everything works fine for 
me when this parameter is left empty.

 

If I set it to https, I get "_no subject alternative dns name 
matching_" errors from the brokers.

 

*My dns name 1 server:*

 

[root@zeus1 /home/trofimov-im]#  nslookup IP_ADDR

IP_ADDR.in-addr.arpa  name = zeus1.bbk.strf.ru.

 

I removed the unnecessary details.

*cert in truststore:*

 

Keystore type: jks
Keystore provider: SUN

Your keystore contains 7 entries

Alias name: caroot
Creation date: May 11, 2021
Entry type: trustedCertEntry

Owner: CN=Enterprise CA 2, DC=bbk, DC=strf, DC=ru
Issuer: CN=Root CA, O=bbk, C=RU

 

***
***


Alias name: zeus1.cert
Creation date: May 11, 2021
Entry type: PrivateKeyEntry
Certificate chain length: 1
Certificate[1]:
Owner: CN=zeus1.bbk.strf.ru, OU=SCS, O=BBK of Russia, L=Moscow, ST=Moscow, C=RU
Issuer: CN=Enterprise CA 2, DC=bbk, DC=strf, DC=ru
Serial number: 1d0007b167a6fd474142f6b79f0007b167
Valid from: Tue Apr 27 19:33:52 MSK 2021 until: Mon Nov 20 14:19:00 MSK 2023
Certificate fingerprints:
 MD5: 85:E5:4F:30:A6:A1:0E:A0:8B:7E:70:1C:2B:01:65:BA
 SHA1: 84:20:E8:0E:8E:24:EB:E4:93:92:7B:D1:61:3B:75:A9:D8:83:12:DE
 SHA256: 
E6:3D:4E:BD:93:22:B5:4E:28:5A:78:F6:B8:53:1B:BF:6C:39:3D:FC:EB:CF:F8:62:FC:DA:9B:BE:59:4E:F6:EE
Signature algorithm name: SHA256withRSA
Subject Public Key Algorithm: 2048-bit RSA key
Version: 3


#8: ObjectId: 2.5.29.17 Criticality=false
SubjectAlternativeName [
 DNSName: scs-kafka.bbk.strf.ru
 DNSName: *.scs-kafka.bbk.strf.ru
 DNSName: scs-kafka
 DNSName: *.scs-kafka
 DNSName: zeus1.bbk.strf.ru
 DNSName: *.zeus1.bbk.strf.ru
 DNSName: zeus1
 DNSName: *.zeus1
]

 

***
***


Alias name: zeus2.cert
Creation date: May 11, 2021
Entry type: PrivateKeyEntry
Certificate chain length: 1
Certificate[1]:
Owner: CN=zeus2.bbk.strf.ru, OU=SCS, O=BBK of Russia, L=Moscow, ST=Moscow, C=RU
Issuer: CN=Enterprise CA 2, DC=bbk, DC=strf, DC=ru
Serial number: 1d0007b169e5e4f88b66d2e1ce0007b169
Valid from: Tue Apr 27 19:35:28 MSK 2021 until: Mon Nov 20 14:19:00 MSK 2023
Certificate fingerprints:
 MD5: 98:19:39:A9:DF:73:61:EB:17:30:BB:40:75:16:CE:0A
 SHA1: 81:0E:77:60:31:77:FC:5A:5C:E3:5F:45:F5:97:C6:84:F0:7B:DB:B5
 SHA256: 
8D:89:2D:B0:AA:9B:8E:95:D0:54:42:E9:E2:6D:67:FC:7A:6E:F4:50:58:76:F4:F7:0E:F5:D6:F7:A8:C1:5D:51
Signature algorithm name: SHA256withRSA
Subject Public Key Algorithm: 2048-bit RSA key
Version: 3

 

#8: ObjectId: 2.5.29.17 Criticality=false
SubjectAlternativeName [
 DNSName: scs-kafka.bbk.strf.ru
 DNSName: *.scs-kafka.bbk.strf.ru
 DNSName: scs-kafka
 DNSName: *.scs-kafka
 DNSName: zeus2.bbk.strf.ru
 DNSName: *.zeus2.bbk.strf.ru
 DNSName: zeus2
 DNSName: *.zeus2
]

 

***
***

 

*keystore is the same*

*The configuration is like this:* 

 

ssl.keystore.location=/home/kafka/kafka.server.keystore.jks
ssl.keystore.password=password
ssl.key.password=password

ssl.truststore.location=/home/kafka/kafka.server.truststore.jks
ssl.truststore.password=password

ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS

security.inter.broker.protocol=SSL
ssl.client.auth=required
ssl.endpoint.identification.algorithm=

 

*What's wrong, where to dig?*

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #125

2021-05-14 Thread Apache Jenkins Server
See