[jira] [Created] (KAFKA-15678) [Tiered Storage] Stall remote reads with long-spanning transactions

2023-10-24 Thread Alexandre Dupriez (Jira)
Alexandre Dupriez created KAFKA-15678:
-

 Summary: [Tiered Storage] Stall remote reads with long-spanning 
transactions
 Key: KAFKA-15678
 URL: https://issues.apache.org/jira/browse/KAFKA-15678
 Project: Kafka
  Issue Type: Bug
  Components: Tiered-Storage
Affects Versions: 3.6.0
Reporter: Alexandre Dupriez


Hi team,

I am facing an issue on the remote data path for uncommitted reads.

As mentioned in [the original 
PR|https://github.com/apache/kafka/pull/13535#discussion_r1166887367], if a 
transaction spans over a long sequence of segments, the time taken to retrieve 
the producer snapshots from the remote storage can, in the worst case, become 
prohibitive and block the reads if it consistently exceeds the deadline of fetch 
requests ({{{}fetch.max.wait.ms{}}}).

Essentially, the method used to compute the uncommitted records to return has 
an asymptotic complexity proportional to the number of segments in the log. 
That is not a problem with local storage since the constant factor to traverse 
the files is small enough, but that is not the case with a remote storage which 
exhibits higher read latency. An aggravating factor was the lock contention in 
the remote index cache, which has since been mitigated by KAFKA-15084. 
Unfortunately, despite the improvements observed without said contention, the 
algorithmic complexity of the current method used to compute uncommitted 
records can always defeat any optimisation made on the remote read path.

Maybe we could start thinking (if not already) about a different construct 
which would reduce that complexity to O(1) - i.e. make the computation 
independent of the number of segments and of the span of the transactions.
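
For illustration only, here is a rough sketch of the shape of the per-segment 
traversal on the remote read path ({{RemoteStorageManager#fetchIndex}} with the 
{{PRODUCER_SNAPSHOT}} index type is assumed to be the remote call involved; 
{{parseAbortedTxns}} and {{AbortedTxn}} are placeholders, not the actual code):
{code:java}
// Sketch: collecting the aborted/uncommitted transaction markers for a fetch served
// from tiered storage. One producer snapshot is retrieved per segment spanned by the
// transaction, so the cost is O(#segments) multiplied by the remote read latency,
// which can exceed fetch.max.wait.ms for long-spanning transactions.
def collectAbortedTxns(segments: Seq[RemoteLogSegmentMetadata],
                       rsm: RemoteStorageManager): Seq[AbortedTxn] = {
  segments.flatMap { segment =>
    val snapshot = rsm.fetchIndex(segment, RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT)
    try parseAbortedTxns(snapshot) // placeholder for the snapshot parsing
    finally snapshot.close()
  }
}
{code}
An O(1) construct would have to answer the same question from a bounded number 
of remote reads, independent of how many segments the transaction spans.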



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15490) Invalid path provided to the log failure channel upon I/O error when writing broker metadata checkpoint

2023-09-23 Thread Alexandre Dupriez (Jira)
Alexandre Dupriez created KAFKA-15490:
-

 Summary: Invalid path provided to the log failure channel upon I/O 
error when writing broker metadata checkpoint
 Key: KAFKA-15490
 URL: https://issues.apache.org/jira/browse/KAFKA-15490
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Alexandre Dupriez


There is a small bug/typo in the handling of I/O errors when writing the broker 
metadata checkpoint in {{{}KafkaServer{}}}. The path provided to the log dir 
failure channel is the full path of the checkpoint file, whereas only the log 
directory is expected 
([source|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/server/KafkaServer.scala#L958C8-L961C8]).

 
{code:java}
case e: IOException =>
          val dirPath = checkpoint.file.getAbsolutePath
          logDirFailureChannel.maybeAddOfflineLogDir(dirPath, s"Error while 
writing meta.properties to $dirPath", e){code}
 

As a result, after an {{IOException}} is captured and enqueued in the log dir 
failure channel 

{code:java}
[2023-09-22 17:07:32,052] ERROR Error while writing meta.properties to /meta.properties (kafka.server.LogDirFailureChannel)
java.io.IOException
{code}

 

The log dir failure handler cannot look up the log directory:

{code:java}
[2023-09-22 17:07:32,053] ERROR [LogDirFailureHandler]: Error due to (kafka.server.ReplicaManager$LogDirFailureHandler)
org.apache.kafka.common.errors.LogDirNotFoundException: Log dir /meta.properties is not found in the config.
{code}
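
A minimal sketch of the expected correction, assuming the log directory 
registered with the failure channel is the parent directory of the checkpoint 
file:
{code:java}
case e: IOException =>
  // Pass the log directory (the parent of the checkpoint file) rather than the
  // checkpoint file path, so the LogDirFailureChannel can match it against the
  // configured log directories.
  val logDir = checkpoint.file.getParentFile.getAbsolutePath
  logDirFailureChannel.maybeAddOfflineLogDir(logDir,
    s"Error while writing meta.properties to $logDir", e)
{code}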

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15486) Include NIO exceptions as I/O exceptions to be part of disk failure handling

2023-09-22 Thread Alexandre Dupriez (Jira)
Alexandre Dupriez created KAFKA-15486:
-

 Summary: Include NIO exceptions as I/O exceptions to be part of 
disk failure handling
 Key: KAFKA-15486
 URL: https://issues.apache.org/jira/browse/KAFKA-15486
 Project: Kafka
  Issue Type: Improvement
  Components: core, jbod
Reporter: Alexandre Dupriez


Currently, Apache Kafka offers the ability to detect and capture I/O errors 
when accessing the file system via the standard {{IOException}} from the JDK. 
There are cases, however, where I/O errors are only reported via exceptions such 
as {{{}BufferOverflowException{}}}, without an associated {{IOException}} on the 
produce or read path, so that the data volume is not detected as unhealthy and 
is not included in the list of offline directories.

Specifically, we faced the following scenario on a broker:
 * The data volume hosting a log directory became saturated.
 * As expected, {{IOException}}s were generated on the read/write path.
 * The log directory was set as offline and since it was the only log directory 
configured on the broker, Kafka automatically shut down.
 * Additional space was added to the data volume.
 * Kafka was then restarted.
 * No more {{IOException}} occurred, however {{BufferOverflowException}} *[*]* 
were raised while trying to delete log segments in order to honour the retention 
settings of a topic. The log directory was not moved offline and the 
exceptions kept re-occurring indefinitely.

The retention settings were therefore not applied in this case. The mitigation 
consisted of restarting Kafka.

It may be worth considering adding {{BufferOverflowException}} and 
{{BufferUnderflowException}} (and any other related exception from the JDK NIO 
library which surfaces an I/O error) to the current {{IOException}} as a proxy 
for storage I/O failure. There may be known unintended consequences in doing so, 
which is the reason they were not added already; or, the impact may be too 
marginal to justify modifying the main I/O failure handling path and risk 
exposing it to such unknown unintended consequences.
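
For illustration, one possible shape of such a change, as a sketch only (the 
helper and its call sites are assumptions, not existing Kafka code):
{code:java}
// Sketch: treat the NIO buffer exceptions surfaced on the storage path like I/O
// errors, so the affected log directory is taken offline as for IOException today.
def maybeHandleStorageFailure[T](logDir: String, msg: => String)(fun: => T): T = {
  try fun
  catch {
    case e @ (_: IOException | _: BufferOverflowException | _: BufferUnderflowException) =>
      logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, new IOException(msg, e))
      throw new KafkaStorageException(msg, e)
  }
}
{code}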

*[*]*
{code:java}
java.nio.BufferOverflowException
    at java.base/java.nio.Buffer.nextPutIndex(Buffer.java:674)
    at java.base/java.nio.DirectByteBuffer.putLong(DirectByteBuffer.java:882)
    at kafka.log.TimeIndex.$anonfun$maybeAppend$1(TimeIndex.scala:134)
    at kafka.log.TimeIndex.maybeAppend(TimeIndex.scala:114)
    at kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:506)
    at kafka.log.Log.$anonfun$roll$8(Log.scala:2066)
    at kafka.log.Log.$anonfun$roll$8$adapted(Log.scala:2066)
    at scala.Option.foreach(Option.scala:437)
    at kafka.log.Log.$anonfun$roll$2(Log.scala:2066)
    at kafka.log.Log.roll(Log.scala:2482)
    at kafka.log.Log.maybeRoll(Log.scala:2017)
    at kafka.log.Log.append(Log.scala:1292)
    at kafka.log.Log.appendAsFollower(Log.scala:1155)
    at kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica(Partition.scala:1023)
    at kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica(Partition.scala:1030)
    at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:178)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:356)
    at scala.Option.foreach(Option.scala:437)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:345)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:344)
    at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
    at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:359)
    at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:355)
    at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:309)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:344)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:141)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:140)
    at scala.Option.foreach(Option.scala:437)
    at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:140)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:123)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
{code}
 

 

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15038) Use topic id/name mapping from the Metadata cache in RLM

2023-05-30 Thread Alexandre Dupriez (Jira)
Alexandre Dupriez created KAFKA-15038:
-

 Summary: Use topic id/name mapping from the Metadata cache in RLM
 Key: KAFKA-15038
 URL: https://issues.apache.org/jira/browse/KAFKA-15038
 Project: Kafka
  Issue Type: Sub-task
Reporter: Alexandre Dupriez
Assignee: Alexandre Dupriez


Currently, the {{RemoteLogManager}} maintains its own cache of topic name to 
topic id 
[[1]|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L138]
 using the information provided during leadership changes, and removes the 
mapping upon receiving the notification that a partition has stopped.

It should be possible to re-use the mapping in a broker's metadata cache, 
removing the need for the RLM to build and update a local cache that 
duplicates the information in the metadata cache. It would also preserve a 
single source of authority regarding the association between topic names and 
ids.

[1] 
https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L138
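
As an illustration of the direction, a sketch assuming the broker 
{{MetadataCache}} exposes the topic name to id association (exact method names 
may differ):
{code:java}
// Sketch: resolve the topic id from the broker metadata cache on demand instead of
// maintaining a separate topic name -> topic id map inside the RemoteLogManager.
def topicIdPartition(metadataCache: MetadataCache, tp: TopicPartition): Option[TopicIdPartition] = {
  val topicId = metadataCache.getTopicId(tp.topic)
  if (topicId == Uuid.ZERO_UUID) None // topic not (or no longer) known by this broker
  else Some(new TopicIdPartition(topicId, tp))
}
{code}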



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-858: Handle JBOD broker disk failure in KRaft

2023-05-25 Thread Alexandre Dupriez
Hi, Igor,

Thanks for the excellent, thorough and very comprehensive KIP.

Although not directly in the scope of the KIP, I have a related question
about potential future work on disk degradation.

Today, what characterises a disk failure in Kafka is an I/O
exception surfaced by the JDK libraries. There are other types of
(more or less) soft failures where a disk (or the system behind its
abstraction) remains available, but experiences degradation, typically
in the form of elevated I/O latency. Currently, Kafka is not made
aware of the “health” of a disk. It may be useful to let Kafka know
about the QoS of its disks so that it can take actions which could
improve availability, e.g. via leader movements.

The KIP builds upon the existing concepts of online and offline states
for log directories, and the propagation of a disk failure via the
broker heartbeat and registration relies on the offline(d) directories
list. I wonder if it could make sense to extend the definition of
state of a log directory beyond online/offline to be able to refer to
disk degradation. In which case, the new fields added to the broker
heartbeat and registration requests may be the place where this
alternative state can also be conveyed. Perhaps the changes to the
RPCs could be designed to accommodate this new type of semantic in the
future.

What do you think?

Thanks,
Alexandre

On Wed, Apr 26, 2023 at 14:05, Igor Soarez  wrote:
>
> Thank you for another review Ziming, much appreciated!
>
> 1. and 2. You are correct, it would be a big and perhaps strange difference.
> Since our last exchange of emails, the proposal has changed and now it
> does follow your suggestion to bump metadata.version.
> The KIP mentions it under "Compatibility, Deprecation, and Migration Plan".
>
> 3. I tried to describe this under "Controller", under the heading
> "Handling replica assignments", but perhaps it could be improved.
> Let me know what you think.
>
> Best,
>
> --
> Igor
>


Re: [DISCUSS] KIP-926: introducing acks=min.insync.replicas config

2023-05-11 Thread Alexandre Dupriez
Hi, Luke,

Thanks for your reply.

102. Whether such a replica could become a leader depends on what the
end-user wants to use it for and what tradeoffs they wish to make down
the line.

There are cases, for instance with heterogeneous or interregional
networks, where the difference in latency between subsets of brokers
can be high enough for the "slow replicas" to have a detrimental
impact on the ISR traffic they take part in. This can justify
permanently segregating them from ISR traffic by design. And, an
end-user could still prefer to have these "slow replicas" versus
alternative approaches such as mirroring for the benefits they can
bring, for instance: a) they belong to the same cluster with no added
admin and ops, b) benefit from a direct, simpler replication path, c)
require less infrastructure than a mirrored solution, d) could become
unclean leaders for failovers under disaster scenarios such as a
regional service outage.

Thanks,
Alexandre

On Thu, May 11, 2023 at 14:57, Haruki Okada  wrote:
>
> Hi, Luke.
>
> Though this proposal definitely looks interesting, as others pointed out,
> the leader election implementation would be the hard part.
>
> And I think even LEO-based-election is not safe, which could cause silent
> committed-data loss easily.
>
> Let's say we have replicas A,B,C and A is the leader initially, and
> min.insync.replicas = 2.
>
> - 1. Initial
> * A(leo=0), B(leo=0), C(leo=0)
> - 2. Produce a message to A
> * A(leo=1), B(leo=0), C(leo=0)
> - 3. Another producer produces a message to A (i.e. as the different batch)
> * A(leo=2), B(leo=0), C(leo=0)
> - 4. C replicates the first batch. offset=1 is committed (by
> acks=min.insync.replicas)
> * A(leo=2), B(leo=0), C(leo=1)
> - 5. A loses ZK session (or broker session timeout in KRaft)
> - 6. Controller (regardless ZK/KRaft) doesn't store LEO in itself, so it
> needs to interact with each replica. It detects C has the largest LEO and
> decided to elect C as the new leader
> - 7. Before leader-election is performed, B replicates offset=1,2 from A.
> offset=2 is committed
> * This is possible because even if A lost ZK session, A could handle
> fetch requests for a while.
> - 8. Controller elects C as the new leader. B truncates its offset.
> offset=2 is lost silently.
>
> I have a feeling that we need quorum-based data replication? as Divij
> pointed out.
>
>
> 2023年5月11日(木) 22:33 David Jacot :
>
> > Hi Luke,
> >
> > > Yes, on second thought, I think the new leader election is required to
> > work
> > for this new acks option. I'll think about it and open another KIP for it.
> >
> > It can't be in another KIP as it is required for your proposal to work.
> > This is also an important part to discuss as it requires the controller to
> > do more operations on leader changes.
> >
> > Cheers,
> > David
> >
> > On Thu, May 11, 2023 at 2:44 PM Luke Chen  wrote:
> >
> > > Hi Ismael,
> > > Yes, on second thought, I think the new leader election is required to
> > work
> > > for this new acks option. I'll think about it and open another KIP for
> > it.
> > >
> > > Hi Divij,
> > > Yes, I agree with all of them. I'll think about it and let you know how
> > we
> > > can work together.
> > >
> > > Hi Alexandre,
> > > > 100. The KIP makes one statement which may be considered critical:
> > > "Note that in acks=min.insync.replicas case, the slow follower might
> > > be easier to become out of sync than acks=all.". Would you have some
> > > data on that behaviour when using the new ack semantic? It would be
> > > interesting to analyse and especially look at the percentage of time
> > > the number of replicas in ISR is reduced to the configured
> > > min.insync.replicas.
> > >
> > > The comparison data would be interesting. I can have a test when
> > available.
> > > But this KIP will be deprioritized because there should be a
> > pre-requisite
> > > KIP for it.
> > >
> > > > A (perhaps naive) hypothesis would be that the
> > > new ack semantic indeed provides better produce latency, but at the
> > > cost of precipitating the slowest replica(s) out of the ISR?
> > >
> > > Yes, it could be.
> > >
> > > > 101. I understand the impact on produce latency, but I am not sure
> > > about the impact on durability. Is your durability model built against
> > > the replication factor or the number of min insync replicas?
> > >
> > > Yes, and also the new LEO-based leader election (not proposed yet).
> > >
> > > > 102. Could a new type of replica which would not be allowed to enter
> > > the ISR be an alternative? Such replica could attempt replication on a
> > > best-effort basis and would provide the permanent guarantee not to
> > > interfere with foreground traffic.
> > >
> > > You mean a backup replica, which will never become leader (in-sync),
> > right?
> > > That's an interesting thought and might be able to become a workaround
> > with
> > > the existing leader election. Let me think about it.
> > >
> > > Hi qiangLiu,
> > >
> > > > 

Re: [DISCUSS] KIP-926: introducing acks=min.insync.replicas config

2023-05-10 Thread Alexandre Dupriez
Hi, Luke,

Thanks for the KIP. It clearly highlights the tradeoff between latency
and durability and proposes an approach relaxing a durability
constraint to provide lower ingestion latency. Please find a few
comments/questions.

100. The KIP makes one statement which may be considered critical:
"Note that in acks=min.insync.replicas case, the slow follower might
be easier to become out of sync than acks=all.". Would you have some
data on that behaviour when using the new ack semantic? It would be
interesting to analyse and especially look at the percentage of time
the number of replicas in ISR is reduced to the configured
min.insync.replicas. A (perhaps naive) hypothesis would be that the
new ack semantic indeed provides better produce latency, but at the
cost of precipitating the slowest replica(s) out of the ISR?

The underlying reasoning is that if a follower replica (or set of
replicas) is consistently slower to fetch than its peers, and the
increased batching the slow follower(s) may benefit from does not
compensate for the extra fetch time, there is a risk that the leader
LEO becomes harder to reach within the replica max lag time for these
followers. This could also potentially lead to a higher rate of ISR
shrinkage and expansion, similarly to a low replica max lag time.

101. I understand the impact on produce latency, but I am not sure
about the impact on durability. Is your durability model built against
the replication factor or the number of min insync replicas?

102. Could a new type of replica which would not be allowed to enter
the ISR be an alternative? Such replica could attempt replication on a
best-effort basis and would provide the permanent guarantee not to
interfere with foreground traffic.

Thanks,
Alexandre



On Wed, May 10, 2023 at 15:22, Divij Vaidya  wrote:
>
> Thank you Luke for starting off this discussion. I have been thinking about
> this and other similar changes to the replication for a while now. The KIP
> that Ismael surfaced (where was that discussion thread hiding all this
> time!) addresses exactly the improvements that I have been wondering about.
>
> Let me state certain points here and tell me what you think about them.
>
> #1 We need to change the leader election if we introduce the new ack=min.isr
> I will expand on Ismael's comment about the necessity to change the leader
> election with an example.
> 1. {A, B, C} are in ISR, A is leader, min.insync.replicas=2
> 2. Write comes in with acks=min.insync.replicas and {A,B} receives the
> write and it gets acknowledged to the producer. C hasn't still received the
> write.
> 3. A fails. Leadership transfers to C.
> 4. C hasn't received the write in step 2 and hence, it will ask B to
> truncate itself and match the prefix of C.
>
> As you can observe, if we don't change the leader election strategy to
> choosing a leader with the largest LEO, we may end up in a situation where
> we are losing ACK'ed messages. This is a durability loss.
>
> #2 Now that we have established based on statement 1 above that it is
> necessary to modify the leader election, I believe we definitely should do
> it (and revive conversation from KIP-250). Determining the ISR with the
> largest LEO comes with a cost of multiple round trips with controllers.
> This is an acceptable cost because it improves the steady state scenario
> (lower latency for writes) while adding additional overhead of
> rare/exceptional scenarios (leadership failover).
> Another advantage of choosing the leader with the largest LEO is evident in
> case of an unclean leader election. We can extend this new leader election
> logic to choose the out-of-sync replica with the largest LEO in case of
> unclean leader election. This will reduce the amount of data loss in such a
> scenario. I have a draft for this here
> 
> but
> I never ended up writing a KIP for it.
>
> #3 Now, if we agree that we need to change the leader election to
> improve steady state, should we consider a raft-like quorum based algorithm
> instead of the current one? IMO, yes we should implement a quorum based
> algorithm, but not in the scope of this change. That is a bigger change and
> requires a different KIP which shouldn't block the immediate advantages of
> your proposal.
>
> #4 Changes to the replication protocol are tricky and full of edge case
> scenarios. How do we develop in the future and gain confidence about the
> changes? This is where formal models like TLA+ comes into the picture.
> Modeling Kafka's replication protocol in TLA+ helps us in demonstrating
> provable semantics AND it also helps in quick iteration of ideas. As an
> example, for your proposal, we can extend the (work in progress) model
> here:
> https://github.com/divijvaidya/kafka-specification/blob/master/Kip405.tla#L112
> and assert that the invariants hold true even after we make the change
> about ack 

Re: [DISCUSS] KIP-917: Additional custom metadata for remote log segment

2023-04-18 Thread Alexandre Dupriez
Hi Ivan,

Thanks for the follow-up.

Yes, you are right that the suggested alternative is to let the plugin
store its own metadata separately with a solution chosen by the admin
or plugin provider. For instance, it could be using a dedicated topic
if chosen to, or relying on an external key-value store.

I agree with you on the existing risks associated with running
third-party code inside Apache Kafka. That said, combining custom
metadata with rlmMetadata increases coupling between Kafka and the
plugin. For instance, the custom metadata may need to be modified
outside of Kafka, but the rlmMetadata would still be cached on brokers
independently of any update of custom metadata. Since both types of
metadata are authored by different systems, and are cached in
different layers, this may become a problem, or make plugin migration
more difficult. What do you think?

I have a vague memory of this being discussed back when the tiered
storage KIP was started. Maybe Satish has more background on this.

Thanks,
Alexandre

On Mon, Apr 17, 2023 at 16:50, Ivan Yurchenko
 wrote:
>
> Hi Alexandre,
>
> Thank you for your feedback!
>
> > One question I would have is, what is the benefit of adding these
> > custom metadata in the rlmMetadata rather than letting the plugin
> > manage access and persistence to them?
>
> Could you please elaborate? Do I understand correctly that the idea is that
> the plugin will have its own storage for those custom metadata, for example
> a special topic?
>
> > It would be possible for a user
> > to use custom metadata large enough to adversely impact access to and
> > caching of the rlmMetadata by Kafka.
>
> Since the custom metadata is 100% under control of the RSM plugin, the risk
> is as big as the risk of running a third-party code (i.e. the RSM plugin).
> The cluster admin must make the decision if they trust it.
> To mitigate this risk and put it under control, the RSM plugin
> implementations could document what custom metadata they use and estimate
> their size.
>
> Best,
> Ivan
>
>
> On Mon, 17 Apr 2023 at 18:14, Alexandre Dupriez 
> wrote:
>
> > Hi Ivan,
> >
> > Thank you for the KIP.
> >
> > I think the KIP clearly explains the need for out-of-band metadata
> > authored and used by an implementation of the remote storage manager.
> > One question I would have is, what is the benefit of adding these
> > custom metadata in the rlmMetadata rather than letting the plugin
> > manage access and persistence to them?
> >
> > Maybe one disadvantage and potential risk with the approach proposed
> > in the KIP is that the rlmMetadata is not of a predefined, relatively
> > constant size (although corner cases with thousands of leader epochs
> > in the leader epoch map are possible). It would be possible for a user
> > to use custom metadata large enough to adversely impact access to and
> > caching of the rlmMetadata by Kafka.
> >
> > Thanks,
> > Alexandre
> >
> > Le jeu. 6 avr. 2023 à 16:03, hzh0425  a écrit :
> > >
> > > I think it's a good idea as we may want to store remote segments in
> > different buckets
> > >
> > >
> > >
> > > | |
> > > hzhka...@163.com
> > > |
> > > |
> > > 邮箱:hzhka...@163.com
> > > |
> > >
> > >
> > >
> > >
> > >  回复的原邮件 
> > > | 发件人 | Ivan Yurchenko |
> > > | 日期 | 2023年04月06日 22:37 |
> > > | 收件人 | dev@kafka.apache.org |
> > > | 抄送至 | |
> > > | 主题 | [DISCUSS] KIP-917: Additional custom metadata for remote log
> > segment |
> > > Hello!
> > >
> > > I would like to start the discussion thread on KIP-917: Additional custom
> > > metadata for remote log segment [1]
> > > This KIP is fairly small and proposes to add a new field to the remote
> > > segment metadata.
> > >
> > > Thank you!
> > >
> > > Best,
> > > Ivan
> > >
> > > [1]
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-917%3A+Additional+custom+metadata+for+remote+log+segment
> >


Re: [DISCUSS] KIP-917: Additional custom metadata for remote log segment

2023-04-17 Thread Alexandre Dupriez
Hi Ivan,

Thank you for the KIP.

I think the KIP clearly explains the need for out-of-band metadata
authored and used by an implementation of the remote storage manager.
One question I would have is, what is the benefit of adding these
custom metadata in the rlmMetadata rather than letting the plugin
manage access and persistence to them?

Maybe one disadvantage and potential risk with the approach proposed
in the KIP is that the rlmMetadata is not of a predefined, relatively
constant size (although corner cases with thousands of leader epochs
in the leader epoch map are possible). It would be possible for a user
to use custom metadata large enough to adversely impact access to and
caching of the rlmMetadata by Kafka.

Thanks,
Alexandre

On Thu, Apr 6, 2023 at 16:03, hzh0425  wrote:
>
> I think it's a good idea as we may want to store remote segments in different 
> buckets
>
>
>
> | |
> hzhka...@163.com
> |
> |
> 邮箱:hzhka...@163.com
> |
>
>
>
>
>  回复的原邮件 
> | 发件人 | Ivan Yurchenko |
> | 日期 | 2023年04月06日 22:37 |
> | 收件人 | dev@kafka.apache.org |
> | 抄送至 | |
> | 主题 | [DISCUSS] KIP-917: Additional custom metadata for remote log segment |
> Hello!
>
> I would like to start the discussion thread on KIP-917: Additional custom
> metadata for remote log segment [1]
> This KIP is fairly small and proposes to add a new field to the remote
> segment metadata.
>
> Thank you!
>
> Best,
> Ivan
>
> [1]
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-917%3A+Additional+custom+metadata+for+remote+log+segment


Re: [DISCUSS] KIP-895: Dynamically refresh partition count of __consumer_offsets

2023-04-12 Thread Alexandre Dupriez
Hi Divij,

Thanks for the follow-up. A few comments/questions.

100. The stated motivation to increase the number of partitions above
50 is scale. Are we sure that 50 partitions is not enough to cover all
valid use cases? An upper bound in the range 1 to 10 MB/s of ingress
per partition gives 50 to 500 MB/s. Assuming 100 bytes per offset and
metadata records, this gives between 500,000 and 5,000,000 offsets
committed per second. Assuming 10,000 consumers active on the cluster,
this would allow a rate of 50 to 500 offsets committed per second per
consumer. Are there really use cases where there is a genuine need for
more? Arguably, this does not include group metadata records which are
generated at a low frequency.
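
To make the arithmetic explicit (all figures are the assumptions stated
above, not measurements):

  50 partitions x 1..10 MB/s ingress per partition  = 50..500 MB/s aggregate
  50..500 MB/s / ~100 bytes per offset record       = 500,000..5,000,000 offsets/s
  500,000..5,000,000 offsets/s / 10,000 consumers   = 50..500 commits/s per consumer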

101. The partitioning scheme applied for consumer offsets is also used
in other parts such as the already mentioned transaction metadata or
remote log metadata for the topic-based remote log metadata manager
[1]. Have we considered a holistic approach for all these internal
topics?

Overall, I am not sure if changing the number of partitions for the
consumer offsets topic should even be allowed unless there is evidence
of it being required to accommodate throughput. Reassignment can be
required after cluster expansion, but that is correctly supported
IIRC.

Thanks,
Alexandre

[1] 
https://github.com/Hangleton/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTopicPartitioner.java#L37

On Thu, Apr 6, 2023 at 16:01, hzh0425  wrote:
>
> I think it's a good idea as we may want to store segments in different buckets
>
>
>
> | |
> hzhka...@163.com
> |
> |
> 邮箱:hzhka...@163.com
> |
>
>
>
>
>  回复的原邮件 
> | 发件人 | Divij Vaidya |
> | 日期 | 2023年04月04日 23:56 |
> | 收件人 | dev@kafka.apache.org |
> | 抄送至 | |
> | 主题 | Re: [DISCUSS] KIP-895: Dynamically refresh partition count of 
> __consumer_offsets |
> FYI, a user faced this problem and reached out to us in the mailing list
> [1]. Implementation of this KIP could have reduced the downtime for these
> customers.
>
> Christo, would you like to create a JIRA and associate with the KIP so that
> we can continue to collect cases in the JIRA where users have faced this
> problem?
>
> [1] https://lists.apache.org/thread/zoowjshvdpkh5p0p7vqjd9fq8xvkr1nd
>
> --
> Divij Vaidya
>
>
>
> On Wed, Jan 18, 2023 at 9:52 AM Christo Lolov 
> wrote:
>
> > Greetings,
> >
> > I am bumping the below DISCUSSion thread for KIP-895. The KIP presents a
> > situation where consumer groups are in an undefined state until a rolling
> > restart of a cluster is performed. While I have demonstrated the behaviour
> > using a cluster using Zookeeper I believe the same problem can be shown in
> > a KRaft cluster. Please let me know your opinions on the problem and the
> > presented solution.
> >
> > Best,
> > Christo
> >
> > On Thursday, 29 December 2022 at 14:19:27 GMT, Christo
> > >  wrote:
> > >
> > >
> > > Hello!
> > > I would like to start this discussion thread on KIP-895: Dynamically
> > > refresh partition count of __consumer_offsets.
> > > The KIP proposes to alter brokers so that they refresh the partition
> > count
> > > of __consumer_offsets used to determine group coordinators without
> > > requiring a rolling restart of the cluster.
> > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-895%3A+Dynamically+refresh+partition+count+of+__consumer_offsets
> > >
> > > Let me know your thoughts on the matter!
> > > Best, Christo
> > >
> >


Re: [DISCUSS] KIP-895: Dynamically refresh partition count of __consumer_offsets

2023-04-04 Thread Alexandre Dupriez
Hi Christo,

Thanks for the KIP. Apologies for the delayed review.

At a high-level, I am not sure if the KIP really solves the problem it
intends to.

More specifically, the KIP mentions that once a broker is restarted
and the group coordinator becomes aware of the new partition count of
the consumer offsets topic, the problem is mitigated. However, how do
we access the metadata and offsets recorded in a partition once it is
no longer the partition a consumer group resolves to?

Thanks,
Alexandre

On Tue, Apr 4, 2023 at 18:34, Justine Olshan
 wrote:
>
> Hi,
>
> I'm also a bit unsure of the motivation here. Is there a need to change the
> number of partitions for this topic?
>
> Justine
>
> On Tue, Apr 4, 2023 at 10:07 AM David Jacot  wrote:
>
> > Hi,
> >
> > I am not very comfortable with the proposal of this KIP. The main issue is
> > that changing the number of partitions means that all group metadata is
> > lost because the hashing changes. I wonder if we should just disallow
> > changing the number of partitions entirely. Did we consider something like
> > this?
> >
> > Best,
> > David
> >
> > Le mar. 4 avr. 2023 à 17:57, Divij Vaidya  a
> > écrit :
> >
> > > FYI, a user faced this problem and reached out to us in the mailing list
> > > [1]. Implementation of this KIP could have reduced the downtime for these
> > > customers.
> > >
> > > Christo, would you like to create a JIRA and associate with the KIP so
> > that
> > > we can continue to collect cases in the JIRA where users have faced this
> > > problem?
> > >
> > > [1] https://lists.apache.org/thread/zoowjshvdpkh5p0p7vqjd9fq8xvkr1nd
> > >
> > > --
> > > Divij Vaidya
> > >
> > >
> > >
> > > On Wed, Jan 18, 2023 at 9:52 AM Christo Lolov 
> > > wrote:
> > >
> > > > Greetings,
> > > >
> > > > I am bumping the below DISCUSSion thread for KIP-895. The KIP presents
> > a
> > > > situation where consumer groups are in an undefined state until a
> > rolling
> > > > restart of a cluster is performed. While I have demonstrated the
> > > behaviour
> > > > using a cluster using Zookeeper I believe the same problem can be shown
> > > in
> > > > a KRaft cluster. Please let me know your opinions on the problem and
> > the
> > > > presented solution.
> > > >
> > > > Best,
> > > > Christo
> > > >
> > > > On Thursday, 29 December 2022 at 14:19:27 GMT, Christo
> > > > >  wrote:
> > > > >
> > > > >
> > > > > Hello!
> > > > > I would like to start this discussion thread on KIP-895: Dynamically
> > > > > refresh partition count of __consumer_offsets.
> > > > > The KIP proposes to alter brokers so that they refresh the partition
> > > > count
> > > > > of __consumer_offsets used to determine group coordinators without
> > > > > requiring a rolling restart of the cluster.
> > > > >
> > > > >
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-895%3A+Dynamically+refresh+partition+count+of+__consumer_offsets
> > > > >
> > > > > Let me know your thoughts on the matter!
> > > > > Best, Christo
> > > > >
> > > >
> > >
> >


Re: [VOTE] KIP-915: Txn and Group Coordinator Downgrade Foundation

2023-03-30 Thread Alexandre Dupriez
Thanks for the KIP and clarifications, Jeff.

+1 (non binding)

On Thu, Mar 30, 2023 at 14:41, David Jacot
 wrote:
>
> Thanks for the KIP, Jeff.
>
> +1 (binding)
>
> On Wed, Mar 29, 2023 at 4:48 PM Jeff Kim 
> wrote:
>
> > Hi all,
> >
> > I would like to start the vote on KIP-915: Txn and Group Coordinator
> > Downgrade Foundation.
> >
> > KIP:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-915%3A+Txn+and+Group+Coordinator+Downgrade+Foundation
> >
> > Discussion thread:
> > https://lists.apache.org/thread/myyl7phrnl6vsnzdskrntgzpvshoj4n3
> >
> > Best,
> > Jeff
> >


Re: [DISCUSS] KIP-915: Next Gen Group Coordinator Downgrade Path

2023-03-28 Thread Alexandre Dupriez
Hi Jeff,

Thank you for the fast answer!

100. Got it, I think I am starting to understand based on your example
exposing three Kafka versions. Please let me know if the following
corresponds to the correct interpretation.

Taking OffsetCommitValue as an example, the idea is to bump the schema
to version 4 to make the record flexible. The schema version 4 will be
back-ported to versions 3.0.3, 3.1.3, 3.2.4, 3.3.3, 3.4.1 and
potentially 3.5. These versions will continue to serialise the record
with version 3 so that downgrades from there to earlier minor and/or
major versions are possible. And, these versions will deserialize the
records with version 4 so that they can support flexible records,
although they do not make use of tagged fields (if any is present).

Then, in a future version, 3.x, x >= 6, a tag field will be added to
the record - say, topic id. Downgrade from 3.x to 3.0.3, 3.1.3, 3.2.4,
3.3.3, 3.4.1 and 3.5 will be possible because these versions will
perform deserialisation with the record schema version 4, that is,
supporting tagged fields.

Hence, this approach is both backward and forward looking and extends
the scope of compatible versions for downgrades.

[N] 3.x, x >= 6 --> downgradable to [I] 3.0.3, 3.1.3, 3.2.4, 3.3.3,
3.4.1, 3.5 --> downgradable to e.g. [O] 3.0.0.

One note though is that a transitive downgrade from 3.x, x >= 6, to
3.0.0 via the version domain [I], will not be supported. Should it be
explicitly mentioned in the KIP that downgrades from 3.0.3, 3.1.3,
3.2.4, 3.3.3, 3.4.1, 3.5 to earlier versions may not be possible (if
offsets or group metadata records were generated by a higher version
3.x, x >= 6)? Or am I still misunderstanding?

101. I will try to provide an example to illustrate what I mean by
version. Consider the GroupMetadataValue schema version 4 and the
tagged field topicId introduced in 3.6. Let's say a new optional field
needs to be added in 3.7. We will have two records version 4, one with
topic id, the other with topic id and the new field. The new field is
semantically optional (structurally it is always optional since it is
a tagged field) but we want to make the distinction between a record
generated by 3.6 and one generated by 3.7. How do we resolve the
ambiguity?

Thanks!
Alexandre

On Tue, Mar 28, 2023 at 16:13, Jeff Kim  wrote:
>
> Hi Alexandre,
>
> Thank you for taking a look!
>
> 100. I am not sure I fully understand what you mean by forcefully adding
> tagged fields. Let's say VX does not have a flexible version,
> VY allows deserialization but serializes with a non-flexible version, and
> VZ introduces a new tagged field.
> VX upgrade to VY then downgrade back to VX works because even if group
> metadata changes VY will serialize with
> the highest non-flexible version. VZ to VY back to VZ also works because
> even if VY serializes with a non-flexible field
> VZ will be able to deserialize it as it is a supported version. Does this
> answer your question?
>
> 101. The future versioning scheme needs to be backward compatible with
> older coordinators. Wouldn't segregating into 2 versions
> be incompatible?
>
> Thanks,
> Jeff
>
> On Tue, Mar 28, 2023 at 5:47 AM Alexandre Dupriez <
> alexandre.dupr...@gmail.com> wrote:
>
> > Hi Jeff, Team,
> >
> > Thank you for the KIP. This is a very interesting approach. I feel it
> > is simpler than the described alternative although it comes with
> > tradeoffs, thanks for highlighting those. If I may, I would like to
> > share two naive questions.
> >
> > 100. The KIP mentions that records will be serialised with the highest
> > non-flexible version (e.g. 3 for GroupMetadataValue and
> > OffsetCommitValue) so that records can be deserialized with earlier
> > versions of Kafka. I am not sure I understand correctly: is the idea
> > to forcefully add tagged fields at the end of the records while
> > maintaining the existing version (3 for the two record types just
> > mentioned) so that they can be deserialized by existing Kafka versions
> > for which the version of these record types is not known as flexible,
> > while at the same time preserving the new tagged fields to new Kafka
> > versions abreast of the addition of a new flexible version for these
> > record types? If so, is it "bypassing" the protocol convention which
> > prescribes the use of a flexible version to allow the use of tagged
> > fields?
> >
> > 101. After the bump of the records to a new version indicated as
> > flexible, the record version is expected to never change while the
> > underlying tagged fields could potentially still evolve over time. One
> > potential downside is that we lose the benefits of the versioning
> > scheme enforced by the serde pr

Re: [DISCUSS] KIP-915: Next Gen Group Coordinator Downgrade Path

2023-03-28 Thread Alexandre Dupriez
Hi Jeff, Team,

Thank you for the KIP. This is a very interesting approach. I feel it
is simpler than the described alternative although it comes with
tradeoffs, thanks for highlighting those. If I may, I would like to
share two naive questions.

100. The KIP mentions that records will be serialised with the highest
non-flexible version (e.g. 3 for GroupMetadataValue and
OffsetCommitValue) so that records can be deserialized with earlier
versions of Kafka. I am not sure I understand correctly: is the idea
to forcefully add tagged fields at the end of the records while
maintaining the existing version (3 for the two record types just
mentioned) so that they can be deserialized by existing Kafka versions
for which the version of these record types is not known as flexible,
while at the same time preserving the new tagged fields to new Kafka
versions abreast of the addition of a new flexible version for these
record types? If so, is it "bypassing" the protocol convention which
prescribes the use of a flexible version to allow the use of tagged
fields?

101. After the bump of the records to a new version indicated as
flexible, the record version is expected to never change while the
underlying tagged fields could potentially still evolve over time. One
potential downside is that we lose the benefits of the versioning
scheme enforced by the serde protocol. Could this become a problem in
the future if there is ever a need to segregate two distinct
"versions" of the appended record structure held by the tagged fields?

Thanks,
Alexandre

On Thu, Mar 23, 2023 at 18:15, Jeff Kim  wrote:
>
> Hi Yi,
>
> > Does it mean with a flexible version, the future
> version of these value types will stay at the version where the flexibility
> is first introduced? Will there be any need to bump the version again in
> the future?
>
> Yes, there will be no need to bump the version since we will only be adding
> tagged fields but in the chance the version is bumped, we will deserialize
> to the highest known (flexible) version which will ignore unknown tagged
> fields.
>
> > To enforce the version not bumping, is it possible to have a unit test to
> gate?
>
> Do you have some tests in mind? I don't think we can tell whether a version
> was bumped or not.
>
> Best,
> Jeff
>
> On Thu, Mar 23, 2023 at 12:07 PM Yi Ding  wrote:
>
> > Hi Jeff,
> >
> > Thanks for the update. Does it mean with a flexible version, the future
> > version of these value types will stay at the version where the flexibility
> > is first introduced? Will there be any need to bump the version again in
> > the future?
> > To enforce the version not bumping, is it possible to have a unit test to
> > gate?
> >
> >
> > On Wed, Mar 22, 2023 at 3:19 PM Jeff Kim 
> > wrote:
> >
> > > Hi all,
> > >
> > > After discussing with my colleagues, I have repurposed the KIP for a
> > > general downgrade solution for both transaction and group coordinators.
> > The
> > > KIP no longer discusses the downgrade path but instead lays out the
> > > foundation for future downgrade solutions.
> > >
> > > Link:
> > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-915%3A+Txn+and+Group+Coordinator+Downgrade+Foundation
> > >
> > > Thanks,
> > > Jeff
> > >
> > > On Mon, Mar 20, 2023 at 7:37 PM Jeff Kim  wrote:
> > >
> > > > Hi David and Justine,
> > > >
> > > > Thank you both for the detailed feedback.
> > > >
> > > > David,
> > > >
> > > > 1. That makes sense. I revised the "Reading new fields" section with
> > how
> > > > we can downgrade to the highest known version and that this was
> > confirmed
> > > > via unit testing. I also attempted to dive deeper into using tagged
> > > fields
> > > > and the rejected alternative. Please let me know what you think.
> > > >
> > > > 2. Under "Restrictions and Guidelines" I updated the paragraph to
> > clearly
> > > > state that we want to use tagged fields across all record types
> > > introduced
> > > > in KIP-848 including OffsetCommitValue.
> > > >
> > > > 3. Would it be possible to bump the OffsetCommitValue record version to
> > > > make it flexible along with the changes to parse with the highest known
> > > > version? I'm not sure I understand why we cannot make both changes
> > > together.
> > > >
> > > > 4. I completely missed this. Added some notes at the end of
> > "Restrictions
> > > > and Guidelines". Unfortunately I can't think of a solution at the
> > moment.
> > > > Will get back to you.
> > > >
> > > > 5. I have a section under "Identifying New Record Types" that discusses
> > > > this:
> > > >  > We can automate the cleanup by writing tombstones when the
> > coordinator
> > > > reads unrecognizable records. This may add duplicate work if tombstones
> > > > were already added but not yet pruned by the log cleaner.
> > > > This is a sure way to delete any unknown record types even if the
> > > operator
> > > > does not follow the steps.
> > > >
> > > > 6. Thanks, I have expanded on the section on transactional 

[jira] [Created] (KAFKA-14852) Propagate Topic Ids to the Group Coordinator for Offset Fetch

2023-03-27 Thread Alexandre Dupriez (Jira)
Alexandre Dupriez created KAFKA-14852:
-

 Summary: Propagate Topic Ids to the Group Coordinator for Offset 
Fetch
 Key: KAFKA-14852
 URL: https://issues.apache.org/jira/browse/KAFKA-14852
 Project: Kafka
  Issue Type: Sub-task
Reporter: Alexandre Dupriez
 Fix For: 3.5.0


This task is the sibling of KAFKA-14793 which propagates topic ids in the group 
coordinator on the offset commit (write) path. The purpose of this JIRA is to 
change the interfaces of the group coordinator and group coordinator adapter to 
propagate topic ids in a similar way.

KAFKA-14691 will add the topic ids to the OffsetFetch API itself so that topic 
ids are propagated from clients to the coordinator on the offset fetch path. 

Changes to the persisted data model (group metadata and keys) are out of scope.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14845) Broker ZNode creation can fail due to a session ID unknown to the broker

2023-03-24 Thread Alexandre Dupriez (Jira)
Alexandre Dupriez created KAFKA-14845:
-

 Summary: Broker ZNode creation can fail due to a session ID 
unknown to the broker
 Key: KAFKA-14845
 URL: https://issues.apache.org/jira/browse/KAFKA-14845
 Project: Kafka
  Issue Type: Bug
Reporter: Alexandre Dupriez
 Attachments: broker-registration.drawio

Our production environment faced a use case where the registration of a broker 
failed due to the presence of a "conflicting" broker znode in Zookeeper. This 
case is not unlike the one fixed by KAFKA-6584 and induced by the Zookeeper bug 
(or feature) tracked in ZOOKEEPER-2985, which is still open as of today.

A network partition disturbed the communication channels between the Kafka and 
Zookeeper clusters for about 20% of the brokers in the cluster. One of these 
brokers was not able to re-register with Zookeeper and was excluded from the 
cluster until it was restarted. The broker logs show the failed registration due 
to a "conflicting" znode write which, in this case, is not covered by KAFKA-6584. 
The broker did not restart and was not unhealthy. In the following logs, the 
broker IP is 1.2.3.4.

The sequence of logs on the broker is as follows.

First, a connection is established with the Zookeeper node 3.

 
{code:java}
[2023-03-05 16:01:55,342] INFO Socket connection established, initiating 
session, client: /1.2.3.4:40200, server: zk.3/5.6.7.8:2182 
(org.apache.zookeeper.ClientCnxn)
[2023-03-05 16:01:55,342] INFO channel is connected: [id: 0x2b45ae40, 
L:/1.1.3.4:40200 - R:zk.3/5.6.7.8:2182] 
(org.apache.zookeeper.ClientCnxnSocketNetty){code}
 

An existing Zookeeper session was expired, and upon reconnection, the Zookeeper 
state change handler was invoked. The creation of the ephemeral znode 
/brokers/ids/18 started on the controller thread.

 
{code:java}
[2023-03-05 16:01:55,345] INFO Creating /brokers/ids/18 (is it secure? false) 
(kafka.zk.KafkaZkClient){code}
 

The client "session" timed out after 6 seconds. Note the session is 0x0 and the 
absence of "{_}Session establishment complete{_}" log: the broker appears to 
have never received or processed the response from the Zookeeper node.

 
{code:java}
[2023-03-05 16:02:01,343] INFO Client session timed out, have not heard from 
server in 6000ms for sessionid 0x0, closing socket connection and attempting 
reconnect (org.apache.zookeeper.ClientCnxn)
[2023-03-05 16:02:01,343] INFO channel is disconnected: [id: 0x2b45ae40, 
L:/1.2.3.4:40200 ! R:zk.3/5.6.7.8:2182] 
(org.apache.zookeeper.ClientCnxnSocketNetty){code}
 

Pending requests were aborted with a {{CONNECTIONLOSS}} error and the client 
started waiting on a new connection notification.

 
{code:java}
[2023-03-05 16:02:01,343] INFO [ZooKeeperClient Kafka server] Waiting until 
connected. (kafka.zookeeper.ZooKeeperClient){code}
 

A new connection was created with the Zookeeper node 1. Note that a valid (new) 
session ({{{}0x1006c6e0b830001{}}}) was reported by Kafka this time.

 
{code:java}
[2023-03-05 16:02:02,037] INFO Socket connection established, initiating 
session, client: /1.2.3.4:58080, server: zk.1/9.10.11.12:2182 
(org.apache.zookeeper.ClientCnxn)
[2023-03-05 16:02:02,037] INFO channel is connected: [id: 0x68fba106, 
L:/1.2.3.4:58080 - R:zk.1/9.10.11.12:2182] 
(org.apache.zookeeper.ClientCnxnSocketNetty)
[2023-03-05 16:02:03,054] INFO Session establishment complete on server 
zk.1/9.10.11.12:2182, sessionid = 0x1006c6e0b830001, negotiated timeout = 18000 
(org.apache.zookeeper.ClientCnxn){code}
 

The Kafka ZK client is notified of the connection.

 
{code:java}
[2023-03-05 16:02:03,054] INFO [ZooKeeperClient Kafka server] Connected. 
(kafka.zookeeper.ZooKeeperClient){code}
 

The broker sends the request to create the znode {{/brokers/ids/18}} which 
already exists. The error path implemented for KAFKA-6584 is then followed. 
However, in this case, the session owning the ephemeral node 
{{0x30043230ac1}} ({{{}216172783240153793{}}}) is different from the last 
active Zookeeper session which the broker has recorded. And it is also 
different from the current session {{0x1006c6e0b830001}} 
({{{}72176813933264897{}}}), hence the recreation of the broker znode is not 
attempted.

 
{code:java}
[2023-03-05 16:02:04,466] ERROR Error while creating ephemeral at 
/brokers/ids/18, node already exists and owner '216172783240153793' does not 
match current session '72176813933264897' 
(kafka.zk.KafkaZkClient$CheckedEphemeral)
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = 
NodeExists
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
        at 
kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:1821)
        at 
kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1759)
        at 
kafka.zk.KafkaZkClient.checked

Re: [VOTE] KIP-906 Tools migration guidelines

2023-03-15 Thread Alexandre Dupriez
Hi, Frederico,

Thanks for the KIP.

Non-binding +1.

Thanks,
Alexandre

On Wed, Mar 15, 2023 at 08:28, Luke Chen  wrote:
>
> +1 from me.
>
> Luke
>
> On Wed, Mar 15, 2023 at 4:11 PM Federico Valeri 
> wrote:
>
> > Hi everyone,
> >
> > I'd like to start the vote on KIP-906 Tools migration guidelines.
> >
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-906%3A+Tools+migration+guidelines
> >
> > Discussion thread:
> > https://lists.apache.org/thread/o2ytmjj2tyc2xcy6xh5tco31yyjzvz8p
> >
> > Thanks
> > Fede
> >


[jira] [Created] (KAFKA-14806) Add connection timeout in PlaintextSender used by SelectorTests

2023-03-14 Thread Alexandre Dupriez (Jira)
Alexandre Dupriez created KAFKA-14806:
-

 Summary: Add connection timeout in PlaintextSender used by 
SelectorTests
 Key: KAFKA-14806
 URL: https://issues.apache.org/jira/browse/KAFKA-14806
 Project: Kafka
  Issue Type: Test
Reporter: Alexandre Dupriez


Tests in `SelectorTest` can fail due to spurious connection timeouts. One 
example can be found in [this 
build|https://github.com/apache/kafka/pull/13378/checks?check_run_id=11970595528]
 where the client connection the `PlaintextSender` tried to open could not be 
established before the test timed out.

It may be worth enforcing a connection timeout and retries if this can add to the 
selector tests' resiliency. Note that `PlaintextSender` is only used by 
`SelectorTest`, so the scope of the change would remain local.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14793) Propagate topic ids to the group coordinator

2023-03-08 Thread Alexandre Dupriez (Jira)
Alexandre Dupriez created KAFKA-14793:
-

 Summary: Propagate topic ids to the group coordinator
 Key: KAFKA-14793
 URL: https://issues.apache.org/jira/browse/KAFKA-14793
 Project: Kafka
  Issue Type: Sub-task
Reporter: Alexandre Dupriez


KAFKA-14690 introduces topic ids in the OffsetCommit API in the request layer. 
Propagation of topic ids within the group coordinator has been left out of 
scope. Whether topic ids are re-mapped internally in the group coordinator, or 
the group coordinator starts to rely on {{{}TopicIdPartition{}}}, remains to be 
decided.

Note that with KAFKA-14690, the offset commit response data built by the 
coordinator includes topic names only, and topic ids need to be injected 
afterwards outside of the coordinator before serializing the response.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] KIP-641 An new java interface to replace 'kafka.common.MessageReader'

2023-03-07 Thread Alexandre Dupriez
+1 (non-binding).

Thanks for driving this.

On Tue, Mar 7, 2023 at 08:47, Federico Valeri  wrote:
>
> + 1 (non binding) for the latest revision of this KIP (RecordReader).
>
> Thanks
> Fede
>
> On Sat, Feb 25, 2023 at 7:04 AM Luke Chen  wrote:
> >
> > +1 from me.
> >
> > Thank you
> > Luke
> >
> > On Sat, Feb 25, 2023 at 8:39 AM Chia-Ping Tsai  wrote:
> >
> > > Dear all,
> > >
> > > All comments are addressed , and so please take look at this KIP. It needs
> > > vote and love :)
> > >
> > > thanks.


[jira] [Created] (KAFKA-14780) Make RefreshingHttpsJwksTest#testSecondaryRefreshAfterElapsedDelay deterministic

2023-03-06 Thread Alexandre Dupriez (Jira)
Alexandre Dupriez created KAFKA-14780:
-

 Summary: Make 
RefreshingHttpsJwksTest#testSecondaryRefreshAfterElapsedDelay deterministic
 Key: KAFKA-14780
 URL: https://issues.apache.org/jira/browse/KAFKA-14780
 Project: Kafka
  Issue Type: Test
Reporter: Alexandre Dupriez


The test `RefreshingHttpsJwksTest#testSecondaryRefreshAfterElapsedDelay` relies 
on the actual system clock, which makes it frequently fail on my poor IntelliJ 
setup.

The `RefreshingHttpsJwks` component creates and uses a scheduled executor 
service. We could expose the scheduling mechanism to be able to mock its 
behaviour. One way to do so could be to use the `KafkaScheduler`, which has a 
`MockScheduler` implementation relying on `MockTime` instead of the real 
time clock.
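
For illustration, a sketch of the direction, assuming the scheduling can be 
injected into `RefreshingHttpsJwks` (the wiring is hypothetical) and that the 
`MockScheduler` attached to `kafka.utils.MockTime` runs due tasks when the 
mocked clock advances:
{code:java}
// Sketch only: drive the secondary refresh with a mocked clock instead of the
// real system clock, so the test no longer depends on wall-clock timing.
val time = new MockTime()
val scheduler = time.scheduler      // MockScheduler backed by the mocked clock
// ... construct RefreshingHttpsJwks with the injected scheduler (hypothetical wiring) ...
val refreshMs = 10 * 1000L          // illustrative refresh interval
time.sleep(refreshMs + 1)           // advancing the mock clock runs the due tasks
{code}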



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14779) Add ACL Authorizer integration test for authorized OffsetCommits with an unknown topic

2023-03-06 Thread Alexandre Dupriez (Jira)
Alexandre Dupriez created KAFKA-14779:
-

 Summary: Add ACL Authorizer integration test for authorized 
OffsetCommits with an unknown topic
 Key: KAFKA-14779
 URL: https://issues.apache.org/jira/browse/KAFKA-14779
 Project: Kafka
  Issue Type: Sub-task
Reporter: Alexandre Dupriez


Discovered as part of [PR-13240|https://github.com/apache/kafka/pull/13240], 
it seems the use case where a group and topic have the necessary ACLs to allow 
offsets for that topic and consumer group to be committed, but the topic is 
unknown to the broker (either by name or id), is not covered. The purpose of 
this ticket is to add this coverage.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14777) Add support of topic id for OffsetCommitRequests in CommitRequestManager

2023-03-05 Thread Alexandre Dupriez (Jira)
Alexandre Dupriez created KAFKA-14777:
-

 Summary: Add support of topic id for OffsetCommitRequests in 
CommitRequestManager
 Key: KAFKA-14777
 URL: https://issues.apache.org/jira/browse/KAFKA-14777
 Project: Kafka
  Issue Type: Sub-task
Reporter: Alexandre Dupriez


Topic IDs have been introduced to the {{OffsetCommitRequest}} in KAFKA-14690. 
The consumer coordinator now generates these requests with topic ids when all 
topics present in the request have a resolved id.

This change was not added to the commit request manager to limit the scope of 
[PR-13240|https://github.com/apache/kafka/pull/13240]. The purpose of this 
ticket is to extend the support of topic ids to this new component.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-641 An new java interface to replace 'kafka.common.MessageReader'

2023-02-22 Thread Alexandre Dupriez
Hi Chia-Ping,

Thanks for your answer. Apologies, I should have been clearer in the
previous message. What I meant is: is there a plan to use the SPI more
broadly inside the Kafka codebase?

The question arises because the interface exposes a close() method
which is never invoked by the ConsoleProducer. Hence, although we need
to keep this method to maintain compatibility of the SPI with its
current implementations, we should perhaps clarify that this method is
not used/deprecated, unless it is intended to be used in the future.
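
For readers following along, here is a rough sketch of the shape under
discussion; the names below are illustrative and not necessarily the final
KIP-641 interface:

    import java.io.Closeable;
    import java.io.InputStream;
    import java.util.Iterator;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public interface RecordReaderSketch extends Closeable {
        // invoked by the ConsoleProducer with the input stream to read records from
        Iterator<ProducerRecord<byte[], byte[]>> readRecords(InputStream in);

        // part of the SPI so implementations can release resources, but, as noted
        // above, not invoked by the ConsoleProducer today
        @Override
        default void close() {}
    }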

Thanks,
Alexandre

Le mer. 22 févr. 2023 à 09:27, Chia-Ping Tsai  a écrit :
>
> > A minor comment. The SPI is currently used exclusively for the
> > ConsoleProducer. However, it exposes high-level methods which hint at
> > it being a generic component. What is the actual scope of the SPI
> > inside the Kafka codebase? Is it planned to be re-used in other tools?
> > Or is this interface used (not implemented) outside of the
> > ConsoleProducer?
>
> It is used by the ConsoleProducer only. The interface is a kind of public API, 
> and IIRC the public APIs must be written in Java. That is why we need to move 
> it out of the core module (it also simplifies the core module).
>
> There are many other interfaces which allow users to "enhance" Kafka. For 
> example, partitioner, assignor, authorizer, and so on. Most of them are used 
> exclusively for a specific component, and I guess not all interfaces are widely 
> used (implemented). Maybe we can file a thread to clean up the "unused" 
> interfaces.


Re: [DISCUSS] KIP-641 An new java interface to replace 'kafka.common.MessageReader'

2023-02-20 Thread Alexandre Dupriez
Hi Chia-Ping,

Thank you for the KIP and apologies for missing it earlier.

A minor comment. The SPI is currently used exclusively for the
ConsoleProducer. However, it exposes high-level methods which hint at
it being a generic component. What is the actual scope of the SPI
inside the Kafka codebase? Is it planned to be re-used in other tools?
Or is this interface used (not implemented) outside of the
ConsoleProducer?

Thanks,
Alexandre

Le sam. 18 févr. 2023 à 19:02, Chia-Ping Tsai  a écrit :
>
>
>
> On 2023/02/18 08:44:05 Tom Bentley wrote:
> > Hi Chia-Ping,
> >
> > To be honest the stateful version, setting an input stream once using the
> > `readFrom(InputStream)` method and then repeatedly asking for the next
> > record using a parameterless `readRecord()`, seems a bit more natural to me
> > than `readRecord(InputStream inputStream)` being called repeatedly with (I
> > assume) the same input stream. I think the contract is simpler to describe
> > and understand.
>
> I prefer readRecord() also. It is a trade-off between having `Configurable` 
> interface and having a parameterless readRecord(). If the `Configurable` is 
> not required, I'd like to revert to readRecord(). WDYT?
>
> >
> > It's worth thinking about how implementers might have to read bytes from
> > the stream to discover the end of one record and the start of the next.
> > Unless we've guaranteed that the input stream supports mark and reset then
> > they have to buffer the initial bytes of the next record that they've just
> > read from the stream so that they can use them when called next time. So I
> > think RecordReaders are (in general) inherently stateful and therefore it
> > seems harmless for them to also have the input stream itself as some of
> > that state.
>
> you are right. As the input stream is keyboard input, it would be hard to 
> know in advance the number of bytes for one record.
>


[jira] [Resolved] (KAFKA-8752) Ensure plugin classes are instantiable when discovering plugins

2023-02-20 Thread Alexandre Dupriez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexandre Dupriez resolved KAFKA-8752.
--
Resolution: Not A Problem

> Ensure plugin classes are instantiable when discovering plugins
> ---
>
> Key: KAFKA-8752
> URL: https://issues.apache.org/jira/browse/KAFKA-8752
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>    Reporter: Alexandre Dupriez
>        Assignee: Alexandre Dupriez
>Priority: Minor
> Attachments: stacktrace.log
>
>
> While running integration tests from the IntelliJ IDE, it appears plugins 
> fail to load in {{DelegatingClassLoader.scanUrlsAndAddPlugins}}. The reason 
> was, in this case, that the class 
> {{org.apache.kafka.connect.connector.ConnectorReconfigurationTest$TestConnector}}
>  could not be instantiated - which it is not intended to be.
> The problem does not appear when running integration tests with Gradle as the 
> runtime closure is different from IntelliJ - which includes test sources from 
> module dependencies on the classpath.
> While debugging this minor inconvenience, I could see that 
> {{DelegatingClassLoader}} performs a sanity check on the plugin class to 
> instantiate - as of now, it verifies the class is concrete. A quick fix for 
> the problem highlighted above could be to add an extra condition on the Java 
> modifiers of the class to ensure it is instantiable.
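
For reference, the extra condition mentioned in the description could look along these 
lines (sketch only, not the actual DelegatingClassLoader/PluginUtils code):

{code:java}
import java.lang.reflect.Modifier;

public class PluginInstantiabilitySketch {
    // Only treat a discovered class as a plugin candidate if it can actually be instantiated.
    static boolean isInstantiable(Class<?> klass) {
        int mods = klass.getModifiers();
        return !klass.isInterface()
            && !Modifier.isAbstract(mods)
            && Modifier.isPublic(mods)
            // non-static member (inner) classes cannot be instantiated standalone
            && !(klass.isMemberClass() && !Modifier.isStatic(mods));
    }
}
{code}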



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] KIP-903: Replicas with stale broker epoch should not be allowed to join the ISR

2023-02-10 Thread Alexandre Dupriez
+1 (non-binding).

Le ven. 10 févr. 2023 à 19:02, Calvin Liu  a écrit :
>
> Hi all,
>
> I'd like to call for a vote on KIP-903, which proposes a fix to the broker
> reboot data loss KAFKA-14139
> 
> It changes the Fetch and AlterPartition requests to include the broker
> epochs. Then the controller can use these epochs to help reject the stale
> AlterPartition request.
>
> KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR
>
> Discussion thread:
> https://lists.apache.org/thread/vk9h68qpqqq69nlzbvxqx2yfbmzcd0mo


Re: [DISCUSS] KIP-903: Replicas with stale broker epoch should not be allowed to join the ISR

2023-02-09 Thread Alexandre Dupriez
 05. In FetchRequest/FetchResponse, you need to update the `validVersions`
> > as well.
> >
> > 06. It is a little weird to have ReplicaId and BrokerEpoch in the
> > FetchRequest. I wonder if we should rename ReplicaId to BrokerId because it
> > is actually the broker id (even the documentation of the field says it).
> >
> > 07. On the followers, the fetch request version is derived from the
> > metadata version/IBP. As we add a new fetch version, we need to add a new
> > metadata version/IBP as well.
> >
> > 08. Regarding KRaft vs ZK, I don't have a strong opinion. ZK is on the way
> > out so not doing it seems fine. If we do this, we could basically just
> > ignore the broker epoch in ZK and it will keep working as today.
> >
> > Best,
> > David
> >
> > On Wed, Feb 8, 2023 at 3:01 PM Alexandre Dupriez <
> > alexandre.dupr...@gmail.com> wrote:
> >
> > > Hi, Calvin,
> > >
> > > Thanks for the KIP and fast follow-up. A few questions.
> > >
> > > 100. The scenario illustrated in the KIP involves a controller
> > > movement. Is this really required? Cannot the scenario occur with a
> > > similar stale AlterPartition request and the same controller
> > > throughout?
> > >
> > > 101. In the case where card(ISR) = 1 and the last replica leaves, it
> > > will be re-elected as the leader upon reconnection. If the replica is
> > > empty, all data for the partition will be lost. Is this a correct
> > > understanding of the scenario?
> > >
> > > 102. I understand that ZK is going to be unsupported soon. However for
> > > as long as it is available to end users, is there any reason not to
> > > support the fix in ZK mode? Arguably, the implementation for the logic
> > > to AlterPartition is duplicated for both controller types, and it may
> > > be more work than is worth if ZK is fully decommissioned in the next
> > > few months. (Alternatively, is there a plan to back port the fix to
> > > older minor versions?).
> > >
> > > 103. The KIP mentions system tests to be used to simulate the race
> > > condition. Would it be possible to provide more details about it? Do
> > > we think it is worth having this scenario exercised in the functional
> > > tests of the core/server module?
> > >
> > > Thanks,
> > > Alexandre
> > >
> > > Le mer. 8 févr. 2023 à 03:31, Artem Livshits
> > >  a écrit :
> > > >
> > > > Hi Calvin,
> > > >
> > > > Thank you for the KIP.  I have a similar question -- we need to support
> > > > rolling upgrades (when we have some old brokers and some new brokers),
> > so
> > > > there could be combinations of old leader - new follower, new leader -
> > > old
> > > > follower, new leader - old controller, old leader - new controller.
> > > Could
> > > > you elaborate on the behavior during rolls in the Compatibility
> > section?
> > > >
> > > > Also for compatibility it's probably going to be easier to just add a
> > new
> > > > array of epochs in addition to the existing array of broker ids,
> > instead
> > > of
> > > > removing one field and adding another.
> > > >
> > > > The KIP mentions that we would explicitly do something special in ZK
> > mode
> > > > in order to not implement new functionality.  I think it may be easier
> > to
> > > > implement functionality for both ZK and KRaft mode than adding code to
> > > > disable it in ZK mode.
> > > >
> > > > -Artem
> > > >
> > > > On Tue, Feb 7, 2023 at 4:58 PM Jun Rao 
> > wrote:
> > > >
> > > > > Hi, Calvin,
> > > > >
> > > > > Thanks for the KIP. Looks good to me overall.
> > > > >
> > > > > Since this KIP changes the inter-broker protocol, should we bump up
> > the
> > > > > metadata version (the equivalent of IBP) during upgrade?
> > > > >
> > > > > Jun
> > > > >
> > > > >
> > > > > On Fri, Feb 3, 2023 at 10:55 AM Calvin Liu
> >  > > >
> > > > > wrote:
> > > > >
> > > > > > Hi everyone,
> > > > > > I'd like to discuss the fix for the broker reboot data loss
> > > KAFKA-14139
> > > > > > <https://issues.apache.org/jira/browse/KAFKA-14139>.
> > > > > > It changes the Fetch and AlterPartition requests to include the
> > > broker
> > > > > > epochs. Then the controller can use these epochs to help reject the
> > > stale
> > > > > > AlterPartition request.
> > > > > > Please take a look. Thanks!
> > > > > >
> > > > > >
> > > > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR
> > > > > >
> > > > >
> > >
> >


Re: [DISCUSS] KIP-903: Replicas with stale broker epoch should not be allowed to join the ISR

2023-02-08 Thread Alexandre Dupriez
Hi, Calvin,

Thanks for the KIP and fast follow-up. A few questions.

100. The scenario illustrated in the KIP involves a controller
movement. Is this really required? Cannot the scenario occur with a
similar stale AlterPartition request and the same controller
throughout?

101. In the case where card(ISR) = 1 and the last replica leaves, it
will be re-elected as the leader upon reconnection. If the replica is
empty, all data for the partition will be lost. Is this a correct
understanding of the scenario?

102. I understand that ZK is going to be unsupported soon. However for
as long as it is available to end users, is there any reason not to
support the fix in ZK mode? Arguably, the implementation for the logic
to AlterPartition is duplicated for both controller types, and it may
be more work than it is worth if ZK is fully decommissioned in the next
few months. (Alternatively, is there a plan to back port the fix to
older minor versions?).

103. The KIP mentions system tests to be used to simulate the race
condition. Would it be possible to provide more details about it? Do
we think it is worth having this scenario exercised in the functional
tests of the core/server module?

Thanks,
Alexandre
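
(For illustration only, not the actual controller code: the kind of check the
KIP would enable, assuming the controller keeps the latest registered epoch per
broker.)

    import java.util.Map;

    class BrokerEpochCheckSketch {
        // brokerEpochs: broker id -> epoch recorded when the broker (re-)registered
        static boolean isStale(int brokerId, long epochInRequest, Map<Integer, Long> brokerEpochs) {
            Long registered = brokerEpochs.get(brokerId);
            // an AlterPartition carrying an older epoch comes from a replica that
            // restarted (or lost its session) after the request was built, so reject it
            return registered == null || epochInRequest < registered;
        }
    }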

Le mer. 8 févr. 2023 à 03:31, Artem Livshits
 a écrit :
>
> Hi Calvin,
>
> Thank you for the KIP.  I have a similar question -- we need to support
> rolling upgrades (when we have some old brokers and some new brokers), so
> there could be combinations of old leader - new follower, new leader - old
> follower, new leader - old controller, old leader - new controller.  Could
> you elaborate on the behavior during rolls in the Compatibility section?
>
> Also for compatibility it's probably going to be easier to just add a new
> array of epochs in addition to the existing array of broker ids, instead of
> removing one field and adding another.
>
> The KIP mentions that we would explicitly do something special in ZK mode
> in order to not implement new functionality.  I think it may be easier to
> implement functionality for both ZK and KRaft mode than adding code to
> disable it in ZK mode.
>
> -Artem
>
> On Tue, Feb 7, 2023 at 4:58 PM Jun Rao  wrote:
>
> > Hi, Calvin,
> >
> > Thanks for the KIP. Looks good to me overall.
> >
> > Since this KIP changes the inter-broker protocol, should we bump up the
> > metadata version (the equivalent of IBP) during upgrade?
> >
> > Jun
> >
> >
> > On Fri, Feb 3, 2023 at 10:55 AM Calvin Liu 
> > wrote:
> >
> > > Hi everyone,
> > > I'd like to discuss the fix for the broker reboot data loss KAFKA-14139
> > > .
> > > It changes the Fetch and AlterPartition requests to include the broker
> > > epochs. Then the controller can use these epochs to help reject the stale
> > > AlterPartition request.
> > > Please take a look. Thanks!
> > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR
> > >
> >


Re: [DISCUSS] Tiered-Storage: Implement RocksdbBasedMetadataCache for TopicBased RLMM

2023-01-26 Thread Alexandre Dupriez
Hi, hzh0425,

Thank you for raising the question. There have been discussions about
using RocksDB as a RLMM in the community, one of which can be found on
the dev list [1].
What is the size of the metadata set in your use case? Have you
considered using an external data store to abstract metadata
persistence away from the local storage?

[1] https://lists.apache.org/thread/8lcslnwrnj1s7mk2c3g3fw0zqjwrogds

Thanks,
Alexandre
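
To make the discussion concrete, here is a minimal sketch of the idea proposed
below (illustrative only; the class name and key layout are assumptions, not a
design), using the rocksdbjni dependency Kafka already ships for Streams:

    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    import java.nio.charset.StandardCharsets;

    public class RocksDbMetadataCacheSketch implements AutoCloseable {
        static { RocksDB.loadLibrary(); }

        private final RocksDB db;

        public RocksDbMetadataCacheSketch(String path) throws RocksDBException {
            // metadata lives on disk; RocksDB keeps only a bounded in-memory cache
            this.db = RocksDB.open(new Options().setCreateIfMissing(true), path);
        }

        public void put(String segmentKey, byte[] serializedMetadata) throws RocksDBException {
            db.put(segmentKey.getBytes(StandardCharsets.UTF_8), serializedMetadata);
        }

        public byte[] get(String segmentKey) throws RocksDBException {
            return db.get(segmentKey.getBytes(StandardCharsets.UTF_8));
        }

        @Override
        public void close() { db.close(); }
    }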

Le sam. 21 janv. 2023 à 05:35, hzh0425  a écrit :
>
> Background:
> In KIP-405 (Kafka Tiered Storage), Kafka introduced the tiered storage feature, 
> and RLMM is responsible for storing the remote segments' metadata.
>
>
> BTW, [KAFKA-9555] Topic-based implementation for the RemoteLogMetadataManager 
> - ASF JIRA (apache.org) implements the default RLMM - 'TopicBased-RLMM'.
>
>
>
>
> Problem:
> TopicBased RLMM stores all metadata of subscriptions in memory.
>
> In practice, we found that as the metadata gradually increases, it puts a huge 
> burden on the broker's memory (at the GB level), and at the same time it becomes 
> very time-consuming to save a snapshot of the full metadata set to disk.
>
>
>
>
> Solution
>
> We hope to introduce rocksdb to solve this problem:
> - Implement a RocksdbBasedMetadataCache
> - All metadata is stored on disk, only a small amount of rocksdb memory cache 
> is required.
> - There is no need to pay the cost of saving a full metadata snapshot to disk, 
> since RocksDB persists data incrementally.
>
>
> You are welcome to discuss this Improvement by replying email !
>
>
> Thanks,
> Hzh0425
>
>
> hzhkafka
> hzhka...@163.com


Re: [DISCUSS] KIP-852 Optimize calculation of size for log in remote tier

2022-11-18 Thread Alexandre Dupriez
Hi Divij,

Thanks for the KIP. Please find some comments based on what I read on
this thread so far - apologies for the repeats and the late reply.

If I understand correctly, one of the main elements of discussion is
about caching in Kafka versus delegation of providing the remote size
of a topic-partition to the plugin.

A few comments:

100. The size of the “derived metadata” which is managed by the plugin
to represent an rlmMetadata can indeed be close to 1 kB on average
depending on its own internal structure, e.g. the redundancy it
enforces (unfortunately resulting to duplication), additional
information such as checksums and primary and secondary indexable
keys. But indeed, the rlmMetadata is itself a lighter data structure
by a factor of 10. And indeed, instead of caching the “derived
metadata”, only the rlmMetadata could be, which should address the
concern regarding the memory occupancy of the cache.

101. I am not sure I fully understand why we would need to cache the
list of rlmMetadata to retain the remote size of a topic-partition.
Since the leader of a topic-partition is, in non-degenerate cases,
the only actor which can mutate the remote part of the
topic-partition, hence its size, it could in theory cache only the
size of the remote log once it has calculated it. In that case, there
would not be any problem regarding the size of the caching strategy.
Did I miss something there?

102. There may be a few challenges to consider with caching:

102.1) As mentioned above, the caching strategy assumes no mutation
outside the lifetime of a leader. While this is true in the normal
course of operation, there could be accidental mutation outside of the
leader and a loss of consistency between the cached state and the
actual remote representation of the log. E.g. split-brain scenarios,
bugs in the plugins, bugs in external systems with mutating access on
the derived metadata. In the worst case, a drift between the cached
size and the actual size could lead to over-deleting remote data which
is a durability risk.

The alternative you propose, by making the plugin the source of truth
w.r.t. to the size of the remote log, can make it easier to avoid
inconsistencies between plugin-managed metadata and the remote log
from the perspective of Kafka. On the other hand, plugin vendors would
have to implement it with the expected efficiency to have it yield
benefits.

102.2) As you mentioned, the caching strategy in Kafka would still
require one iteration over the list of rlmMetadata when the leadership
of a topic-partition is assigned to a broker, while the plugin can
offer alternative constant-time approaches. This calculation cannot be
put on the LeaderAndIsr path and would be performed in the background.
In case of bulk leadership migration, listing the rlmMetadata could a)
result in request bursts to any backend system the plugin may use
[which shouldn’t be a problem for high-throughput data stores but
could have cost implications] b) increase utilisation timespan of the
RLM threads for these calculations potentially leading to transient
starvation of tasks queued for, typically, offloading operations c)
could have a non-marginal CPU footprint on hardware with strict
resource constraints. All these elements could have an impact to some
degree depending on the operational environment.

From a design perspective, one question is where we want the source of
truth w.r.t. remote log size to be during the lifetime of a leader.
The responsibility of maintaining a consistent representation of the
remote log is shared by Kafka and the plugin. Which system is best
placed to maintain such a state while providing the highest
consistency guarantees is something both Kafka and plugin designers
could help understand better.

Many thanks,
Alexandre
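
(To make point 101 above concrete, here is a rough sketch, illustrative only and
not a proposal, of the leader maintaining a running total instead of re-listing
the rlmMetadata:)

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicLong;

    class RemoteLogSizeTrackerSketch {
        private final Map<String, AtomicLong> remoteSizeBytes = new ConcurrentHashMap<>();

        // computed once when becoming leader, then maintained incrementally
        void onSegmentCopied(String topicPartition, long segmentBytes) {
            remoteSizeBytes.computeIfAbsent(topicPartition, k -> new AtomicLong()).addAndGet(segmentBytes);
        }

        void onSegmentDeleted(String topicPartition, long segmentBytes) {
            remoteSizeBytes.computeIfAbsent(topicPartition, k -> new AtomicLong()).addAndGet(-segmentBytes);
        }

        long size(String topicPartition) {
            AtomicLong size = remoteSizeBytes.get(topicPartition);
            return size == null ? 0L : size.get();
        }
    }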


Le jeu. 17 nov. 2022 à 19:27, Jun Rao  a écrit :
>
> Hi, Divij,
>
> Thanks for the reply.
>
> Point #1. Is the average remote segment metadata really 1KB? What's listed
> in the public interface is probably well below 100 bytes.
>
> Point #2. I guess you are assuming that each broker only caches the remote
> segment metadata in memory. An alternative approach is to cache them in
> both memory and local disk. That way, on broker restart, you just need to
> fetch the new remote segments' metadata using the
> listRemoteLogSegments(TopicIdPartition topicIdPartition, int leaderEpoch)
> api. Will that work?
>
> Point #3. Thanks for the explanation and it sounds good.
>
> Thanks,
>
> Jun
>
> On Thu, Nov 17, 2022 at 7:31 AM Divij Vaidya 
> wrote:
>
> > Hi Jun
> >
> > There are three points that I would like to present here:
> >
> > 1. We would require a large cache size to efficiently cache all segment
> > metadata.
> > 2. Linear scan of all metadata at broker startup to populate the cache will
> > be slow and will impact the archival process.
> > 3. There is no other use case where a full scan of segment metadata is
> > required.
> >
> > Let's start by quantifying 1. Here's my 

[jira] [Created] (KAFKA-14190) Corruption of Topic IDs with pre-2.8.0 admin clients

2022-08-30 Thread Alexandre Dupriez (Jira)
Alexandre Dupriez created KAFKA-14190:
-

 Summary: Corruption of Topic IDs with pre-2.8.0 admin clients
 Key: KAFKA-14190
 URL: https://issues.apache.org/jira/browse/KAFKA-14190
 Project: Kafka
  Issue Type: Bug
  Components: admin, core, zkclient
Affects Versions: 3.2.1, 3.1.1, 3.2.0, 3.0.1, 3.0.0, 2.8.1, 3.1.0
Reporter: Alexandre Dupriez


h4. Scope

The problem reported below has been verified to occur with ZooKeeper 
controllers. It has not been attempted with KRaft controllers, although it is 
unlikely to be reproduced in KRaft mode given the nature of the issue and the 
clients involved.
h4. Problem Description

There is a loss of topic IDs when an AdminClient of version < 2.8.0 is used to 
increase the number of partitions of a topic on a cluster with version >= 
2.8.0. This results in the controller re-creating topic IDs upon restart, 
eventually conflicting with the topic ID in the brokers' partition.metadata files 
in the partition directories of the impacted topic. This leads to an availability 
loss of the partitions, which do not accept leadership / follower-ship when the 
topic ID indicated by a LeaderAndIsr request differs from their own locally 
cached ID.

One mitigation post-corruption is to substitute the stale topic ID in the 
partition.metadata files with the new topic ID referenced by the controller, or 
alternatively, delete the partition.metadata file altogether. 
h4. Steps to reproduce

1. Set-up and launch a two-nodes Kafka cluster in Zookeeper mode.

2. Create a topic e.g. via {{kafka-topics.sh}}

 
{noformat}
./bin/kafka-topics.sh --bootstrap-server :9092 --create --topic myTopic 
--partitions 2 --replication-factor 2{noformat}
3. Capture the topic ID using a 2.8.0+ client.

 
{noformat}
./kafka/bin/kafka-topics.sh --bootstrap-server :9092 --topic myTopic --describe

Topic: myTopic TopicId: jKTRaM_TSNqocJeQI2aYOQ PartitionCount: 2 
ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: myTopic Partition: 0 Leader: 0 Replicas: 1,0 Isr: 0,1
Topic: myTopic Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1{noformat}
 

4. Restart one of the brokers. This will make each broker create the 
{{partition.metadata}} files in the partition directories since it will already 
have loaded the {{Log}} instance in memory.

 

5. Using a pre-2.8.0 client library, run the following command.

 
{noformat}
./kafka/bin/kafka-topics.sh --zookeeper :2181 --alter --topic myTopic 
--partitions 3{noformat}
 

6. Using a 2.8.0+ client library, describe the topic via Zookeeper and notice 
the absence of topic ID from the output, where it is otherwise expected.

 
{noformat}
./kafka/bin/kafka-topics.sh --zookeeper :2181 --describe --topic myTopic

Topic: myTopic PartitionCount: 3 ReplicationFactor: 2 Configs: 
Topic: myTopic Partition: 0 Leader: 1 Replicas: 1,0 Isr: 0,1
Topic: myTopic Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: myTopic Partition: 2 Leader: 1 Replicas: 1,0 Isr: 1,0{noformat}
 

7. Using a 2.8.0+ client library, describe the topic via a broker endpoint and 
notice the topic ID changed.

 
{noformat}
./kafka/bin/kafka-topics.sh --bootstrap-server :9093 --describe --topic myTopic

Topic: myTopic TopicId: nI-JQtPwQwGiylMfm8k13w PartitionCount: 3 
ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: myTopic Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: myTopic Partition: 1 Leader: 1 Replicas: 0,1 Isr: 1,0
Topic: myTopic Partition: 2 Leader: 1 Replicas: 1,0 Isr: 1,0{noformat}
 

8. Restart the controller.

9. Check the state-change.log file on the controller broker. The following type 
of logs will appear.

 
{noformat}
[2022-08-25 17:44:05,308] ERROR [Broker id=0] Topic Id in memory: 
jKTRaM_TSNqocJeQI2aYOQ does not match the topic Id for partition myTopic-0 
provided in the request: nI-JQtPwQwGiylMfm8k13w. (state.change.logger){noformat}
 

10. Restart the other broker.

11. Describe the topic via the broker endpoint or Zookeeper with a 2.8.0+ 
client library

 
{noformat}
./kafka/bin/kafka-topics.sh --zookeeper :2181 --describe --topic myTopic

Topic: myTopic TopicId: nI-JQtPwQwGiylMfm8k13w PartitionCount: 3 
ReplicationFactor: 2 Configs: 
Topic: myTopic Partition: 0 Leader: 0 Replicas: 1,0 Isr: 0
Topic: myTopic Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0
Topic: myTopic Partition: 2 Leader: 0 Replicas: 1,0 Isr: 0{noformat}
 

Notice the abnormal state the topic is in: ISR is reduced to one single broker 
which is claimed to be the leader by the controller (here, broker 0). The 
controller believes 0 is the leader because it does not handle the error 
response from peer brokers when sending the requests for them to become a 
leader or follower of a partition.

12. Verify produce is unavailable.

 
{noformat}
./kafka/bin/kafka-console-producer.sh --bootstrap-server :9092 --topic myTopic

[2022-08-25 17:52:59,962] ERROR Error when sending message to topic m

Re: Recover partitions from a failed broker leader which is the only replica in isr.

2021-05-16 Thread Alexandre Dupriez
Hi JD,

You can enable unclean leader election (disabled by default) to use
ex-ISR followers as fallbacks for new leaders - but be mindful of data
loss.

Thanks,
Alexandre

Le mar. 10 nov. 2020 à 06:33, JD Zheng  a écrit :
>
> Hi, everyone,
>
> Not sure if this is the right place to ask this question. I sent it to the
> user mailing list and didn't get any response. Where should I ask such
> questions?
> We configure the producer with acks=ALL and the broker min.insync.replicas = 2.
> The replication factor is 3. In this case, if the leader is struggling, and
> the two other replicas can not catchup with the leader and are both kicked
> out of the isr set, it is still safe to fail over to the most updated
> follower.  Because the updates which are only available on the failed
> leader won't have been acked back to the client as success, no data loss in
> terms of client updates in this case. But I didn't find any
> information/tool to failover to the newer follower in case the only isr
> replica is the failed leader. How can we recover the partition when the
> only replica in the isr failed and two other healthy followers can not
> catch up? Is there any way to manually bring the two replicas back to isr
> without actually catching up with the down leader? Thanks,
>
> -JD
>
> -- Forwarded message -
> From: JD Zheng 
> Date: Sun, Nov 8, 2020 at 7:41 PM
> Subject: Recover partitions from a failed broker leader which is the only
> replica in isr.
> To: 
>
>
> Hi,
>
> We have a case where one of our kafka brokers's data drive failed which
> kicks all the followers of the partitions with this broker as the leader
> out of the isr before the broker is completed down. In this case, all the
> partitions (this broker was the leader) become offline. What's the best way
> to recover these partitions to re-elect a new leader among the non-isr
> replicas? Is there any way to elect the replica with the highest ELO?
>
> Thanks,
>
> -JD


Re: [DISCUSS] KIP-739: Block Less on KafkaProducer#send

2021-05-16 Thread Alexandre Dupriez
Hello Nakamura,

Thanks for proposing this change. I can see how the blocking behaviour
can be a problem when integrating with reactive frameworks such as
Akka. One of the questions I would have is how you would handle back
pressure and avoid memory exhaustion when the producer's buffer is
full and tasks would start to accumulate in the out-of-band queue or
thread pool introduced with this KIP.

Thanks,
Alexandre
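
(To illustrate the concern, here is a rough sketch of the kind of off-thread
wrapper being discussed, with a bounded executor so that callers are pushed
back instead of exhausting memory; the names and sizing are mine, not the
KIP's.)

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class NonBlockingSendSketch {
        private final KafkaProducer<byte[], byte[]> producer;
        // bounded queue + CallerRunsPolicy: when the queue is full, back pressure
        // falls back onto the calling thread instead of growing without limit
        private final ExecutorService executor = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(1024),
            new ThreadPoolExecutor.CallerRunsPolicy());

        public NonBlockingSendSketch(KafkaProducer<byte[], byte[]> producer) {
            this.producer = producer;
        }

        public CompletableFuture<RecordMetadata> send(ProducerRecord<byte[], byte[]> record) {
            CompletableFuture<RecordMetadata> result = new CompletableFuture<>();
            executor.execute(() -> producer.send(record, (metadata, exception) -> {
                if (exception != null) result.completeExceptionally(exception);
                else result.complete(metadata);
            }));
            return result;
        }
    }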

Le ven. 14 mai 2021 à 15:55, Ryanne Dolan  a écrit :
>
> Makes sense!
>
> Ryanne
>
> On Fri, May 14, 2021, 9:39 AM Nakamura  wrote:
>
> > Hey Ryanne,
> >
> > I see what you're saying about serde blocking, but I think we should
> > consider it out of scope for this patch.  Right now we've nailed down a
> > couple of use cases where we can unambiguously say, "I can make progress
> > now" or "I cannot make progress now", which makes it possible to offload to
> > a different thread only if we are unable to make progress.  Extending this
> > to CPU work like serde would mean always offloading, which would be a
> > really big performance change.  It might be worth exploring anyway, but I'd
> > rather keep this patch focused on improving ergonomics, rather than
> > muddying the waters with evaluating performance very deeply.
> >
> > I think if we really do want to support serde or interceptors that do IO on
> > the send path (which seems like an anti-pattern to me), we should consider
> > making that a separate KIP, and probably also consider changing the API to
> > use Futures (or CompletionStages).  But I would rather avoid scope creep,
> > so that we have a better chance of fixing this part of the problem.
> >
> > Yes, I think some exceptions will move to being async instead of sync.
> > They'll still be surfaced in the Future, so I'm not so confident that it
> > would be that big a shock to users though.
> >
> > Best,
> > Moses
> >
> > On Thu, May 13, 2021 at 7:44 PM Ryanne Dolan 
> > wrote:
> >
> > > re serialization, my concern is that serialization often accounts for a
> > lot
> > > of the cycles spent before returning the future. It's not blocking per
> > se,
> > > but it's the same effect from the caller's perspective.
> > >
> > > Moreover, serde impls often block themselves, e.g. when fetching schemas
> > > from a registry. I suppose it's also possible to block in Interceptors
> > > (e.g. writing audit events or metrics), which happens before serdes iiuc.
> > > So any blocking in either of those plugins would block the send unless we
> > > queue first.
> > >
> > > So I think we want to queue first and do everything off-thread when using
> > > the new API, whatever that looks like. I just want to make sure we don't
> > do
> > > that for clients that wouldn't expect it.
> > >
> > > Another consideration is exception handling. If we queue right away,
> > we'll
> > > defer some exceptions that currently are thrown to the caller (before the
> > > future is returned). In the new API, the send() wouldn't throw any
> > > exceptions, and instead the future would fail. I think that might mean
> > that
> > > a new method signature is required.
> > >
> > > Ryanne
> > >
> > >
> > > On Thu, May 13, 2021, 2:57 PM Nakamura  wrote:
> > >
> > > > Hey Ryanne,
> > > >
> > > > I agree we should add an additional constructor (or else an additional
> > > > overload in KafkaProducer#send, but the new constructor would be easier
> > > to
> > > > understand) if we're targeting the "user provides the thread" approach.
> > > >
> > > > From looking at the code, I think we can keep record serialization on
> > the
> > > > user thread, if we consider that an important part of the semantics of
> > > this
> > > > method.  It doesn't seem like serialization depends on knowing the
> > > cluster,
> > > > I think it's incidental that it comes after the first "blocking"
> > segment
> > > in
> > > > the method.
> > > >
> > > > Best,
> > > > Moses
> > > >
> > > > On Thu, May 13, 2021 at 2:38 PM Ryanne Dolan 
> > > > wrote:
> > > >
> > > > > Hey Moses, I like the direction here. My thinking is that a single
> > > > > additional work queue, s.t. send() can enqueue and return, seems like
> > > the
> > > > > lightest touch. However, I don't think we can trivially process that
> > > > queue
> > > > > in an internal thread pool without subtly changing behavior for some
> > > > users.
> > > > > For example, users will often run send() in multiple threads in order
> > > to
> > > > > serialize faster, but that wouldn't work quite the same if there were
> > > an
> > > > > internal thread pool.
> > > > >
> > > > > For this reason I'm thinking we need to make sure any such changes
> > are
> > > > > opt-in. Maybe a new constructor with an additional ThreadFactory
> > > > parameter.
> > > > > That would at least clearly indicate that work will happen
> > off-thread,
> > > > and
> > > > > would require opt-in for the new behavior.
> > > > >
> > > > > Under the hood, this ThreadFactory could be used to create the worker
> > > > > thread that 

Broker-level min ISR for __consumer_offsets

2021-03-26 Thread Alexandre Dupriez
Hi, Community,

I wanted to reach out about the following scenario. It was most likely
discussed before and a long time ago: I apologise in advance for the
repeat.

I am customising broker-level replication settings of an internal
topic (in this case, the consumer offsets topic) to accommodate the
topology of test clusters which operate in highly constrained
environments (no more than 2 datacenters with a very limited number of
brokers). The replication factor of topics in these clusters cannot
exceed 2.

Now, in the case of a broker outage, I would like to favour
availability over durability (accepting the risks it brings) and allow
writes with the strongest replication semantic (acks=all) to be
accepted for the partitions of the consumer offsets topic, even if
only one of their replicas remains in-sync. I noticed the transaction
state topic offers a broker-level configuration parameter to customize
the minimum ISR size required to accept such writes [1], though
there is no equivalent parameter available for the consumer
offsets topic. Note that I do not wish to lower the default minimum
ISR size applying globally to all topics - only that of the consumer
offsets topic (*).

Do you think it would be worth adding a broker-level configuration parameter
for that purpose? Of course, I understand the impact would be utterly
marginal - configuration can always be enforced dynamically via a
topic-level override anyway. But in this use case, I have to consider
only broker-level configuration.
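
(For reference, the topic-level override I am alluding to would be along these
lines; shown only for comparison, since only broker-level configuration is
available in my case:

  kafka-configs.sh --bootstrap-server :9092 --alter --entity-type topics \
    --entity-name __consumer_offsets --add-config min.insync.replicas=1
)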

I am interested to hear other thoughts, in case there is
something I have overlooked.

Many thanks,
Alexandre

(*) There are too many "consumer offsets topic" (as a sequence of
words) in this text, I know.

[1] 
https://kafka.apache.org/documentation/#brokerconfigs_transaction.state.log.min.isr


Re: Gradle error - aggregatedJavadoc depending on "compileJava"

2021-03-24 Thread Alexandre Dupriez
Hi Chia-Ping and Ismael,

Thanks for your follow-ups and for finding the root cause of
this: the problem appeared because the rat plugin wasn't applied
when the .git folder does not exist. This is indeed encountered with
a freshly cloned repository, while existing local
repositories for which a build had already been made worked just fine.

Thanks,
Alexandre

Le mer. 24 mars 2021 à 06:16, Chia-Ping Tsai  a écrit :
>
> hi Alexandre,
>
> please take a look at https://github.com/apache/kafka/pull/10386. We are 
> going to fix that error. Thanks for your report.
>
> On 2021/03/10 14:07:08, Alexandre Dupriez  wrote:
> > Hi Community,
> >
> > I tried to build Kafka from trunk on my environment today (2021, March
> > 10th) and it failed with the following Gradle error at the beginning
> > of the build, while Gradle configures project from build.gradle:
> >
> >   "Could not get unknown property 'compileJava' for root project
> > '' of type org.gradle.api.Project."
> >
> > The command used is "gradle releaseTarGz". Removing "dependsOn:
> > compileJava" from the task "aggregatedJavadoc" (added on March 9th
> > [1]) made the problem disappear - I wonder if anyone else encountered
> > the same problem?
> >
> > [1] https://github.com/apache/kafka/pull/10272
> >
> > Many thanks,
> > Alexandre
> >


Re: Gradle error - aggregatedJavadoc depending on "compileJava"

2021-03-10 Thread Alexandre Dupriez
Hi Ismael,

Thanks for your quick response. I used the system-installed Gradle.
Surprisingly, the same command works successfully on my laptop
but not on the target environment - both use the same version of
Gradle (6.8.3) and JDK (1.8), and start with an empty Gradle cache.
Note the error is raised before any compilation actually happens, at
the "configure" stage of the build.

Thanks,
Alexandre

Le mer. 10 mars 2021 à 20:58, Ismael Juma  a écrit :
>
> Hi Alexandre,
>
> Did you use `./gradlew releaseTarGz` (you should never use the system
> installed gradle)? That works for me. Also `./gradlew clean releaseTarGz`
> works for me.
>
> Ismael
>
> On Wed, Mar 10, 2021 at 8:42 AM Ismael Juma  wrote:
>
> > Interesting, I didn't have the same problem. I'll try to reproduce.
> >
> > Ismael
> >
> > On Wed, Mar 10, 2021, 6:07 AM Alexandre Dupriez <
> > alexandre.dupr...@gmail.com> wrote:
> >
> >> Hi Community,
> >>
> >> I tried to build Kafka from trunk on my environment today (2021, March
> >> 10th) and it failed with the following Gradle error at the beginning
> >> of the build, while Gradle configures project from build.gradle:
> >>
> >>   "Could not get unknown property 'compileJava' for root project
> >> '' of type org.gradle.api.Project."
> >>
> >> The command used is "gradle releaseTarGz". Removing "dependsOn:
> >> compileJava" from the task "aggregatedJavadoc" (added on March 9th
> >> [1]) made the problem disappear - I wonder if anyone else encountered
> >> the same problem?
> >>
> >> [1] https://github.com/apache/kafka/pull/10272
> >>
> >> Many thanks,
> >> Alexandre
> >>
> >


Gradle error - aggregatedJavadoc depending on "compileJava"

2021-03-10 Thread Alexandre Dupriez
Hi Community,

I tried to build Kafka from trunk on my environment today (2021, March
10th) and it failed with the following Gradle error at the beginning
of the build, while Gradle configures project from build.gradle:

  "Could not get unknown property 'compileJava' for root project
'' of type org.gradle.api.Project."

The command used is "gradle releaseTarGz". Removing "dependsOn:
compileJava" from the task "aggregatedJavadoc" (added on March 9th
[1]) made the problem disappear - I wonder if anyone else encountered
the same problem?

[1] https://github.com/apache/kafka/pull/10272

Many thanks,
Alexandre


Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2020-08-20 Thread Alexandre Dupriez
Hi Jun,

Many thanks for your initiative.

If you like, I am happy to attend at the time you suggested.

Many thanks,
Alexandre

Le mer. 19 août 2020 à 22:00, Harsha Ch  a écrit :

> Hi Jun,
>  Thanks. This will help a lot. Tuesday will work for us.
> -Harsha
>
>
> On Wed, Aug 19, 2020 at 1:24 PM Jun Rao  wrote:
>
> > Hi, Satish, Ying, Harsha,
> >
> > Do you think it would be useful to have a regular virtual meeting to
> > discuss this KIP? The goal of the meeting will be sharing
> > design/development progress and discussing any open issues to
> > accelerate this KIP. If so, will every Tuesday (from next week) 9am-10am
> PT
> > work for you? I can help set up a Zoom meeting, invite everyone who might
> > be interested, have it recorded and shared, etc.
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Aug 18, 2020 at 11:01 AM Satish Duggana <
> satish.dugg...@gmail.com>
> > wrote:
> >
> > > Hi  Kowshik,
> > >
> > > Thanks for looking into the  KIP and sending your comments.
> > >
> > > 5001. Under the section "Follower fetch protocol in detail", the
> > > next-local-offset is the offset upto which the segments are copied to
> > > remote storage. Instead, would last-tiered-offset be a better name than
> > > next-local-offset? last-tiered-offset seems to naturally align well
> with
> > > the definition provided in the KIP.
> > >
> > > Both next-local-offset and local-log-start-offset were introduced to
> > > talk about offsets related to local log. We are fine with
> > > last-tiered-offset too as you suggested.
> > >
> > > 5002. After leadership is established for a partition, the leader would
> > > begin uploading a segment to remote storage. If successful, the leader
> > > would write the updated RemoteLogSegmentMetadata to the metadata topic
> > (via
> > > RLMM.putRemoteLogSegmentData). However, for defensive reasons, it seems
> > > useful that before the first time the segment is uploaded by the leader
> > for
> > > a partition, the leader should ensure to catch up to all the metadata
> > > events written so far in the metadata topic for that partition (ex: by
> > > previous leader). To achieve this, the leader could start a lease
> (using
> > an
> > > establish_leader metadata event) before commencing tiering, and wait
> > until
> > > the event is read back. For example, this seems useful to avoid cases
> > where
> > > zombie leaders can be active for the same partition. This can also
> prove
> > > useful to help avoid making decisions on which segments to be uploaded
> > for
> > > a partition, until the current leader has caught up to a complete view
> of
> > > all segments uploaded for the partition so far (otherwise this may
> cause
> > > same segment being uploaded twice -- once by the previous leader and
> then
> > > by the new leader).
> > >
> > > We allow copying segments to remote storage which may have common
> > > offsets. Please go through the KIP to understand the follower fetch
> > > protocol(1) and follower to leader transition(2).
> > >
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-FollowerReplication
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Followertoleadertransition
> > >
> > >
> > > 5003. There is a natural interleaving between uploading a segment to
> > remote
> > > store, and, writing a metadata event for the same (via
> > > RLMM.putRemoteLogSegmentData). There can be cases where a remote
> segment
> > is
> > > uploaded, then the leader fails and a corresponding metadata event
> never
> > > gets written. In such cases, the orphaned remote segment has to be
> > > eventually deleted (since there is no confirmation of the upload). To
> > > handle this, we could use 2 separate metadata events viz.
> copy_initiated
> > > and copy_completed, so that copy_initiated events that don't have a
> > > corresponding copy_completed event can be treated as garbage and
> deleted
> > > from the remote object store by the broker.
> > >
> > > We are already updating RMM with RemoteLogSegmentMetadata pre and post
> > > copying of log segments. We had a flag in RemoteLogSegmentMetadata
> > > whether it is copied or not. But we are making changes in
> > > RemoteLogSegmentMetadata to introduce a state field in
> > > RemoteLogSegmentMetadata which will have the respective started and
> > > finished states. This includes for other operations like delete too.
> > >
> > > 5004. In the default implementation of RLMM (using the internal topic
> > > __remote_log_metadata), a separate topic called
> > > __remote_segments_to_be_deleted is going to be used just to track
> > failures
> > > in removing remote log segments. A separate topic (effectively another
> > > metadata stream) introduces some maintenance overhead and design
> > > complexity. It seems to me that the same can be achieved just by using
> > just
> > > the __remote_log_metadata topic 

Re: [DISCUSS] KIP-578: Add configuration to limit number of partitions

2020-06-12 Thread Alexandre Dupriez
> > > > > we
> > > > > could provide a custom implementation. I am afraid that would add
> > > > > complexity that I am not sure we want to undertake.
> > > > >
> > > > > Do you see sense in what I am saying?
> > > > >
> > > > > Thanks.
> > > > >
> > > > > On Mon, Apr 20, 2020 at 3:59 PM Tom Bentley 
> > > wrote:
> > > > >
> > > > > > Hi Gokul,
> > > > > >
> > > > > > Leaving aside the question of how Kafka scales, I think the
> > proposed
> > > > > > solution, limiting the number of partitions in a cluster or
> > > per-broker,
> > > > > is
> > > > > > a policy which ought to be addressable via the pluggable policies
> > > (e.g.
> > > > > > create.topic.policy.class.name). Unfortunately although there's a
> > > > policy
> > > > > > for topic creation, it's currently not possible to enforce a policy
> > > on
> > > > > > partition increase. It would be more flexible to be able enforce
> > this
> > > > > kind
> > > > > > of thing via a pluggable policy, and it would also avoid the
> > > situation
> > > > > > where different people each want to have a config which addresses
> > > some
> > > > > > specific use case or problem that they're experiencing.
> > > > > >
> > > > > > Quite a while ago I proposed KIP-201 to solve this issue with
> > > policies
> > > > > > being easily circumvented, but it didn't really make any progress.
> > > I've
> > > > > > looked at it again in some detail more recently and I think
> > something
> > > > > might
> > > > > > be possible following the work to make all ZK writes happen on the
> > > > > > controller.
> > > > > >
> > > > > > Of course, this is just my take on it.
> > > > > >
> > > > > > Kind regards,
> > > > > >
> > > > > > Tom
> > > > > >
> > > > > > On Thu, Apr 16, 2020 at 11:47 AM Gokul Ramanan Subramanian <
> > > > > > gokul24...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi.
> > > > > > >
> > > > > > > For the sake of expediting the discussion, I have created a
> > > prototype
> > > > > PR:
> > > > > > > https://github.com/apache/kafka/pull/8499. Eventually, (if and)
> > > when
> > > > > the
> > > > > > > KIP is accepted, I'll modify this to add the full implementation
> > > and
> > > > > > tests
> > > > > > > etc. in there.
> > > > > > >
> > > > > > > Would appreciate if a Kafka committer could share their thoughts,
> > > so
> > > > > > that I
> > > > > > > can more confidently start the voting thread.
> > > > > > >
> > > > > > > Thanks.
> > > > > > >
> > > > > > > On Thu, Apr 16, 2020 at 11:30 AM Gokul Ramanan Subramanian <
> > > > > > > gokul24...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Thanks for your comments Alex.
> > > > > > > >
> > > > > > > > The KIP proposes using two configurations max.partitions and
> > > > > > > > max.broker.partitions. It does not enforce their use. The
> > default
> > > > > > values
> > > > > > > > are pretty large (INT MAX), therefore should be non-intrusive.
> > > > > > > >
> > > > > > > > In multi-tenant environments and in partition assignment and
> > > > > > rebalancing,
> > > > > > > > the admin could (a) use the default values which would yield
> > > > similar
> > > > > > > > behavior to now, (b) set very high values that they know is
> > > > > sufficient,
> > > > > > > (c)
> > > > > > > > dynamically re-adjust the values should the business
> > requirements
> > > > > > change.
> > > > > > > > Note that the two configurations are cluster-wide, so they 

Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2020-06-06 Thread Alexandre Dupriez
S3 list object requests cost $0.005 per 1000 requests. If
> > >>>>>>>
> > >>>>>>> you
> > >>>>>>>
> > >>>>>>> have 100,000 partitions and want to pull the metadata for each
> > >>>>>>>
> > >>>>>>> partition
> > >>>>>>>
> > >>>>>>> at
> > >>>>>>>
> > >>>>>>> the rate of 1/sec. It can cost $0.5/sec, which is roughly $40K per
> > >>>>>>>
> > >>>>>>> day.
> > >>>>>>>
> > >>>>>>> I want to note here, that no reasonably durable storage will be 
> > >>>>>>> cheap at 100k RPS. For example, DynamoDB might give the same 
> > >>>>>>> ballpark
> > >>>>>>>
> > >>>>>>> figures.
> > >>>>>>>
> > >>>>>>> If we want to keep the pull-based approach, we can try to reduce 
> > >>>>>>> this
> > >>>>>>>
> > >>>>>>> number
> > >>>>>>>
> > >>>>>>> in several ways: doing listings less frequently (as Satish 
> > >>>>>>> mentioned, with the current defaults it's ~3.33k RPS for your 
> > >>>>>>> example), batching listing operations in some way (depending on the 
> > >>>>>>> storage; it might require the change of RSM's interface).
> > >>>>>>>
> > >>>>>>> There are different ways for doing push based metadata propagation.
> > >>>>>>>
> > >>>>>>> Some
> > >>>>>>>
> > >>>>>>> object stores may support that already. For example, S3 supports
> > >>>>>>>
> > >>>>>>> events
> > >>>>>>>
> > >>>>>>> notification
> > >>>>>>>
> > >>>>>>> This sounds interesting. However, I see a couple of issues using it:
> > >>>>>>> 1. As I understand the documentation, notification delivery is not 
> > >>>>>>> guaranteed
> > >>>>>>> and it's recommended to periodically do LIST to fill the gaps. 
> > >>>>>>> Which brings us back to the same LIST consistency guarantees issue.
> > >>>>>>> 2. The same goes for the broker start: to get the current state, we
> > >>>>>>>
> > >>>>>>> need
> > >>>>>>>
> > >>>>>>> to LIST.
> > >>>>>>> 3. The dynamic set of multiple consumers (RSMs): AFAIK SQS and SNS
> > >>>>>>>
> > >>>>>>> aren't
> > >>>>>>>
> > >>>>>>> designed for such a case.
> > >>>>>>>
> > >>>>>>> Alexandre:
> > >>>>>>>
> > >>>>>>> A.1 As commented on PR 7561, S3 consistency model [1][2] implies RSM
> > >>>>>>>
> > >>>>>>> cannot
> > >>>>>>>
> > >>>>>>> relies solely on S3 APIs to guarantee the expected strong
> > >>>>>>>
> > >>>>>>> consistency. The
> > >>>>>>>
> > >>>>>>> proposed implementation [3] would need to be updated to take this
> > >>>>>>>
> > >>>>>>> into
> > >>>>>>>
> > >>>>>>> account. Let’s talk more about this.
> > >>>>>>>
> > >>>>>>> Thank you for the feedback. I clearly see the need for changing the 
> > >>>>>>> S3 implementation
> > >>>>>>> to provide stronger consistency guarantees. As it see from this 
> > >>>>>>> thread, there are
> > >>>>>>> several possible approaches to this. Let's discuss 
> > >>>>>>> RemoteLogManager's contract and
> > >>>>>>> behavior (like pull vs push model) further before picking one (or
> > >>>>>>>
> >

Re: [VOTE] KIP-612: Ability to Limit Connection Creation Rate on Brokers

2020-05-19 Thread Alexandre Dupriez
+1 (non-binding)

Thank you for the KIP!


Le mar. 19 mai 2020 à 07:57, David Jacot  a écrit :
>
> +1 (non-binding)
>
> Thanks for the KIP, Anna!
>
> On Tue, May 19, 2020 at 7:12 AM Satish Duggana 
> wrote:
>
> > +1 (non-binding)
> > Thanks Anna for the nice feature to control the connection creation rate
> > from the clients.
> >
> > On Tue, May 19, 2020 at 8:16 AM Gwen Shapira  wrote:
> >
> > > +1 (binding)
> > >
> > > Thank you for driving this, Anna
> > >
> > > On Mon, May 18, 2020 at 4:55 PM Anna Povzner  wrote:
> > >
> > > > Hi All,
> > > >
> > > > I would like to start the vote on KIP-612: Ability to limit connection
> > > > creation rate on brokers.
> > > >
> > > > For reference, here is the KIP wiki:
> > > >
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-612%3A+Ability+to+Limit+Connection+Creation+Rate+on+Brokers
> > > >
> > > > And discussion thread:
> > > >
> > > >
> > >
> > https://lists.apache.org/thread.html/r61162661fa307d0bc5c8326818bf223a689c49e1c828c9928ee26969%40%3Cdev.kafka.apache.org%3E
> > > >
> > > > Thanks,
> > > >
> > > > Anna
> > > >
> > >
> > >
> > > --
> > > Gwen Shapira
> > > Engineering Manager | Confluent
> > > 650.450.2760 | @gwenshap
> > > Follow us: Twitter | blog
> > >
> >


Re: [DISCUSS] KIP-612: Ability to Limit Connection Creation Rate on Brokers

2020-05-19 Thread Alexandre Dupriez
> >>> > > Hi Anna,
> >>> > >
> >>> > > Thanks for the response, sounds good.
> >>> > >
> >>> > > Regards,
> >>> > >
> >>> > > Rajini
> >>> > >
> >>> > >
> >>> > > On Sun, May 17, 2020 at 1:38 AM Anna Povzner 
> >>> wrote:
> >>> > >
> >>> > > > Hi Rajini,
> >>> > > >
> >>> > > > Thanks for reviewing the KIP!
> >>> > > >
> >>> > > > I agree with your suggestion to make per-IP connection rate quota a
> >>> > > dynamic
> >>> > > > quota for entity name IP. This will allow configuring connection
> >>> rate
> >>> > > for a
> >>> > > > particular IP as well. I updated the wiki accordingly.
> >>> > > >
> >>> > > > Your second concern makes sense -- rejecting the connection right
> >>> away
> >>> > > will
> >>> > > > likely cause a new connection from the same client. I am concerned
> >>> > about
> >>> > > > delaying new connections for processing later, because if the
> >>> > connections
> >>> > > > keep coming with the high rate, there may be potentially a large
> >>> > backlog
> >>> > > > and connections may start timing out before the broker gets to
> >>> > processing
> >>> > > > them. For example, if clients come through proxy, there may be
> >>> > > > potentially a large number of incoming connections with the same
> >>> IP.
> >>> > > >
> >>> > > > What do you think about the following option:
> >>> > > > * Once per-IP connection rate reaches the limit, accept or drop
> >>> (clean
> >>> > > up)
> >>> > > > the connection after a delay depending on whether the quota is
> >>> still
> >>> > > > violated. We could re-use the mechanism implemented with KIP-306
> >>> where
> >>> > > the
> >>> > > > broker delays the response for failed client authentication. The
> >>> delay
> >>> > > will
> >>> > > > be set to min(delay calculated based on the rate quota, 1 second),
> >>> > which
> >>> > > > matches the max delay for request quota.
> >>> > > >
> >>> > > > I think this option is somewhat your suggestion with delaying
> >>> accepting
> >>> > > per
> >>> > > > IP connections that reached the rate limit, but with protection in
> >>> > place
> >>> > > to
> >>> > > > make sure the number of delayed connections does not blow up. What
> >>> do
> >>> > you
> >>> > > > think?
> >>> > > >
> >>> > > > Thanks,
> >>> > > > Anna
> >>> > > >
> >>> > > > On Sat, May 16, 2020 at 1:09 AM Alexandre Dupriez <
> >>> > > > alexandre.dupr...@gmail.com> wrote:
> >>> > > >
> >>> > > > > Hi Anna,
> >>> > > > >
> >>> > > > > Thank you for your answers and explanations.
> >>> > > > >
> >>> > > > > A couple of additional comments:
> >>> > > > >
> >>> > > > > 900. KIP-612 does not intend to dedicate a metric to the
> >>> throttling
> >>> > of
> >>> > > > > incoming connections. I wonder if such a metric would be handy
> >>> for
> >>> > > > > monitoring and help set-up metric-based alarming if one wishes to
> >>> > > > > capture this type of incident?
> >>> > > > >
> >>> > > > > 901. Following-up on Rajini's point 2 above - from my
> >>> understanding,
> >>> > > > > this new quota should prevent excess CPU consumption in
> >>> > > > > Processor#run() method when a new connection has been accepted.
> >>> > > > > Through the throttling in place, connections will be delayed as
> >>> > > > > indicated by the KIP's specifications:

Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2020-05-17 Thread Alexandre Dupriez
ion
> >>
> >> at
> >>
> >> the rate of 1/sec. It can cost $0.5/sec, which is roughly $40K per
> >>
> >> day.
> >>
> >> I want to note here, that no reasonably durable storage will be cheap at
> >> 100k RPS. For example, DynamoDB might give the same ballpark
> >>
> >> figures.
> >>
> >> If we want to keep the pull-based approach, we can try to reduce this
> >>
> >> number
> >>
> >> in several ways: doing listings less frequently (as Satish mentioned,
> >> with the current defaults it's ~3.33k RPS for your example), batching
> >> listing operations in some way (depending on the storage; it might require
> >> the change of RSM's interface).
> >>
> >> There are different ways for doing push based metadata propagation.
> >>
> >> Some
> >>
> >> object stores may support that already. For example, S3 supports
> >>
> >> events
> >>
> >> notification
> >>
> >> This sounds interesting. However, I see a couple of issues using it:
> >> 1. As I understand the documentation, notification delivery is not
> >> guaranteed
> >> and it's recommended to periodically do LIST to fill the gaps. Which
> >> brings us back to the same LIST consistency guarantees issue.
> >> 2. The same goes for the broker start: to get the current state, we
> >>
> >> need
> >>
> >> to LIST.
> >> 3. The dynamic set of multiple consumers (RSMs): AFAIK SQS and SNS
> >>
> >> aren't
> >>
> >> designed for such a case.
> >>
> >> Alexandre:
> >>
> >> A.1 As commented on PR 7561, S3 consistency model [1][2] implies RSM cannot
> >> rely solely on S3 APIs to guarantee the expected strong consistency. The
> >> proposed implementation [3] would need to be updated to take this into
> >> account. Let’s talk more about this.
> >>
> >> Thank you for the feedback. I clearly see the need for changing the S3
> >> implementation to provide stronger consistency guarantees. As I see from
> >> this thread, there are several possible approaches to this. Let's discuss
> >> RemoteLogManager's contract and behavior (like pull vs push model) further
> >> before picking one (or several?) of them.
> >> I'm going to do some evaluation of DynamoDB for the pull-based approach,
> >> if it's possible to apply it paying a reasonable bill. Also, of the
> >> push-based approach with a Kafka topic as the medium.
> >>
> >> A.2.3 Atomicity – what does an implementation of RSM need to provide with
> >> respect to atomicity of the APIs copyLogSegment, cleanupLogUntil and
> >> deleteTopicPartition? If a partial failure happens in any of those (e.g. in
> >> the S3 implementation, if one of the multiple uploads fails [4]),
> >>
> >> The S3 implementation is going to change, but it's worth clarifying anyway.
> >>
> >> The segment log file is being uploaded after S3 has acked uploading of
> >> all other files associated with the segment, and only after this the whole
> >> segment file set becomes visible remotely for operations like
> >> listRemoteSegments [1].
> >> In case of upload failure, the files that have been successfully uploaded
> >> stay as invisible garbage that is collected by cleanupLogUntil (or
> >> overwritten successfully later).
> >> And the opposite happens during the deletion: log files are deleted first.
> >>
> >> This approach should generally work when we solve consistency issues by
> >> adding a strongly consistent storage: a segment's uploaded files remain
> >> invisible garbage until some metadata about them is written.
> >>
> >> A.3 Caching – storing locally the segments retrieved from the remote
> >> storage is excluded as it does not align with the original intent and even
> 

[jira] [Resolved] (KAFKA-6959) Any impact we foresee if we upgrade Linux version or move to VM instead of physical Linux server

2020-05-16 Thread Alexandre Dupriez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexandre Dupriez resolved KAFKA-6959.
--
Resolution: Fixed

> Any impact we foresee if we upgrade Linux version or move to VM instead of 
> physical Linux server
> 
>
> Key: KAFKA-6959
> URL: https://issues.apache.org/jira/browse/KAFKA-6959
> Project: Kafka
>  Issue Type: Task
>  Components: admin
>Affects Versions: 0.11.0.2
> Environment: Prod
>Reporter: Gene Yi
>Priority: Trivial
>  Labels: patch, performance, security
>
> As we know, there are the recent Linux Meltdown and Spectre issues: all the 
> Linux servers need to deploy the patch, and the OS version needs to be at least 6.9. 
> We want to know the impact on Kafka: is there any side effect if we directly 
> upgrade the OS to 7.0, and is there any limitation if we deploy Kafka on VMs 
> instead of physical servers?
> Currently the Kafka version we use is 0.11.0.2.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-8916) Unreliable kafka-reassign-partitions.sh affecting performance

2020-05-16 Thread Alexandre Dupriez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexandre Dupriez resolved KAFKA-8916.
--
Resolution: Invalid

Closing this as there is no bug or development required.

Please kindly reach out to the user mailing list for this type of question: 
us...@kafka.apache.org. More information to engage the Kafka community is 
available on https://kafka.apache.org/contact.html.


> Unreliable kafka-reassign-partitions.sh affecting performance
> -
>
> Key: KAFKA-8916
> URL: https://issues.apache.org/jira/browse/KAFKA-8916
> Project: Kafka
>  Issue Type: Task
>  Components: admin, config
>Affects Versions: 2.1.1
> Environment: CentOS 7
>Reporter: VinayKumar
>Priority: Major
>  Labels: performance
>
> Currently I have a 3-node Kafka cluster, and I want to add 2 more nodes to make 
> it a 5-node cluster.
> * After adding the nodes to the cluster, I need all the topic partitions to be 
> evenly distributed across all 5 nodes.
> * In the past, when I ran kafka-reassign-partitions.sh & 
> kafka-preferred-replica-election.sh, it ran for a very long time, hung & made 
> the cluster unstable. So I'm afraid to use this method.
> Can you please suggest the best & foolproof way to assign partitions among 
> all the cluster nodes?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-8815) Kafka broker blocked on I/O primitive

2020-05-16 Thread Alexandre Dupriez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexandre Dupriez resolved KAFKA-8815.
--
Resolution: Not A Problem

System failure. Not related to Kafka.

> Kafka broker blocked on I/O primitive
> -
>
> Key: KAFKA-8815
> URL: https://issues.apache.org/jira/browse/KAFKA-8815
> Project: Kafka
>  Issue Type: Bug
>  Components: core, log
>Affects Versions: 1.1.1
>        Reporter: Alexandre Dupriez
>Priority: Major
>
> This JIRA is for tracking a problem we ran into on a production cluster.
> *Scenario*
> Cluster of 15 brokers and an average ingress throughput of ~4 MB/s and egress 
> of ~4 MB/s per broker.
> Brokers are running on OpenJDK 8. They are configured with a heap size of 1 
> GB.
> There are around ~1,000 partition replicas per broker. Load is evenly 
> balanced. Each broker instance is under fair CPU load, but not overloaded 
> (50-60%). G1 is used for garbage collection and doesn't exhibit any pressure, 
> with mostly short young GCs observed and a heap-after-GC usage of 70%.
> Replication factor is 3.
> *Symptom*
> One broker on the cluster suddenly became "unresponsive". Other brokers, 
> Zookeeper and producer/consumer requests were failing with timeouts. The 
> Kafka process, however, was still alive and doing some background work 
> (truncating logs and rolling segments). This lasted for hours. At some point, 
> several thread dumps were taken at a few minutes' interval. Most of the threads 
> were "blocked". Deadlock was ruled out. The most suspicious stack is the 
> following 
> {code:java}
> Thread 7801: (state = BLOCKED)
>  - sun.nio.ch.FileChannelImpl.write(java.nio.ByteBuffer) @bci=25, line=202 
> (Compiled frame)
>  - 
> org.apache.kafka.common.record.MemoryRecords.writeFullyTo(java.nio.channels.GatheringByteChannel)
>  @bci=24, line=93 (Compiled frame)
>  - 
> org.apache.kafka.common.record.FileRecords.append(org.apache.kafka.common.record.MemoryRecords)
>  @bci=5, line=152 (Compiled frame)
>  - kafka.log.LogSegment.append(long, long, long, long, 
> org.apache.kafka.common.record.MemoryRecords) @bci=82, line=136 (Compiled 
> frame)
>  - kafka.log.Log.$anonfun$append$2(kafka.log.Log, 
> org.apache.kafka.common.record.MemoryRecords, boolean, boolean, int, 
> java.lang.Object) @bci=1080, line=757 (Compiled frame)
>  - kafka.log.Log$$Lambda$614.apply() @bci=24 (Compiled frame)
>  - kafka.log.Log.maybeHandleIOException(scala.Function0, scala.Function0) 
> @bci=1, line=1696 (Compiled frame)
>  - kafka.log.Log.append(org.apache.kafka.common.record.MemoryRecords, 
> boolean, boolean, int) @bci=29, line=642 (Compiled frame)
>  - kafka.log.Log.appendAsLeader(org.apache.kafka.common.record.MemoryRecords, 
> int, boolean) @bci=5, line=612 (Compiled frame)
>  - 
> kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(kafka.cluster.Partition,
>  org.apache.kafka.common.record.MemoryRecords, boolean, int) @bci=148, 
> line=609 (Compiled frame)
>  - kafka.cluster.Partition$$Lambda$837.apply() @bci=16 (Compiled frame)
>  - kafka.utils.CoreUtils$.inLock(java.util.concurrent.locks.Lock, 
> scala.Function0) @bci=7, line=250 (Compiled frame)
>  - 
> kafka.utils.CoreUtils$.inReadLock(java.util.concurrent.locks.ReadWriteLock, 
> scala.Function0) @bci=8, line=256 (Compiled frame)
>  - 
> kafka.cluster.Partition.appendRecordsToLeader(org.apache.kafka.common.record.MemoryRecords,
>  boolean, int) @bci=16, line=597 (Compiled frame)
>  - 
> kafka.server.ReplicaManager.$anonfun$appendToLocalLog$2(kafka.server.ReplicaManager,
>  boolean, boolean, short, scala.Tuple2) @bci=295, line=739 (Compiled frame)
>  - kafka.server.ReplicaManager$$Lambda$836.apply(java.lang.Object) @bci=20 
> (Compiled frame)
>  - scala.collection.TraversableLike.$anonfun$map$1(scala.Function1, 
> scala.collection.mutable.Builder, java.lang.Object) @bci=3, line=234 
> (Compiled frame)
>  - scala.collection.TraversableLike$$Lambda$14.apply(java.lang.Object) @bci=9 
> (Compiled frame)
>  - scala.collection.mutable.HashMap.$anonfun$foreach$1(scala.Function1, 
> scala.collection.mutable.DefaultEntry) @bci=16, line=138 (Compiled frame)
>  - scala.collection.mutable.HashMap$$Lambda$31.apply(java.lang.Object) @bci=8 
> (Compiled frame)
>  - scala.collection.mutable.HashTable.foreachEntry(scala.Function1) @bci=39, 
> line=236 (Compiled frame)
>  - 
> scala.collection.mutable.HashTable.foreachEntry$(scala.collection.mutable.HashTable,
>  scala.Function1) @bci=2, line=229 (Compiled frame)
>  - scala.collection.mutable.HashMap.foreachEntry(

Re: [DISCUSS] KIP-612: Ability to Limit Connection Creation Rate on Brokers

2020-05-16 Thread Alexandre Dupriez
> > network threads use for creating new connections (and accepting on
> > > Acceptor
> > > > thread) vs. doing other work on each Processor iteration. It does not
> > > > directly control how processing connection creations would be related
> > to
> > > > other processing done by brokers like on request handler threads. So,
> > > while
> > > > controlling queue size may mitigate the issue for some of the
> > workloads,
> > > it
> > > > does not guarantee that. Plus, if we want to limit how many connections
> > > are
> > > > created per IP, the queue size approach would not work, unless we go
> > > with a
> > > > "share" of the queue, which I think even further obscures what that
> > > setting
> > > > means (and what we would achieve as an end result). Does this answer
> > the
> > > > question?
> > > >
> > > > If there are no objections, I will update the KIP to add per IP
> > > connection
> > > > rate limits (config and enforcement).
> > > >
> > > > Thanks,
> > > >
> > > > Anna
> > > >
> > > >
> > > > On Tue, May 12, 2020 at 11:25 AM Alexandre Dupriez <
> > > > alexandre.dupr...@gmail.com> wrote:
> > > >
> > > >> Hello,
> > > >>
> > > >> Thank you for the KIP.
> > > >>
> > > >> I experienced in the past genuine broker brownouts due to connection
> > > >> storms consuming most of the CPU available on the server and I think
> > > >> it is useful to protect against it.
> > > >>
> > > >> I tend to share the questions asked in points 2 and 4 from David. Is
> > > >> there still a risk of denial of service if the limit applies at the
> > > >> listener-level without differentiating between (an) “offending”
> > > >> client(s) and the others?
> > > >>
> > > >> To rebound on point 3 - conceptually one difference between capping
> > > >> the queue size or throttling as presented in the KIP would come from
> > > >> the time it takes to accept a connection and how that time evolves
> > > >> with the connection rate.
> > > >> Assuming that that time increases monotonically with resource
> > > >> utilization, the admissible rate of connections would decrease as the
> > > >> server becomes more loaded, if the limit was set on queue size.
> > > >>
> > > >> Thanks,
> > > >> Alexandre
> > > >>
> > > >> Le mar. 12 mai 2020 à 08:49, David Jacot  a
> > écrit
> > > :
> > > >> >
> > > >> > Hi Anna,
> > > >> >
> > > >> > Thanks for the KIP! I have few questions:
> > > >> >
> > > >> > 1. You mention that some clients may create a new connections for
> > each
> > > >> > requests: "Another example is clients that create a new connection
> > for
> > > >> each
> > > >> > produce/consume request". I am curious here but do we know any
> > clients
> > > >> > behaving like this?
> > > >> >
> > > >> > 2. I am a bit concerned by the impact of misbehaving clients on the
> > > >> other
> > > >> > ones. Let's say that we define a quota of 10 connections / sec for a
> > > >> broker
> > > >> > and that we have a misbehaving application constantly trying to
> > create
> > > >> 20
> > > >> > connections on that broker. That application will constantly hit the
> > > >> quota
> > > >> > and
> > > >> > always have many pending connections in the queue waiting to be
> > > >> accepted.
> > > >> > Regular clients trying to connect would need to wait until all the
> > > >> pending
> > > >> > connections upfront in the queue are drained in the best case
> > scenario
> > > >> or
> > > >> > won't be able to connect at all in the worst case scenario if the
> > > queue
> > > >> is
> > > >> > full.
> > > >> > Does it sound like a valid concern? How do you see this?
> > > >> >
> > > >> > 3. As you mention it in the KIP, we use bounded queues which already
> > > >> limit
> > > >> > the maximum number of connections that can be accepted. I wonder if
> > we
> > > >> > could reach the same goal by making the size of the queue
> > > configurable.
> > > >> >
> > > >> > 4. Did you consider doing something similar to the connections quota
> > > >> which
> > > >> > limits the number of connections per IP? Instead of rate limiting
> > all
> > > >> the
> > > >> > creation,
> > > >> > we could perhaps rate limit the number of creation per IP as well.
> > > That
> > > >> > could
> > > >> > perhaps reduce the effect on the other clients. That may be harder
> > to
> > > >> > implement
> > > >> > though.
> > > >> >
> > > >> > Best,
> > > >> > David
> > > >> >
> > > >> > On Mon, May 11, 2020 at 7:58 PM Anna Povzner 
> > > wrote:
> > > >> >
> > > >> > > Hi,
> > > >> > >
> > > >> > > I just created KIP-612 to allow limiting connection creation rate
> > on
> > > >> > > brokers, and would like to start a discussion.
> > > >> > >
> > > >> > >
> > > >> > >
> > > >>
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-612%3A+Ability+to+Limit+Connection+Creation+Rate+on+Brokers
> > > >> > >
> > > >> > > Feedback and suggestions are welcome!
> > > >> > >
> > > >> > > Thanks,
> > > >> > > Anna
> > > >> > >
> > > >>
> > > >
> > >
> >


Re: [DISCUSS] KIP-612: Ability to Limit Connection Creation Rate on Brokers

2020-05-12 Thread Alexandre Dupriez
Hello,

Thank you for the KIP.

I experienced in the past genuine broker brownouts due to connection
storms consuming most of the CPU available on the server and I think
it is useful to protect against it.

I tend to share the questions asked in points 2 and 4 from David. Is
there still a risk of denial of service if the limit applies at the
listener-level without differentiating between (an) “offending”
client(s) and the others?

To rebound on point 3 - conceptually one difference between capping
the queue size or throttling as presented in the KIP would come from
the time it takes to accept a connection and how that time evolves
with the connection rate.
Assuming that that time increases monotonically with resource
utilization, the admissible rate of connections would decrease as the
server becomes more loaded, if the limit was set on queue size.
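
To illustrate what I mean by throttling, a rough token-bucket sketch of a
listener-level limit is below (names and structure are mine and purely
illustrative, not the KIP's proposed implementation); an accepted connection
is delayed rather than rejected:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

// Hypothetical sketch of a listener-level limit on connection creation rate.
// Assumes a single acceptor thread calls acquire() before registering a new connection.
public final class ConnectionRateLimiter {

    private final int maxConnectionsPerSecond;
    private double availablePermits;
    private long lastRefillNanos;

    public ConnectionRateLimiter(int maxConnectionsPerSecond) {
        this.maxConnectionsPerSecond = maxConnectionsPerSecond;
        this.availablePermits = maxConnectionsPerSecond;
        this.lastRefillNanos = System.nanoTime();
    }

    // Delays the accept until a permit is available instead of rejecting the connection.
    public void acquire() {
        refill();
        while (availablePermits < 1) {
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10));
            refill();
        }
        availablePermits -= 1;
    }

    private void refill() {
        long now = System.nanoTime();
        double elapsedSeconds = (now - lastRefillNanos) / 1_000_000_000.0;
        availablePermits = Math.min(maxConnectionsPerSecond,
                availablePermits + elapsedSeconds * maxConnectionsPerSecond);
        lastRefillNanos = now;
    }
}

With such a shape, the delay applied to each accept grows with the backlog of
pending connections, which is the behaviour I was contrasting with a bounded
queue above.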

Thanks,
Alexandre

Le mar. 12 mai 2020 à 08:49, David Jacot  a écrit :
>
> Hi Anna,
>
> Thanks for the KIP! I have a few questions:
>
> 1. You mention that some clients may create a new connection for each
> request: "Another example is clients that create a new connection for each
> produce/consume request". I am curious here but do we know any clients
> behaving like this?
>
> 2. I am a bit concerned by the impact of misbehaving clients on the other
> ones. Let's say that we define a quota of 10 connections / sec for a broker
> and that we have a misbehaving application constantly trying to create 20
> connections on that broker. That application will constantly hit the quota
> and
> always have many pending connections in the queue waiting to be accepted.
> Regular clients trying to connect would need to wait until all the pending
> connections upfront in the queue are drained in the best case scenario or
> won't be able to connect at all in the worst case scenario if the queue is
> full.
> Does it sound like a valid concern? How do you see this?
>
> 3. As you mention it in the KIP, we use bounded queues which already limit
> the maximum number of connections that can be accepted. I wonder if we
> could reach the same goal by making the size of the queue configurable.
>
> 4. Did you consider doing something similar to the connections quota which
> limits the number of connections per IP? Instead of rate limiting all the
> creation,
> we could perhaps rate limit the number of creation per IP as well. That
> could
> perhaps reduce the effect on the other clients. That may be harder to
> implement
> though.
>
> Best,
> David
>
> On Mon, May 11, 2020 at 7:58 PM Anna Povzner  wrote:
>
> > Hi,
> >
> > I just created KIP-612 to allow limiting connection creation rate on
> > brokers, and would like to start a discussion.
> >
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-612%3A+Ability+to+Limit+Connection+Creation+Rate+on+Brokers
> >
> > Feedback and suggestions are welcome!
> >
> > Thanks,
> > Anna
> >


[jira] [Resolved] (KAFKA-9549) Local storage implementations for RSM which can be used in tests

2020-04-27 Thread Alexandre Dupriez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexandre Dupriez resolved KAFKA-9549.
--
Resolution: Fixed

> Local storage implementations for RSM which can be used in tests
> 
>
> Key: KAFKA-9549
> URL: https://issues.apache.org/jira/browse/KAFKA-9549
> Project: Kafka
>  Issue Type: Sub-task
>  Components: system tests
>Reporter: Satish Duggana
>        Assignee: Alexandre Dupriez
>Priority: Major
>
> The goal of this task is to implement a straightforward file-system based 
> implementation of the {{RemoteStorageManager}} defined as part of the SPI for 
> Tiered Storage.
> It is intended to be used in single-host integration tests where the remote 
> storage is or can be exercised. 
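
A minimal sketch of the idea is below: the "remote" storage is simply a directory on the local file system, which keeps integration tests self-contained. Paths and naming are illustrative only and do not prescribe the final implementation.

{code:java}
// Illustrative sketch only: "remote" storage backed by a local directory for tests.
public class LocalFileSystemRemoteStorage {

    private final java.nio.file.Path remoteRootDir;

    public LocalFileSystemRemoteStorage(java.nio.file.Path remoteRootDir) {
        this.remoteRootDir = remoteRootDir;
    }

    // Copies a rolled segment file under <remoteRootDir>/<topicPartition>/.
    public java.nio.file.Path copyLogSegment(String topicPartition, java.nio.file.Path segmentFile)
            throws java.io.IOException {
        java.nio.file.Path targetDir =
            java.nio.file.Files.createDirectories(remoteRootDir.resolve(topicPartition));
        return java.nio.file.Files.copy(segmentFile, targetDir.resolve(segmentFile.getFileName()),
            java.nio.file.StandardCopyOption.REPLACE_EXISTING);
    }
}
{code}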



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] KIP-551: Expose disk read and write metrics

2020-04-17 Thread Alexandre Dupriez
Hello all,

Apologies for the excavation. +1 (non-binding).

One of the advantages I see in having the metrics exposed by Kafka, as
opposed to a system monitoring tool, is that they can be refined to
provide a segregated view of the consumption of disk resources, for
instance broken down by topic or principal.

Another metric I think could be useful would be the latency of file
reads/writes on Kafka's ingest and fetch data paths.
The motivation would be to help correlate the degradation of
applicative throughput with that of the file system read or write,
which happens for instance when the page cache becomes saturated and
synchronous page frame reclamation starts slowing down write(2)
syscalls.

Do you think that could be a useful addition to the metrics generated
by Apache Kafka?
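
To make that second suggestion more concrete, the measurement I have in
mind on the write path is roughly the following (a bare sketch; class,
method and file names are mine, purely illustrative):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

// Illustrative sketch: timing an append to a segment-like file so that the
// latency could be fed into a broker metric (e.g. a percentile sensor).
public final class TimedAppend {

    static long appendAndMeasureMicros(FileChannel channel, ByteBuffer records) throws IOException {
        long start = System.nanoTime();
        while (records.hasRemaining()) {
            channel.write(records);
        }
        return (System.nanoTime() - start) / 1_000;
    }

    public static void main(String[] args) throws IOException {
        try (FileChannel channel = FileChannel.open(Paths.get("segment.log"),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
            long micros = appendAndMeasureMicros(channel, ByteBuffer.wrap("record-batch".getBytes()));
            System.out.println("append latency (us): " + micros);
        }
    }
}

A latency histogram built from such samples would make it visible when
writes start waiting on synchronous page frame reclamation, as described
above.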

Many thanks,
Alexandre

Le jeu. 16 janv. 2020 à 22:43, Colin McCabe  a écrit :
>
> With binding +1 votes from Gwen Shapira, Manikumar Reddy, Mickael Maison, and 
>   David Arthur, and non-binding +1 votes from Jose Garcia Sancio, M. Manna, 
> Lucas Bradstreet, Mitchell, and Sönke Liebau, the vote passes.
>
> Thanks, all!
> Colin
>
>
> On Wed, Jan 15, 2020, at 09:25, Colin McCabe wrote:
> > Thanks, all.  I will close the vote later today.
> >
> > best,
> > Colin
> >
> >
> > On Wed, Jan 15, 2020, at 01:48, Mickael Maison wrote:
> > > +1 (binding)
> > > Thanks for the KIP
> > >
> > > On Tue, Jan 14, 2020 at 6:50 PM David Arthur  wrote:
> > > >
> > > > +1 binding
> > > >
> > > > This will be very nice to have. Thanks for the KIP, Colin.
> > > >
> > > > -David
> > > >
> > > > On Tue, Jan 14, 2020 at 11:39 AM Sönke Liebau
> > > >  wrote:
> > > >
> > > > > +1 (non-binding)
> > > > >
> > > > > Thanks for creating this!
> > > > >
> > > > > On Tue, 14 Jan 2020 at 17:36, Mitchell  wrote:
> > > > >
> > > > > > +1 (non-binding)!
> > > > > > Very useful kip.
> > > > > > -mitch
> > > > > >
> > > > > > On Tue, Jan 14, 2020 at 10:26 AM Manikumar 
> > > > > > 
> > > > > > wrote:
> > > > > > >
> > > > > > > +1 (binding).
> > > > > > >
> > > > > > > Thanks for the KIP.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Sun, Jan 12, 2020 at 1:23 AM Lucas Bradstreet 
> > > > > > > 
> > > > > > wrote:
> > > > > > >
> > > > > > > > +1 (non binding)
> > > > > > > >
> > > > > > > > On Sat, 11 Jan 2020 at 02:32, M. Manna  
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hey Colin,
> > > > > > > > >
> > > > > > > > > +1 - Really useful for folks managing a cluster by themselves.
> > > > > > > > >
> > > > > > > > > M. MAnna
> > > > > > > > >
> > > > > > > > > On Fri, 10 Jan 2020 at 22:35, Jose Garcia Sancio <
> > > > > > jsan...@confluent.io>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > +1, LGTM.
> > > > > > > > > >
> > > > > > > > > > On Fri, Jan 10, 2020 at 2:19 PM Gwen Shapira 
> > > > > > > > > > 
> > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > +1, thanks for driving this
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Jan 10, 2020 at 2:17 PM Colin McCabe <
> > > > > cmcc...@apache.org
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > Hi all,
> > > > > > > > > > > >
> > > > > > > > > > > > I'd like to start the vote on KIP-551: Expose disk read 
> > > > > > > > > > > > and
> > > > > > write
> > > > > > > > > > > metrics.
> > > > > > > > > > > >
> > > > > > > > > > > > KIP:  https://cwiki.apache.org/confluence/x/sotSC
> > > > > > > > > > > >
> > > > > > > > > > > > Discussion thread:
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > >
> > > > > https://lists.apache.org/thread.html/cfaac4426455406abe890464a7f4ae23a5c69a39afde66fe6eb3d696%40%3Cdev.kafka.apache.org%3E
> > > > > > > > > > > >
> > > > > > > > > > > > cheers,
> > > > > > > > > > > > Colin
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > --
> > > > > > > > > > -Jose
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Sönke Liebau
> > > > > Partner
> > > > > Tel. +49 179 7940878
> > > > > OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany
> > > > >
> > > >
> > > >
> > > > --
> > > > David Arthur
> > >
> >


Re: [DISCUSS] KIP-578: Add configuration to limit number of partitions

2020-04-09 Thread Alexandre Dupriez
Hi Gokul,

Thanks for the KIP.

From what I understand, the objective of the new configuration is to
protect a cluster from an overload driven by an excessive number of
partitions, independently of the load handled on the partitions
themselves. As such, the approach uncouples the data-path load from
the number of units over which throughput is distributed, and intends
to avoid the degradation of performance exhibited in the test results
provided with the KIP by setting an upper bound on that number (a bare
sketch of such a guard follows my comments below).

Couple of comments:

900. Multi-tenancy - one concern I would have with a cluster and
broker-level configuration is that it is possible for a user to
consume a large proportion of the allocatable partitions within the
configured limit, leaving other users without enough partitions to
satisfy their requirements.

901. Quotas - an approach in Apache Kafka to set up an upper bound on
resource consumption is via client/user quotas. Could this framework
be leveraged to add this limit?

902. Partition assignment - one potential problem with the new
repartitioning scheme is that if a subset of brokers have reached
their number of assignable partitions, yet their data path is
under-loaded, new topics and/or partitions will be assigned
exclusively to other brokers, which could increase the likelihood of
data-path load imbalance. Fundamentally, the isolation of the
constraint on the number of partitions from the data-path throughput
can have conflicting requirements.

903. Rebalancing - as a corollary to 902, external tools used to
balance ingress throughput may adopt an incremental approach in
partition re-assignment to redistribute load, and could hit the limit
on the number of partitions on a broker when a (too) conservative
limit is used, thereby over-constraining the objective function and
reducing the migration path.
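
For reference, the kind of guard I read the KIP as proposing, reduced to a
bare sketch (the configuration shape, the exception and the names are mine,
purely illustrative):

// Hypothetical sketch of a broker-level cap applied at topic creation / partition assignment.
final class PartitionLimitGuard {

    private final int maxPartitionsPerBroker;

    PartitionLimitGuard(int maxPartitionsPerBroker) {
        this.maxPartitionsPerBroker = maxPartitionsPerBroker;
    }

    void validate(int currentPartitionsOnBroker, int partitionsToAssign) {
        if (currentPartitionsOnBroker + partitionsToAssign > maxPartitionsPerBroker) {
            throw new IllegalStateException(
                "Assigning " + partitionsToAssign + " partition(s) would exceed the broker limit of "
                    + maxPartitionsPerBroker + " (currently hosting " + currentPartitionsOnBroker + ")");
        }
    }
}

Points 900 and 901 are essentially about whether such a budget should be
sliced per tenant, and whether the existing quota framework could host it;
points 902 and 903 are about how the guard interacts with placement and
rebalancing decisions.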

Thanks,
Alexandre

Le jeu. 9 avr. 2020 à 00:19, Gokul Ramanan Subramanian
 a écrit :
>
> Hi. Requesting you to take a look at this KIP and provide feedback.
>
> Thanks. Regards.
>
> On Wed, Apr 1, 2020 at 4:28 PM Gokul Ramanan Subramanian <
> gokul24...@gmail.com> wrote:
>
> > Hi.
> >
> > I have opened KIP-578, intended to provide a mechanism to limit the number
> > of partitions in a Kafka cluster. Kindly provide feedback on the KIP which
> > you can find at
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-578%3A+Add+configuration+to+limit+number+of+partitions
> >
> > I want to specially thank Stanislav Kozlovski who helped in formulating
> > some aspects of the KIP.
> >
> > Many thanks,
> >
> > Gokul.
> >


Re: [VOTE] KIP-579: new exception on min.insync.replicas > replication.factor

2020-03-31 Thread Alexandre Dupriez
+1 (non binding).

Thanks.

Le mar. 31 mars 2020 à 14:24, M. Manna  a écrit :
>
> +1 (binding).
>
> Thanks for the KIP.
>
> On Tue, 31 Mar 2020 at 14:17, Paolo Moriello 
> wrote:
>
> > Hello,
> >
> > Thanks to everybody who has given feedback. I've incorporated the
> > suggestions and think that this is now ready for a vote.
> >
> > KIP 579:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-579%3A+new+exception+on+min.insync.replicas+%3E+replication.factor
> >
> > PR:
> > https://github.com/apache/kafka/pull/8225
> >
> > Thanks,
> > Paolo
> >


TLS failures and map I/O faults

2020-03-27 Thread Alexandre Dupriez
Dear community,

I recently faced an unexpected type of failure in the middle of an
incident related to the exhaustion of memory-map handles on a Kafka
broker.

The use case is as follows - a broker, not overloaded, manages enough
indexes to reach the limit on mmap count per process. This leads to
file memory-mapping failures at broker start-up.
It was eventually mitigated by increasing the said limit or reducing
the number of files to mmap.

But before I could mitigate the problem, I was trying to restart the
broker and faced the same failure every time - except once, where map
I/O failures disappeared and instead, every TLS connection attempt
started to fail, with the following exception:

INFO Failed to create channel due to
(org.apache.kafka.common.network.SslChannelBuilder)
java.lang.IllegalArgumentException: Cannot support
TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384 with currently installed
providers
at sun.security.ssl.CipherSuiteList.(CipherSuiteList.java:81)
at 
sun.security.ssl.SSLEngineImpl.setEnabledCipherSuites(SSLEngineImpl.java:2027)
at 
org.apache.kafka.common.security.ssl.SslFactory.createSslEngine(SslFactory.java:278)
...
at java.lang.Thread.run(Thread.java:748)

However, there was absolutely no change to the certificates,
truststore and keystore files on the host, and neither the application
binaries nor the JRE used to run Kafka were changed. At the subsequent
restart, this particular type of failure disappeared and the map I/O
failures resumed.

I cannot understand the origin of these failures, nor figure out
whether they share a root cause with the surrounding (map or regular)
I/O faults.

Has anyone encountered this scenario in the past?
How strong would you estimate the correlation between map I/O failures
and that one?

Many thanks,

Alexandre


[jira] [Resolved] (KAFKA-7958) Transactions are broken with kubernetes hosted brokers

2020-03-20 Thread Alexandre Dupriez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7958?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexandre Dupriez resolved KAFKA-7958.
--
Fix Version/s: 2.1.1
   Resolution: Fixed

> Transactions are broken with kubernetes hosted brokers
> --
>
> Key: KAFKA-7958
> URL: https://issues.apache.org/jira/browse/KAFKA-7958
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.1.0
> Environment: cp-kakfka 2.1.1-1, kafka-streams 2.1.1
>Reporter: Thomas Dickinson
>Priority: Major
> Fix For: 2.1.1
>
>
> After a rolling re-start in a kubernetes-like environment, brokers may change 
> IP address.  From our logs it seems that the transaction manager in the 
> brokers never re-resolves the DNS name of other brokers, keeping stale pod 
> IPs.  Thus transactions stop working.  
> ??[2019-02-20 02:20:20,085] WARN [TransactionCoordinator id=1001] Connection 
> to node 0 
> (khaki-joey-kafka-0.khaki-joey-kafka-headless.hyperspace-dev/[10.233.124.181:9092|http://10.233.124.181:9092/])
>  could not be established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)??
> ??[2019-02-20 02:20:57,205] WARN [TransactionCoordinator id=1001] Connection 
> to node 1 
> (khaki-joey-kafka-1.khaki-joey-kafka-headless.hyperspace-dev/[10.233.122.67:9092|http://10.233.122.67:9092/])
>  could not be established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)??
> This is from the log from broker 1001 which was restarted first, followed by 
> 1 and then 0.  The log entries are from the day after the rolling restart.
> I note a similar issue was fixed for clients 2.1.1  
> https://issues.apache.org/jira/browse/KAFKA-7755.  We are using streams lib 
> 2.1.1
> *Update* We are now testing with Kafka 2.1.1 (docker cp-kafka 5.1.2-1) and it 
> looks like this may be resolved.  Would be good to get confirmation though.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Question about log flusher real frequency

2020-03-09 Thread Alexandre Dupriez
Hi Fares,

On Linux kernels, you can use the property "dirty_writeback_centisecs"
[1] to configure the period between wake-ups of the kernel's writeback
(flusher) threads, which do this "sync" job; dirty pages are written out
once they are older than "dirty_expire_centisecs" (30 seconds by default).
There are a few exceptions where Kafka explicitly forces a sync (via the
force() method of the JDK's I/O API), e.g. when a segment is rolled or
when Kafka is shutting down.
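
To make that explicit sync concrete, it essentially boils down to the
following (a minimal sketch; the file name and class name are illustrative,
not actual Kafka code):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

// The write lands in the page cache; force(true) asks the kernel to flush
// data and metadata to the device immediately, bypassing periodic writeback.
public final class ExplicitFlushExample {
    public static void main(String[] args) throws IOException {
        try (FileChannel channel = FileChannel.open(Paths.get("00000000000000000000.log"),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            channel.write(ByteBuffer.wrap("record-batch".getBytes()));
            channel.force(true);
        }
    }
}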

The page writeback activity from your kernel is monitorable at
different levels of granularity and depending on the instrumentation
you are willing to use.

Why would you want to monitor this activity in the first place? Do you
want to know exactly *when* your data is on the disk?

[1] https://www.kernel.org/doc/Documentation/sysctl/vm.txt

Le lun. 9 mars 2020 à 15:58, Fares Oueslati  a écrit :
>
> Hello,
>
> By default, both log.flush.interval.ms and log.flush.interval.messages are
> set to Long.MAX_VALUE.
>
> As I understand, it makes Kafka flush log to disk (fsync) only depends on
> file system.
>
> Is there any simple way to monitor that frequency ?
>
> Is there a rule of thumb to estimate that value depending on the os ?
>
> Thank you guys !
> Fares


Re: [DISCUSS] KAFKA-4680: min.insync.replica can be set > replication factor

2020-03-03 Thread Alexandre Dupriez
Thanks Paolo for taking care of this.

I think option (3) would be the closest to iso-functionality. I wonder
how useful it would be to always include the replication factor of the
topic in the error message [1], versus returning a modified message
tailored to this specific case.

[1] 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/cluster/Partition.scala#L971-L975
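
To make option (3) concrete, the kind of enriched message I have in mind
would look like the following (a rough sketch; wording and names are mine,
not a patch against Partition.scala):

// Hypothetical sketch: enrich the NOT_ENOUGH_REPLICAS error message with the
// replication factor so the min.insync.replicas mismatch is obvious to the client.
final class MinIsrErrorMessage {
    static String build(String topicPartition, int inSyncSize, int minIsr, int replicationFactor) {
        String base = "The size of the current ISR " + inSyncSize
            + " is insufficient to satisfy the min.isr requirement of " + minIsr
            + " for partition " + topicPartition;
        if (minIsr > replicationFactor) {
            base += " (note: min.insync.replicas=" + minIsr
                + " exceeds the replication factor " + replicationFactor
                + ", so the requirement can never be satisfied)";
        }
        return base;
    }
}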

Le jeu. 27 févr. 2020 à 19:06, Paolo Moriello
 a écrit :
>
> Hello,
>
> I'd like to take up this Jira ticket
> . This is an old ticket,
> marked as a Kafka bug.
>
> Before moving forward, I'd like to open a discussion on what would be the
> best approach to take on when doing the validation, as well as discuss
> about possible edge cases we should consider.
>
> Basically, at the moment, it is possible to specify min.insync.replicas >
> replication factor. When this happens, it is not possible to produce on a
> topic when acks=all as client callback returns NOT_ENOUGH_REPLICAS, and the
> broker logs error messages on each request. As suggested in the Jira, the
> validation should happen much earlier in the process, eg. at topic
> creation/configuration setup.
>
> Regarding the approach to use on validating the configuration; do we want,
> for instance, to:
> 1. print a WARN about the mismatch in the configuration
> 2. make the request FAIL
> 3. or print a more specific message on produce
>
> Options 1 and 2 anticipate the validation on topic creation / configuration
> change. These require to validate the configuration in more than one place:
> at topic creation, at configuration setup/update (both for
> min.insync.replicas and the default.replication.factor), at partition
> reassignment (when reducing replication factor). Don't know about
> consequences
> Option 3 is simpler; it does not anticipate the validation, but at least
> improves the visibility over the issue on the client side.
>
> I'd be in favor of a softer approach, which might include both printing a
> warning on topic creation/configuration-update and eventually a more
> specific message when producing on the topic. On the other end, this does
> not solve the problem, as we would allow anyway the mismatch in the
> configuration. Option 2 would solve the problem with an harder validation
> (eg blocking topic creation or configuration setup), but this requires to
> validate any possible edge case (eg. how do we prevent a change in
> min.insync.replicas if we have already created topic with lower replication
> factor?).
>
> Let me know what's your opinion on this, and if there is any other scenario
> we should consider for the validation (for instance, what's the impact on
> internal topics?).
>
> Thanks,
> Paolo


Re: [DISCUSS] KIP-498: Add client-side configuration for maximum response size to protect against OOM

2020-02-18 Thread Alexandre Dupriez
Hello,

Thanks, David, for proposing a new PR [1] for KIP-498 [2] to address KAFKA-4090 [3].

I find it interesting; I wonder how it stands with respect to avoiding
inter-layer violations. For readers joining the thread, the gist of the
proposed client-side validation is sketched after the references below.

[1] https://github.com/apache/kafka/pull/8066
[2] https://cwiki.apache.org/confluence/display/KAFKA/KIP-498%3A+Add+client-side+configuration+for+maximum+response+size+to+protect+against+OOM
[3] https://issues.apache.org/jira/browse/KAFKA-4090
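
For completeness, the client-side validation essentially boils down to the
following (a bare sketch with hypothetical names, not the code of the PR):

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

// A Kafka response starts with a 4-byte size prefix; a plaintext client hitting a
// TLS endpoint may read a nonsensical value there and try to allocate it. The guard
// below rejects the response before any allocation happens.
final class ResponseSizeGuard {

    private final int maxResponseSizeBytes;

    ResponseSizeGuard(int maxResponseSizeBytes) {
        this.maxResponseSizeBytes = maxResponseSizeBytes;
    }

    byte[] readResponse(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        int size = data.readInt();
        if (size < 0 || size > maxResponseSizeBytes) {
            throw new IOException("Invalid response size " + size
                + " (max allowed: " + maxResponseSizeBytes + "), possible protocol mismatch");
        }
        byte[] payload = new byte[size];
        data.readFully(payload);
        return payload;
    }
}

The problems listed further down in the thread (failure mode, incomplete
remediation, risk of rejecting legitimate messages) all apply to this shape
of check.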

Le mar. 6 août 2019 à 15:37, Gokul Ramanan Subramanian
 a écrit :
>
> Hi Alexandre.
>
> Thanks for this analysis.
>
> IMHO, there are 4 ways to ago about this:
>
> 1. We don't fix the bug directly but instead update the Kafka documentation
> telling clients to configure themselves correctly - Silly but easy to
> achieve.
> 2. We adopt Stanislav's solution that fixes the problem - Easy to achieve,
> potentially adds inflexibility in the future if we want to change handshake
> protocol. However, changing the handshake protocol is going to be a
> backwards incompatible change anyway with or without Stanislav's solution.
> 3. We adopt Alexandre's solution - Easy to achieve, but has shortcomings
> Alexandre has highlighted.
> 4. We pivot KIP-498 to focus on standardizing the handshake protocol - Not
> easy to achieve, since this will be a backwards incompatible change and
> will require client and server redeployments in correct order. Further,
> this can be a hard problem to solve given that various transport layer
> protocols have different headers. In order for the "new handshake" protocol
> to work, it would have to work in the same namespace as those headers,
> which will require careful tuning of handshake constants.
>
> Any thoughts from the community on how we can proceed?
>
> Thanks.
>
> On Tue, Aug 6, 2019 at 12:41 PM Alexandre Dupriez <
> alexandre.dupr...@gmail.com> wrote:
>
> > Hello,
> >
> > I wrote a small change [1] to make clients validate the size of messages
> > received from a broker at the protocol-level.
> > However I don't like the change. It does not really solve the problem which
> > originally motivated the KIP - that is, protocol mismatch (plaintext to SSL
> > endpoint). More specifically, few problems I can see are listed below. This
> > is a non-exhaustive list. They also have been added to KIP-498 [2].
> >
> > 1) Incorrect failure mode
> > As was experimented and as can be seen as part of the integration tests,
> > when an invalid size is detected and the exception InvalidReceiveException
> > is thrown, consumers and producers keeps retrying until the poll timeout
> > expires or the maximum number of retries is reached. This is incorrect if
> > we consider the use case of protocol mismatch which motivated this change.
> > Indeed, producers and consumers would need to fail fast as retries will
> > only prolong the time to remediation from users/administrators.
> >
> > 2) Incomplete remediation
> > The proposed change cannot provide an definite guarantee against OOM in all
> > scenarios - for instance, it will still manifest if the maximum size is set
> > to 100 MB and the JVM is under memory pressure and have less than 100 MB of
> > allocatable memory.
> >
> > 3) Illegitimate message rejection
> > Even worse: what if the property is incorrectly configured and prevent
> > legitimate messages from reaching the client?
> >
> > 4) Unclear configuration parameter
> > 4.a) The name max.response.size intends to mirror the existing
> > max.request.size from the producer's configuration properties. However,
> > max.request.size intends to check the size of producer records as provided
> > by a client; while max.response.size is to check the size directly decoded
> > from the network according to Kafka's binary protocol.
> > 4.b) On the broker, the property socket.request.max.bytes is used to
> > validate the size of messages received by the server. The new property
> > serves the same purpose, which introduces duplicated semantic, even if one
> > property is characterised with the keyword "request" and the other with
> > "response", in both cases reflecting the perspective adopted from either a
> > client or a server.
> >
> > Please let me know what you think. An alternative mitigation may be worth
> > investigated for the detection of protocol mismatch in the client.
> >
> > [1] https://github.com/apache/kafka/pull/7160
> > [2]
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-498%3A+Add+client-side+configuration+for+maximum+response+size+to+protect+against+OOM
> >
> > Le jeu. 1 août 2019 à 09:42, Alexandre Dupriez <
> > alexandre.dupr

[jira] [Created] (KAFKA-9565) Implementation of Tiered Storage SPI to integrate with S3

2020-02-18 Thread Alexandre Dupriez (Jira)
Alexandre Dupriez created KAFKA-9565:


 Summary: Implementation of Tiered Storage SPI to integrate with S3
 Key: KAFKA-9565
 URL: https://issues.apache.org/jira/browse/KAFKA-9565
 Project: Kafka
  Issue Type: Sub-task
Reporter: Alexandre Dupriez






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9564) Integration Tests for Tiered Storage

2020-02-17 Thread Alexandre Dupriez (Jira)
Alexandre Dupriez created KAFKA-9564:


 Summary: Integration Tests for Tiered Storage
 Key: KAFKA-9564
 URL: https://issues.apache.org/jira/browse/KAFKA-9564
 Project: Kafka
  Issue Type: Sub-task
Reporter: Alexandre Dupriez
Assignee: Alexandre Dupriez






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Possible to create Scrum board under Kafka project in JIRA?

2020-02-14 Thread Alexandre Dupriez
Good morning,

Would it be possible to allow the the Apache Kafka project in JIRA to
be included in a new Scrum board?

I can see there is already a Kanban board for Cloudera and tried to
create a Scrum board for Tiered-Storage but don't have the permissions
to include Apache Kafka.

Thank you,
Alexandre


[jira] [Created] (KAFKA-9555) Topic-based implementation for the RLMM

2020-02-14 Thread Alexandre Dupriez (Jira)
Alexandre Dupriez created KAFKA-9555:


 Summary: Topic-based implementation for the RLMM
 Key: KAFKA-9555
 URL: https://issues.apache.org/jira/browse/KAFKA-9555
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Reporter: Alexandre Dupriez






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9554) Define the SPI for Tiered Storage framework

2020-02-14 Thread Alexandre Dupriez (Jira)
Alexandre Dupriez created KAFKA-9554:


 Summary: Define the SPI for Tiered Storage framework
 Key: KAFKA-9554
 URL: https://issues.apache.org/jira/browse/KAFKA-9554
 Project: Kafka
  Issue Type: Sub-task
  Components: clients, core
Reporter: Alexandre Dupriez
Assignee: Alexandre Dupriez


The goal of this task is to define the SPI (service provider interfaces) which 
will be used by vendors to implement plug-ins to communicate with specific 
storage system.
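
A simplified sketch of the shape such an SPI could take is given below. Method names follow those discussed on the KIP-405 mailing list thread; the actual interfaces are defined by the KIP and may differ.

{code:java}
// Simplified, illustrative sketch only - not the final SPI.
public interface RemoteStorageManager {

    // Copies the given local segment file to the remote storage and returns an identifier.
    String copyLogSegment(String topicPartition, java.nio.file.Path segmentFile) throws java.io.IOException;

    // Lists the segments currently available remotely for the given topic-partition.
    java.util.List<String> listRemoteSegments(String topicPartition) throws java.io.IOException;

    // Deletes remote data of the topic-partition up to the given offset (retention).
    void cleanupLogUntil(String topicPartition, long offset) throws java.io.IOException;

    // Removes all remote data of a deleted topic-partition.
    void deleteTopicPartition(String topicPartition) throws java.io.IOException;
}
{code}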



Done means:
 * Package with interfaces and key objects available and published for review.

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2019-12-19 Thread Alexandre Dupriez
Hi Jun,

Thank you for the feedback. I am trying to understand how a push-based
approach would work.
In order for the metadata to be propagated (under the assumption you
stated), would you plan to add a new API in Kafka to allow the
metadata store to send them directly to the brokers?
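
For instance, if a Kafka topic were used as the propagation medium (one of
the options mentioned earlier in the thread), each broker could simply tail
it - a bare sketch, with a hypothetical topic name and payload format:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Illustrative sketch: a broker-side tailer of an internal metadata topic.
public final class RemoteLogMetadataTailer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "remote-log-metadata-cache-broker-0");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("__remote_log_metadata"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // A real implementation would update the broker's local view of
                    // remote segment metadata here instead of printing it.
                    System.out.println(record.key() + " -> " + record.value());
                }
            }
        }
    }
}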

Thanks,
Alexandre


Le mer. 18 déc. 2019 à 20:14, Jun Rao  a écrit :
>
> Hi, Satish and Ivan,
>
> There are different ways for doing push based metadata propagation. Some
> object stores may support that already. For example, S3 supports events
> notification (
> https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html).
> Otherwise one could use a separate metadata store that supports push-based
> change propagation. Other people have mentioned using a Kafka topic. The
> best approach may depend on the object store and the operational
> environment (e.g. whether an external metadata store is already available).
>
> The above discussion is based on the assumption that we need to cache the
> object metadata locally in every broker. I mentioned earlier that an
> alternative is to just store/retrieve those metadata in an external
> metadata store. That may simplify the implementation in some cases.
>
> Thanks,
>
> Jun
>
> On Thu, Dec 5, 2019 at 7:01 AM Satish Duggana 
> wrote:
>
> > Hi Jun,
> > Thanks for your reply.
> >
> > Currently, `listRemoteSegments` is called at the configured
> > interval(not every second, defaults to 30secs). Storing remote log
> > metadata in a strongly consistent store for S3 RSM is raised in
> > PR-comment[1].
> > RLM invokes RSM at regular intervals and RSM can give remote segment
> > metadata if it is available. RSM is responsible for maintaining and
> > fetching those entries. It should be based on whatever mechanism is
> > consistent and efficient with the respective remote storage.
> >
> > Can you give more details about push based mechanism from RSM?
> >
> > 1. https://github.com/apache/kafka/pull/7561#discussion_r344576223
> >
> > Thanks,
> > Satish.
> >
> > On Thu, Dec 5, 2019 at 4:23 AM Jun Rao  wrote:
> > >
> > > Hi, Harsha,
> > >
> > > Thanks for the reply.
> > >
> > > 40/41. I am curious which block storages you have tested. S3 seems to be
> > > one of the popular block stores. The concerns that I have with pull based
> > > approach are the following.
> > > (a) Cost: S3 list object requests cost $0.005 per 1000 requests. If you
> > > have 100,000 partitions and want to pull the metadata for each partition
> > at
> > > the rate of 1/sec. It can cost $0.5/sec, which is roughly $40K per day.
> > > (b) Semantics: S3 list objects are eventually consistent. So, when you
> > do a
> > > list object request, there is no guarantee that you can see all uploaded
> > > objects. This could impact the correctness of subsequent logics.
> > > (c) Efficiency: Blindly pulling metadata when there is no change adds
> > > unnecessary overhead in the broker as well as in the block store.
> > >
> > > So, have you guys tested S3? If so, could you share your experience in
> > > terms of cost, semantics and efficiency?
> > >
> > > Jun
> > >
> > >
> > > On Tue, Dec 3, 2019 at 10:11 PM Harsha Chintalapani 
> > wrote:
> > >
> > > > Hi Jun,
> > > >   Thanks for the reply.
> > > >
> > > >
> > > >
> > > > On Tue, Nov 26, 2019 at 3:46 PM, Jun Rao  wrote:
> > > >
> > > > > Hi, Satish and Ying,
> > > > >
> > > > > Thanks for the reply.
> > > > >
> > > > > 40/41. There are two different ways that we can approach this. One is
> > > > what
> > > > > you said. We can have an opinionated way of storing and populating
> > the
> > > > tier
> > > > > metadata that we think is good enough for everyone. I am not sure if
> > this
> > > > > is the case based on what's currently proposed in the KIP. For
> > example, I
> > > > > am not sure that (1) everyone always needs local metadata; (2) the
> > > > current
> > > > > local storage format is general enough and (3) everyone wants to use
> > the
> > > > > pull based approach to propagate the metadata. Another approach is to
> > > > make
> > > > > this pluggable and let the implementor implements the best approach
> > for a
> > > > > particular block storage. I haven't seen any comments from
> > Slack/AirBnb
> > > > in
> > > > > the mailing list on this topic. It would be great if they can provide
> > > > > feedback directly here.
> > > > >
> > > >
> > > > The current interfaces are designed with most popular block storages
> > > > available today  and we did 2 implementations with these interfaces and
> > > > they both are yielding good results as we going through the testing of
> > it.
> > > > If there is ever a need for pull based approach  we can definitely
> > evolve
> > > > the interface.
> > > > In the past we did mark interfaces to be evolving to make room for
> > unknowns
> > > > in the future.
> > > > If you have any suggestions around the current interfaces please
> > propose we
> > > > are happy to see if we can work them into it.
> > > >
> > > >
> > > > 

Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2019-12-18 Thread Alexandre Dupriez
Hi all,

A.1 As commented on PR 7561, the S3 consistency model [1][2] implies the RSM cannot
rely solely on S3 APIs to guarantee the expected strong consistency. The
proposed implementation [3] would need to be updated to take this into
account. Let’s talk more about this.

A.2 Contract for the RSM API (an API call is loosely defined as an
“operation” here):

A.2.1 The KIP mentions “*If the process of a topic-partition is failed due
to remote storage error, its scheduled processing time is set to ( now() +
rlm_retry_interval_ms ). rlm_retry_interval_ms can be configured in broker
config file.*”. Do you still plan to implement such retries?

A.2.2 Idempotency – Are operations exposed by the RSM idempotent? What is
the risk of an operation being retried with the same input (irrespective of
the state of the remote storage)? If the same successful operation is
retried with the same input, should an error be propagated? How would this
error be discriminated from I/O or other type of failures?

A.2.3 Atomicity – what does an implementation of RSM need to provide with
respect to atomicity of the APIs copyLogSegment, cleanupLogUntil and
deleteTopicPartition? If a partial failure happens in any of those (e.g. in
the S3 implementation, if one of the multiple uploads fails [4]), what
guarantees are to be provided to the RLM on the state of the remote
storage, and what if it is left in an inconsistent state? In case the
operation is meant to be retried from the RLM, does this mean the RSM is expected
to recover from partial failures? What if an unrecoverable failure affects the
RSM? In the RLMTask an exception is logged [5] but it seems the task
continues to be scheduled -> is there a mode where a topic partition stops
being transferred to the remote storage?

A.2.4 Consistency – already discussed.

A.2.5 Failure modes – currently RSM propagates failures as IOExceptions.
Wouldn’t we need a slightly different contract for the RSM? As opposed to
the I/O errors which Kafka handles in its innermost layers when accessing
the file system, should the implementations of RSM deal with low-level
errors and retries (wherever it can), and not expose them via its API?
Since the RLM is agnostic of the implementation behind the RSM, it is
virtually impossible to know how to deal with an I/O type of exception
without prior assumptions on the implementation of RSM exercised.

A.3 Caching – storing locally the segments retrieved from the remote
storage is excluded as it does not align with the original intent and even
defeats some of its purposes (saving disk space, etc.). That said, could there
be other types of use cases where the pattern of access to the remotely
stored segments would benefit from local caching (and potentially
read-ahead)? Consider the use case of a large pool of consumers which start
a backfill at the same time for one day's worth of data from one year ago
stored remotely. Caching the segments locally would make it possible to uncouple
the load on the remote storage from the load on the Kafka cluster. Maybe the
RLM could expose a configuration parameter to switch that feature on/off?
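
To sketch what such an optional cache could look like on the read path
(class, method names and the eviction policy are illustrative assumptions,
not a proposal for the KIP):

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Illustrative sketch: a small LRU cache of remotely fetched segment data.
public final class RemoteSegmentCache {

    private final Map<String, byte[]> cache;

    public RemoteSegmentCache(int maxEntries) {
        // Access-ordered map that evicts the least recently used entry.
        this.cache = new LinkedHashMap<String, byte[]>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, byte[]> eldest) {
                return size() > maxEntries;
            }
        };
    }

    // Serves from the local cache if present, otherwise fetches from the remote storage once.
    public synchronized byte[] getOrFetch(String segmentId, Function<String, byte[]> remoteFetch) {
        return cache.computeIfAbsent(segmentId, remoteFetch);
    }
}

In practice the cache would be bounded in bytes rather than in entries, but
the backfill scenario above is exactly where such a layer would uncouple the
remote storage from the consumer fan-out.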

[1]
https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel
[2]
https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html#Warning_.231:_S3_Consistency_model
[3] https://github.com/harshach/kafka/pull/18
[4]
https://github.com/harshach/kafka/pull/18/files#diff-39e2143514ed06d5d066708309263424R124
[5]
https://github.com/apache/kafka/pull/7561/files#diff-a597bd0c7d627789e73d1fa38eb1abfaR278

Le jeu. 5 déc. 2019 à 15:01, Satish Duggana  a
écrit :

> Hi Jun,
> Thanks for your reply.
>
> Currently, `listRemoteSegments` is called at the configured
> interval(not every second, defaults to 30secs). Storing remote log
> metadata in a strongly consistent store for S3 RSM is raised in
> PR-comment[1].
> RLM invokes RSM at regular intervals and RSM can give remote segment
> metadata if it is available. RSM is responsible for maintaining and
> fetching those entries. It should be based on whatever mechanism is
> consistent and efficient with the respective remote storage.
>
> Can you give more details about push based mechanism from RSM?
>
> 1. https://github.com/apache/kafka/pull/7561#discussion_r344576223
>
> Thanks,
> Satish.
>
> On Thu, Dec 5, 2019 at 4:23 AM Jun Rao  wrote:
> >
> > Hi, Harsha,
> >
> > Thanks for the reply.
> >
> > 40/41. I am curious which block storages you have tested. S3 seems to be
> > one of the popular block stores. The concerns that I have with pull based
> > approach are the following.
> > (a) Cost: S3 list object requests cost $0.005 per 1000 requests. If you
> > have 100,000 partitions and want to pull the metadata for each partition
> at
> > the rate of 1/sec. It can cost $0.5/sec, which is roughly $40K per day.
> > (b) Semantics: S3 list objects are eventually consistent. So, when you
> do a
> > list object request, there is no guarantee that you can see all uploaded
> > objects. This could impact the 

Re: [VOTE] KIP-544: Make metrics exposed via JMX configurable

2019-11-09 Thread Alexandre Dupriez
+1 (non-binding)

Le ven. 8 nov. 2019 à 20:21, Bill Bejeck  a écrit :

> Thanks for the KIP, +1 (binding).
>
> -Bill
>
> On Fri, Nov 8, 2019 at 1:28 PM Gwen Shapira  wrote:
>
> > +1 (binding)
> >
> > On Thu, Nov 7, 2019 at 11:17 AM Xavier Léauté 
> wrote:
> > >
> > > Hi everyone,
> > >
> > > I'd like to open up the vote for KIP-544 on making exposed JMX metrics
> > more
> > > configurable.
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-544%3A+Make+metrics+exposed+via+JMX+configurable
> > >
> > > Thank you!
> > > Xavier
> >
>


Re: [DISCUSS] KIP-544: Make metrics exposed via JMX configurable

2019-11-08 Thread Alexandre Dupriez
Hello,

This can be very handy when dealing with large numbers of partitions on a
broker.

I was recently experimenting with a third-party monitoring framework which
provides a JMX collector [1] with the same mechanism to filter out the JMX
beans retrieved from Kafka.
When running a couple of tests with all filters removed, the time to fetch
all beans could quickly become prohibitive as the number of partitions on
the tested broker increased.

After some investigation, the main source of "friction" was found in the
(too) many RMI RPCs required to fetch the names and attributes of the JMX
beans.
Configuring the same JMX collector to run as a JVM agent, and taking care
of unplugging the JMX-RMI connector, yielded significant gains (*).

Note that this was obtained by fetching the beans via HTTP, with all values
sent in a batch.
I find one of the potential follow-ups mentioned (exposing the beans via an
alternative API) also very interesting from a performance perspective.

[1] https://github.com/prometheus/jmx_exporter
(*) On a 4-cores Xeon 8175M broker, hosting 1,000 replicas, the time to
fetch all beans dropped from 13 seconds to ~400 ms.
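
As a small illustration of why filtering matters, this is the kind of bean
enumeration a collector performs, here restricted by an example name pattern
(illustrative code, not the jmx_exporter implementation):

import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Enumerates partition-level beans through the local platform MBean server.
// KIP-544 would allow not exposing such beans at all when they are filtered out.
public final class JmxBeanFilterExample {
    public static void main(String[] args) throws MalformedObjectNameException {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName pattern = new ObjectName("kafka.cluster:type=Partition,*");
        Set<ObjectName> names = server.queryNames(pattern, null);
        // With thousands of partitions, every extra bean means extra RMI round-trips
        // for a remote collector; filtering at the source avoids that cost.
        System.out.println("Matching beans: " + names.size());
    }
}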

Le ven. 8 nov. 2019 à 17:29, Guozhang Wang  a écrit :

> Sounds good, thanks.
>
> Guozhang
>
> On Fri, Nov 8, 2019 at 9:26 AM Xavier Léauté  wrote:
>
> > >
> > > 1. I do feel there're similar needs for clients make JMX configurable.
> > Some
> > > context: in modules like Connect and Streams we have added /
> refactored a
> > > large number of metrics so far [0, 1], and although we've added a
> > reporting
> > > level config [2] to clients, this is statically defined at code and
> > cannot
> > > be dynamically changed either.
> > >
> >
> > Thanks for providing some context there, I have updated the KIP to add
> > equivalent configs for clients, streams, and connect
> >
> >
> > > 2. This may be out of the scope of this KIP, but have you thought about
> > how
> > > to make the metrics collection to be configurable (i.e. basically for
> > those
> > > metrics which we know would not be exposed, we do not collect them
> > either)
> > > dynamically?
> >
> >
> > Yes, given what you described above, it would make sense to look into
> this.
> > One difficulty though, is that we'd probably want to define this at the
> > sensor level,
> > which does not always map to the metric names users understand.
> >
> > There are also cases where someone may want to expose different sets of
> > metrics
> > using different reporters, so I think a reporting level config is still
> > useful.
> > For this KIP, I am proposing we stick to making reporting configurable,
> > independent of the underlying collection mechanism.
> >
>
>
> --
> -- Guozhang
>


Re: UnderReplicatedPartitions = 0 and UnderMinPartitionIsrCount > 0

2019-08-15 Thread Alexandre Dupriez
Hello James,

Many thanks for your quick response and pointing out the associated JIRA.

Happy to dig a bit on the function paths affected by a consistency check on
these parameters, as explained in the ticket, and see what could be done
(or not).

Thanks,
Alexandre

Le mer. 14 août 2019 à 06:11, James Cheng  a écrit :

> Alexandre,
>
> You are right that this is a problem. There is a JIRA on this from a while
> back.
>
> https://issues.apache.org/jira/plugins/servlet/mobile#issue/KAFKA-4680
>
> I don’t think anyone is currently working on it right now.
>
> -James
>
> Sent from my iPhone
>
> > On Aug 13, 2019, at 1:17 AM, Alexandre Dupriez <
> alexandre.dupr...@gmail.com> wrote:
> >
> > Hello all,
> >
> > We run into a scenario where we had misconfigured the replication factor
> > and the minimum in-sync replicas count in such a way that the replication
> > factor (either default or defined at the topic level) is strictly lower
> > than the property min.insync.replicas.
> >
> > We observed broker metrics reporting UnderReplicatedPartitions = 0 and
> > UnderMinPartitionIsrCount > 0, and the topic’s partitions were
> unavailable
> > for producers (with ack=all) and consumers.
> >
> > Since it seems to be impossible in this scenario to ever reach the number
> > of in-sync replicas, making partitions permanently unavailable, it could
> be
> > worth to prevent this misconfiguration to make its way to the broker,
> e.g.
> > a check could be added when a topic is created to ensure the replication
> > factor is greater than or equals to the minimum number of in-sync
> replicas.
> >
> > I may have missed something though. What do you think?
> >
> > Thank you,
> > Alexandre
>


[jira] [Resolved] (KAFKA-8695) Metrics UnderReplicated and UnderMinIsr are diverging when configuration is inconsistent

2019-08-14 Thread Alexandre Dupriez (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexandre Dupriez resolved KAFKA-8695.
--
Resolution: Duplicate

De-duplicating in favor of 
[KAFKA-4680|http://issues.apache.org/jira/browse/KAFKA-4680].

> Metrics UnderReplicated and UnderMinIsr are diverging when configuration is 
> inconsistent
> 
>
> Key: KAFKA-8695
> URL: https://issues.apache.org/jira/browse/KAFKA-8695
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.0, 2.2.0, 2.1.1, 2.3.0
>    Reporter: Alexandre Dupriez
>    Assignee: Alexandre Dupriez
>Priority: Minor
>
> As of now, Kafka allows the replication factor of a topic and 
> "min.insync.replicas" to be set such that "min.insync.replicas" > the topic's 
> replication factor.
> As a consequence, the JMX beans
> {code:java}
> kafka.cluster:type=Partition,name=UnderReplicated{code}
> and 
> {code:java}
> kafka.cluster:type=Partition,name=UnderMinIsr{code}
> can report diverging views on the replication for a topic. The former can 
> report no under-replicated partitions, while the latter will report partitions 
> below the minimum number of in-sync replicas.
> Even worse, consumption of topics which exhibit this behaviour seems to fail, 
> with the Kafka broker throwing a NotEnoughReplicasException. 
> {code:java}
> [2019-07-22 10:44:29,913] ERROR [ReplicaManager broker=0] Error processing 
> append operation on partition __consumer_offsets-0 
> (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the 
> current ISR Set(0) is insufficient to satisfy the min.isr requirement of 2 
> for partition __consumer_offsets-0 {code}
> In order to avoid this scenario, one possibility would be to check the values 
> of "min.insync.replicas" and "default.replication.factor" when the broker 
> starts, and "min.insync.replicas" and the replication factor given to a topic 
> at creation time, and refuse to create the topic if those are inconsistently 
> set.
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


UnderReplicatedPartitions = 0 and UnderMinPartitionIsrCount > 0

2019-08-13 Thread Alexandre Dupriez
Hello all,

We ran into a scenario where we had misconfigured the replication factor
and the minimum in-sync replicas count in such a way that the replication
factor (either default or defined at the topic level) is strictly lower
than the property min.insync.replicas.

We observed broker metrics reporting UnderReplicatedPartitions = 0 and
UnderMinPartitionIsrCount > 0, and the topic’s partitions were unavailable
for producers (with ack=all) and consumers.

Since it seems to be impossible in this scenario to ever reach the number
of in-sync replicas, making partitions permanently unavailable, it could be
worth preventing this misconfiguration from making its way to the broker, e.g.
a check could be added when a topic is created to ensure the replication
factor is greater than or equal to the minimum number of in-sync replicas.
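
For illustration only, the check could look like the sketch below (the method
name and exception are placeholders I am making up here, not an actual broker
code path), assuming both the effective replication factor and
min.insync.replicas are known at topic creation time:

    static void validateReplicationConfig(int replicationFactor, int minInsyncReplicas) {
        // Reject the topic outright when min.insync.replicas can never be satisfied.
        if (minInsyncReplicas > replicationFactor) {
            throw new IllegalArgumentException(
                "min.insync.replicas (" + minInsyncReplicas + ") must not exceed the "
                + "replication factor (" + replicationFactor + ")");
        }
    }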

I may have missed something though. What do you think?

Thank you,
Alexandre


Re: [DISCUSS] KIP-498: Add client-side configuration for maximum response size to protect against OOM

2019-08-06 Thread Alexandre Dupriez
Hello,

I wrote a small change [1] to make clients validate the size of messages
received from a broker at the protocol level.
However, I don't like the change. It does not really solve the problem which
originally motivated the KIP - that is, protocol mismatch (plaintext to SSL
endpoint). More specifically, a few problems I can see are listed below. This
is a non-exhaustive list. They have also been added to KIP-498 [2].

1) Incorrect failure mode
As experimentation and the integration tests show, when an invalid size is
detected and an InvalidReceiveException is thrown, consumers and producers
keep retrying until the poll timeout expires or the maximum number of retries
is reached. This is incorrect for the protocol-mismatch use case which
motivated this change. Indeed, producers and consumers would need to fail
fast, as retries only prolong the time to remediation for
users/administrators.

2) Incomplete remediation
The proposed change cannot provide a definite guarantee against OOM in all
scenarios - for instance, an OOM can still occur if the maximum size is set
to 100 MB while the JVM is under memory pressure and has less than 100 MB of
allocatable memory.

3) Illegitimate message rejection
Even worse: what if the property is incorrectly configured and prevents
legitimate messages from reaching the client?

4) Unclear configuration parameter
4.a) The name max.response.size intends to mirror the existing
max.request.size from the producer's configuration properties. However,
max.request.size is intended to check the size of producer records as provided
by a client, while max.response.size would check the size directly decoded
from the network according to Kafka's binary protocol.
4.b) On the broker, the property socket.request.max.bytes is used to
validate the size of messages received by the server. The new property
serves the same purpose, which introduces duplicated semantics, even if one
property is characterised with the keyword "request" and the other with
"response", in both cases reflecting the perspective adopted from either a
client or a server.

Please let me know what you think. An alternative mitigation may be worth
investigating for the detection of protocol mismatch in the client.

[1] https://github.com/apache/kafka/pull/7160
[2]
https://cwiki.apache.org/confluence/display/KAFKA/KIP-498%3A+Add+client-side+configuration+for+maximum+response+size+to+protect+against+OOM

On Thu, 1 Aug 2019 at 09:42, Alexandre Dupriez wrote:

> Thanks Gokul and Stanislav for your answers.
>
> I went through the PR 5940 [1]. Indeed Gokul, your reasoning echoes the
> observations of Ismael about a potential inter-protocol layering violation.
>
> As you said Stanislav, the server-side SSL engine responds with an alert
> with code 80 (internal_error) from what I saw when reproducing the OOM.
> Since the Alert is generated below the application layer, I am not sure
> what could be done on the broker to handle the scenario more gracefully.
> Interestingly, the SSL engine emits the possibility of receiving a
> plaintext message in debug logs [2].
>
> The idea was indeed to perform a simple check on the message size decoded
> in NetworkReceive against a configurable value, and throw
> an InvalidReceiveException in a similar fashion as what happens on the
> broker, where a non-unlimited maxSize can be provided. Basically, mimic the
> behaviour on the broker. The default value for the maximal request size on
> the broker is 100 MB which you are suggesting to use client-side.
>
> Adding a client configuration property for clients may be an overkill
> here. What I am going to ask is naive but - is it theoretically possible
> for the broker to legitimately send responses over 100 MB in size?
>
> Thanks,
> Alexandre
>
> [1] https://github.com/apache/kafka/pull/5940
> [2]
>
> kafka-network-thread-0-ListenerName(SSL)-SSL-4, fatal error: 80: problem
> unwrapping net record
>
> javax.net.ssl.SSLException: Unrecognized SSL message, plaintext connection?
>
> kafka-network-thread-0-ListenerName(SSL)-SSL-4, SEND TLSv1.2 ALERT:  fatal,
> description = internal_error
>
> kafka-network-thread-0-ListenerName(SSL)-SSL-4, WRITE: TLSv1.2 Alert,
> length = 2
>
> kafka-network-thread-0-ListenerName(SSL)-SSL-4, called closeOutbound()
>
> kafka-network-thread-0-ListenerName(SSL)-SSL-4, closeOutboundInternal()
>
> kafka-network-thread-0-ListenerName(SSL)-SSL-4, called closeInbound()
>
> kafka-network-thread-0-ListenerName(SSL)-SSL-4, fatal: engine already
> closed.  Rethrowing javax.net.ssl.SSLException: Inbound closed before
> receiving peer's close_notify: possible truncation attack?
>
> [Raw write]: length = 7
>
> : 15 03 03 00 02 02 50   ..P
>
>
> Le jeu. 1 août 201

[jira] [Created] (KAFKA-8752) Ensure plugin classes are instantiable when discovering plugins

2019-08-05 Thread Alexandre Dupriez (JIRA)
Alexandre Dupriez created KAFKA-8752:


 Summary: Ensure plugin classes are instantiable when discovering 
plugins
 Key: KAFKA-8752
 URL: https://issues.apache.org/jira/browse/KAFKA-8752
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Alexandre Dupriez
Assignee: Alexandre Dupriez


While running integration tests from the IntelliJ IDE, it appears plugins fail 
to load in {{DelegatingClassLoader.scanUrlsAndAddPlugins}}. The reason was, in 
this case, that the class 
{{org.apache.kafka.connect.connector.ConnectorReconfigurationTest$TestConnector}}
 could not be instantiated - which it is not intended to be.

The problem does not occur when running integration tests with Gradle, as the 
runtime closure is different from IntelliJ's - which includes test sources from 
depended-on modules on the classpath.

While debugging this minor inconvenience, I could see that 
{{DelegatingClassLoader}} performs a sanity check on the plugin class to 
instantiate - as of now, it verifies the class is concrete. A quick fix for the 
problem highlighted above could be to add an extra condition on the Java 
modifiers of the class to ensure it will be instantiable.
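
For illustration only, the extra condition could look like the sketch below (the helper name is made up; the actual check in {{DelegatingClassLoader}} may need to be structured differently):
{code:java}
import java.lang.reflect.Modifier;

// Hypothetical helper: only retain plugin candidates which are concrete,
// reflectively instantiable classes with a public no-argument constructor.
static boolean isInstantiable(Class<?> klass) {
    int mods = klass.getModifiers();
    if (Modifier.isAbstract(mods) || Modifier.isInterface(mods)) {
        return false; // existing "concrete class" sanity check
    }
    if (klass.getEnclosingClass() != null && !Modifier.isStatic(mods)) {
        return false; // non-static inner classes cannot be instantiated on their own
    }
    try {
        klass.getConstructor(); // requires a public no-arg constructor
        return true;
    } catch (NoSuchMethodException e) {
        return false;
    }
}
{code}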



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


Re: [DISCUSS] KIP-498: Add client-side configuration for maximum response size to protect against OOM

2019-08-01 Thread Alexandre Dupriez
Thanks Gokul and Stanislav for your answers.

I went through the PR 5940 [1]. Indeed Gokul, your reasoning echoes the
observations of Ismael about a potential inter-protocol layering violation.

As you said Stanislav, the server-side SSL engine responds with an alert
with code 80 (internal_error) from what I saw when reproducing the OOM.
Since the Alert is generated below the application layer, I am not sure
what could be done on the broker to handle the scenario more gracefully.
Interestingly, the SSL engine mentions the possibility of having received a
plaintext message in its debug logs [2].

The idea was indeed to perform a simple check on the message size decoded
in NetworkReceive against a configurable value, and throw
an InvalidReceiveException in a similar fashion to what happens on the
broker, where a non-unlimited maxSize can be provided - basically, mimic the
behaviour of the broker. The default value for the maximum request size on
the broker is 100 MB, which you are suggesting to reuse client-side.
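
For what it's worth, the gist of the check can be sketched as below (simplified,
with a made-up maxResponseSize parameter standing for the proposed property -
this is not the exact code of the patch in [1]):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.network.InvalidReceiveException;

    // Validate the 4-byte size prefix decoded from the network before
    // allocating the buffer which will hold the response body.
    static ByteBuffer allocateResponseBuffer(ByteBuffer sizePrefix, int maxResponseSize) {
        int size = sizePrefix.getInt();
        if (size < 0 || size > maxResponseSize) {
            throw new InvalidReceiveException(
                "Invalid receive (size = " + size + " larger than " + maxResponseSize + ")");
        }
        return ByteBuffer.allocate(size);
    }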

Adding a client configuration property may be overkill here. What I am going
to ask is naive, but is it theoretically possible for the broker to
legitimately send responses over 100 MB in size?

Thanks,
Alexandre

[1] https://github.com/apache/kafka/pull/5940
[2]

kafka-network-thread-0-ListenerName(SSL)-SSL-4, fatal error: 80: problem
unwrapping net record

javax.net.ssl.SSLException: Unrecognized SSL message, plaintext connection?

kafka-network-thread-0-ListenerName(SSL)-SSL-4, SEND TLSv1.2 ALERT:  fatal,
description = internal_error

kafka-network-thread-0-ListenerName(SSL)-SSL-4, WRITE: TLSv1.2 Alert,
length = 2

kafka-network-thread-0-ListenerName(SSL)-SSL-4, called closeOutbound()

kafka-network-thread-0-ListenerName(SSL)-SSL-4, closeOutboundInternal()

kafka-network-thread-0-ListenerName(SSL)-SSL-4, called closeInbound()

kafka-network-thread-0-ListenerName(SSL)-SSL-4, fatal: engine already
closed.  Rethrowing javax.net.ssl.SSLException: Inbound closed before
receiving peer's close_notify: possible truncation attack?

[Raw write]: length = 7

: 15 03 03 00 02 02 50   ..P


On Thu, 1 Aug 2019 at 08:50, Stanislav Kozlovski wrote:

> Hey Alexandre, thanks for the KIP!
>
> I had personally hit the same problem and found it very annoying.
> Have you considered detecting such a scenario in the client and simply
> throwing an exception, rather than allocating any memory?
> I have an open PR that does just that -
> https://github.com/apache/kafka/pull/5940
> <https://github.com/apache/kafka/pull/5940/files>
>
> Best,
> Stanislav
>
> On Wed, Jul 31, 2019 at 10:35 AM Gokul Ramanan Subramanian <
> gokul24...@gmail.com> wrote:
>
> > Hi Alex.
> >
> > A rejected alternative is to try to do SSL handshake from the plaintext
> > transport layer. This couples various transport layers and is inflexible
> to
> > adding new transport layers in the future. Further, if the plaintext
> > transport layer does SSL handshake, it will always get an error,
> > irrespective of whether or not the peer supports SSL. Therefore, the
> > plaintext transport layer would have to determine if the peer uses SSL or
> > not based on the type of error it gets back from the SSL handshake. This
> > fits right into the general anti-pattern of using exceptions as control
> > flow mechanisms.
> >
> > Another rejected alternative would be to have a single byte at the
> > transport layer represent the security protocol in use. This byte would
> > have to be present in every single message between clients and brokers
> and
> > between brokers and brokers. This change is backwards-incompatible and
> adds
> > overhead to Kafka. It is likely never going to get accepted by the
> > community.
> >
> > In the absence of a fool-proof way to do a handshake across all security
> > protocols (plaintext, SSL, other future ones), I think that the proposed
> > KIP provides a good solution. Therefore, you have my +1. (Not sure if my
> +1
> > counts, since I am a Kafka noob).
> >
> > Thanks.
> > Gokul Ramanan Subramanian
> > Senior SDE, Amazon AWS
> >
> > On 28/Jul/19 05:43:19PM +0100, Alexandre Dupriez wrote:
> >  Hello,
> >
> >  I have created the KIP-498
> >  <
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-498%3A+Add+client-side+configuration+for+maximum+response+size+to+protect+against+OOM
> > >
> >  to
> >  propose a minor addition to the producer configuration properties, in
> > order
> >  to protect against OOM when the response message decoded by a client
> >  indicates an "abnormally high" size to be allocated.
> >
> >  This follows this discussion thread
&

[DISCUSS] KIP-498: Add client-side configuration for maximum response size to protect against OOM

2019-07-28 Thread Alexandre Dupriez
Hello,

I have created KIP-498 to propose a minor addition to the producer
configuration properties, in order to protect against OOM when the response
message decoded by a client indicates an "abnormally high" size to be
allocated.

This follows an earlier discussion thread and is tracked by KAFKA-4090.

Please let me know what you think. I created this KIP even though the
change seems minimal because it affects client configuration, which is
mentioned as a type of change eligible for a KIP on the main wiki page.

Thanks,
Alexandre


Re: JIRA and KIP contributor permissions

2019-07-23 Thread Alexandre Dupriez
Hello Matthias,

Thanks for the quick reply, I can confirm I am able to auto-assign JIRA
tickets.

Please find here my Confluence username: alexandre.dupriez

Many thanks,
Alexandre

On Tue, 23 Jul 2019 at 04:38, Matthias J. Sax wrote:

> Hi Alexandre,
>
> I added you to the list of contributors in JIRA, so you can self-assign
> tickets. However, I did not find any corresponding wiki account. Note that both
> are independent accounts and you might need to create a wiki account
> first (and share your ID so we can grant write permission).
>
>
> -Matthias
>
> On 7/22/19 1:16 PM, Alexandre Dupriez wrote:
> > Hello Community,
> >
> > In order to start contributing to Apache Kafka project, could I please
> > request contributor access to JIRA and be granted write permissions to
> the
> > Kafka wiki?
> >
> > JIRA username: adupriez
> > Committer email: alexandre.dupr...@amazon.com 
> >
> > Thank you in advance,
> > Alexandre
> >
>
>


JIRA and KIP contributor permissions

2019-07-22 Thread Alexandre Dupriez
Hello Community,

In order to start contributing to Apache Kafka project, could I please
request contributor access to JIRA and be granted write permissions to the
Kafka wiki?

JIRA username: adupriez
Committer email: alexandre.dupr...@amazon.com 

Thank you in advance,
Alexandre


[jira] [Created] (KAFKA-8695) Metrics UnderReplicated and UnderMinIsr are diverging when configuration is inconsistent

2019-07-22 Thread Alexandre Dupriez (JIRA)
Alexandre Dupriez created KAFKA-8695:


 Summary: Metrics UnderReplicated and UnderMinIsr are diverging 
when configuration is inconsistent
 Key: KAFKA-8695
 URL: https://issues.apache.org/jira/browse/KAFKA-8695
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.3.0, 2.1.1, 2.2.0, 2.1.0
Reporter: Alexandre Dupriez


As of now, Kafka allows the replication factor of a topic and 
"min.insync.replicas" to be set such that "min.insync.replicas" > the topic's 
replication factor.

As a consequence, the JMX beans
{code:java}
kafka.cluster:type=Partition,name=UnderReplicated{code}
and 
{code:java}
kafka.cluster:type=Partition,name=UnderMinIsr{code}
can report diverging views on the replication for a topic. The former can 
report no under-replicated partitions, while the latter will report partitions 
below the minimum number of in-sync replicas.

 

Even worse, consumption of topics which exhibit this behaviour seems to fail, 
with the Kafka broker throwing a NotEnoughReplicasException:

 

 
{code:java}
[2019-07-22 10:44:29,913] ERROR [ReplicaManager broker=0] Error processing 
append operation on partition __consumer_offsets-0 (kafka.server.ReplicaManager)
org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the 
current ISR Set(0) is insufficient to satisfy the min.isr requirement of 2 for 
partition __consumer_offsets-0{code}
 

 

In order to avoid this scenario, one possibility would be to check the values 
of "min.insync.replicas" and "default.replication.factor" when the broker 
starts, and "min.insync.replicas" and the replication factor given to a topic 
at creation time, and refuse to create the topic if those are inconsistently 
set.

 



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


Metrics UnderReplicated and UnderMinIsr are diverging when configuration is inconsistent

2019-07-22 Thread Alexandre Dupriez
Hello Community,

I noticed Kafka allows the replication factor of a topic and
"min.insync.replicas" to be set such that "min.insync.replicas" > the
topic's replication factor.

As a consequence, the JMX beans
kafka.cluster:type=Partition,name=UnderReplicated and
kafka.cluster:type=Partition,name=UnderMinIsr
can report diverging views on the replication for a topic. The former can
report no under-replicated partitions, while the latter will report partitions
below the minimum number of in-sync replicas.
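
To illustrate how we observed the divergence, a probe along the lines below can
be used (the JMX port and the topic/partition tags in the object names are
assumptions for the example and need to be adjusted to the actual broker setup):

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public final class UnderMinIsrProbe {
        public static void main(String[] args) throws Exception {
            // Assumes the broker exposes JMX on localhost:9999.
            JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
            try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
                MBeanServerConnection mbeans = connector.getMBeanServerConnection();
                ObjectName underReplicated = new ObjectName(
                    "kafka.cluster:type=Partition,name=UnderReplicated,topic=test,partition=0");
                ObjectName underMinIsr = new ObjectName(
                    "kafka.cluster:type=Partition,name=UnderMinIsr,topic=test,partition=0");
                // Gauges are exposed with a "Value" attribute by the JMX reporter.
                System.out.println("UnderReplicated = " + mbeans.getAttribute(underReplicated, "Value"));
                System.out.println("UnderMinIsr     = " + mbeans.getAttribute(underMinIsr, "Value"));
            }
        }
    }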

Even worse, consumption of topics which exhibit this behaviour seems to
fail, with the Kafka broker throwing a NotEnoughReplicasException.

[2019-07-22 10:44:29,913] ERROR [ReplicaManager broker=0] Error processing
> append operation on partition __consumer_offsets-0
> (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the
> current ISR Set(0) is insufficient to satisfy the min.isr requirement of 2
> for partition __consumer_offsets-0


In order to avoid this scenario, one possibility would be to check the
values of "min.insync.replicas" and "default.replication.factor" when the
broker starts, and "min.insync.replicas" and the replication factor given
to a topic at creation time, and refuse to create the topic if those are
inconsistently set.

This was reproduced with Kafka 2.1.0, 2.2.0 and 2.3.0.

What do you think?

Alexandre