Re: Kafka Tiered Storage Disablement Implement in ZK Mode Question

2024-11-02 Thread Kamal Chandraprakash
For shortcoming (2), looking at the code, as long as the partition is not
offline during disablement (leader is available),
then the remote-log deletion might proceed as expected. Note that the
feature is not tested out in ZK mode, and the
implementation details can change later. So, I'd suggest you upgrade to
KRaft.

On Sat, Nov 2, 2024 at 4:15 PM Kamal Chandraprakash <
kamal.chandraprak...@gmail.com> wrote:

> Hi Jianfeng,
>
> As Luke already mentioned, the feature was not tested in ZK mode.
> If you still want to enable it in the ZK mode, then:
>
> 1. Disable the LogConfig#validateNoInvalidRemoteStorageConfigsInZK
> <https://sourcegraph.com/github.com/apache/kafka@3.9/-/blob/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java?L675-682>
> validation.
> 2. Update the relevant unit tests, otherwise build will fail.
> 3. Enable the integration test DisableRemoteLogOnTopicTest
> <https://sourcegraph.com/github.com/apache/kafka@3.9/-/blob/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DisableRemoteLogOnTopicTest.java?L44-48>
>  for
> both Zk and Kraft modes.
>
> *Shortcomings with ZK mode*
> 1. The disabling remote storage validation is done by the controller only
> in the Kraft mode. Zk mode doesn't support it.
> Refer to #16693 <https://github.com/apache/kafka/pull/16693> PR and
> RemoteTopicCrudTest#testUpdateTopicConfigWithDisablingRemoteStorage
> <https://sourcegraph.com/github.com/apache/kafka@3.9/-/blob/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala?L428-445>
>  test.
> 2. There can be few edge-cases which may not be covered/tested: (eg)
> replica down during disablement,
> replica was down during disablement followed by enablement, and so on.
>
> Read the discussion and voting thread for more details.
>
> Thanks,
> Kamal
>
> On Wed, Oct 30, 2024 at 12:56 PM Luke Chen  wrote:
>
>> Hi Jianfeng,
>>
>> The reason we don't support ZK mode is because the ZK is going to be
>> removed in v4.0.0, so we decided to only implement it in KRaft mode.
>>
>> > I read the source code and seems kafka broker only updates the LogConfig
>> and then invokes maybeUpdateRemoteLogComponents to
>> cancel some tasks. So I want to know how if just update the log configs
>> without a v5 stopReplicas v5 rpc from controller to broker in zk mode?
>>
>> The ZK mode is not completely implemented IIRC, and not being tested. So
>> I'd suggest you upgrade to KRaft for it.
>>
>> Thank you.
>> Luke
>>
>> On Mon, Oct 28, 2024 at 12:32 PM Jianfeng Lin
>>  wrote:
>>
>> >   Hi kafka community, I read the KIP-950
>> > <
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-950%3A++Tiered+Storage+Disablement#KIP950:TieredStorageDisablement-Disablement-ZookeeperBackedCluster
>> > >
>> > which
>> > describe the disablement of kafka tiered storage. It said that zk mode
>> > won't be supported and I can't find out the reason, with confusion, I
>> have
>> > a question:
>> >   This KIP proposes an implement with stopReplicas v5 to let brokers
>> know
>> > which topic partitions should stop their remote storage functions when
>> > "remote.log.copy.disable" is set to true, different from kraft mode, it
>> > takes an extra rpc round. I read the source code and seems kafka broker
>> > only updates the LogConfig and then invokes
>> maybeUpdateRemoteLogComponents
>> > to
>> > cancel some tasks. So I want to know how if just update the log configs
>> > without a v5 stopReplicas v5 rpc from controller to broker in zk mode?
>> And
>> > why zk mode is not supported to disable?
>> >   Really appreciate if Luke Chen could help to
>> answer,
>> > thanks in advance!
>> >
>> >
>> > Jianfeng Lin from Shopee
>> >
>>
>


Re: Kafka Tiered Storage Disablement Implement in ZK Mode Question

2024-11-02 Thread Kamal Chandraprakash
Hi Jianfeng,

As Luke already mentioned, the feature was not tested in ZK mode.
If you still want to enable it in ZK mode, then:

1. Disable the LogConfig#validateNoInvalidRemoteStorageConfigsInZK
validation.
2. Update the relevant unit tests; otherwise the build will fail.
3. Enable the integration test DisableRemoteLogOnTopicTest for
both ZK and KRaft modes.

*Shortcomings with ZK mode*
1. The validation for disabling remote storage is done by the controller
only in KRaft mode. ZK mode doesn't support it.
Refer to PR #16693 and the
RemoteTopicCrudTest#testUpdateTopicConfigWithDisablingRemoteStorage
test.
2. There can be a few edge cases that are not covered/tested, e.g.
a replica being down during disablement, or
a replica being down during disablement followed by enablement, and so on.

Read the discussion and voting thread for more details.

Thanks,
Kamal

On Wed, Oct 30, 2024 at 12:56 PM Luke Chen  wrote:

> Hi Jianfeng,
>
> The reason we don't support ZK mode is because the ZK is going to be
> removed in v4.0.0, so we decided to only implement it in KRaft mode.
>
> > I read the source code and seems kafka broker only updates the LogConfig
> and then invokes maybeUpdateRemoteLogComponents to
> cancel some tasks. So I want to know how if just update the log configs
> without a v5 stopReplicas v5 rpc from controller to broker in zk mode?
>
> The ZK mode is not completely implemented IIRC, and not being tested. So
> I'd suggest you upgrade to KRaft for it.
>
> Thank you.
> Luke
>
> On Mon, Oct 28, 2024 at 12:32 PM Jianfeng Lin
>  wrote:
>
> >   Hi kafka community, I read the KIP-950
> > <
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-950%3A++Tiered+Storage+Disablement#KIP950:TieredStorageDisablement-Disablement-ZookeeperBackedCluster
> > >
> > which
> > describe the disablement of kafka tiered storage. It said that zk mode
> > won't be supported and I can't find out the reason, with confusion, I
> have
> > a question:
> >   This KIP proposes an implement with stopReplicas v5 to let brokers know
> > which topic partitions should stop their remote storage functions when
> > "remote.log.copy.disable" is set to true, different from kraft mode, it
> > takes an extra rpc round. I read the source code and seems kafka broker
> > only updates the LogConfig and then invokes
> maybeUpdateRemoteLogComponents
> > to
> > cancel some tasks. So I want to know how if just update the log configs
> > without a v5 stopReplicas v5 rpc from controller to broker in zk mode?
> And
> > why zk mode is not supported to disable?
> >   Really appreciate if Luke Chen could help to
> answer,
> > thanks in advance!
> >
> >
> > Jianfeng Lin from Shopee
> >
>


Re: Network partition leaves topic-partition leader as sole ISR despite min.isr=2 and producer acks=all settings

2024-08-12 Thread Kamal Chandraprakash
Hi Sabit,

Thanks for reporting the issue! This is the last standing replica problem
and is being fixed in KIP-966.
You can go through the below blog to understand it in detail:

https://jack-vanlightly.com/blog/2023/8/17/kafka-kip-966-fixing-the-last-replica-standing-issue#:~:text=Rule%20number%20one%20of%20leader,behind%20they%20also%20get%20removed
.
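
To make the "Shrinking ISR" log line in the report below easier to read, here is a minimal sketch that parses it and computes how far the leader's end offset is ahead of the high watermark and of each follower. The helper name and the regular expressions are my own (hypothetical), not part of Kafka. Records above the high watermark have been appended on the leader but are not yet replicated to the full ISR.

```python
import re


def parse_shrink_isr(line):
    """Extract offsets from a 'Shrinking ISR' broker log line (hypothetical helper)."""
    # Leader section: "(highWatermark: X, endOffset: Y)"
    leader = re.search(r"highWatermark: (\d+), endOffset: (\d+)", line)
    high_watermark, leader_end = int(leader.group(1)), int(leader.group(2))
    # Follower sections: "(brokerId: N, endOffset: Z)"
    followers = {
        int(broker): int(end)
        for broker, end in re.findall(r"brokerId: (\d+), endOffset: (\d+)", line)
    }
    return {
        "above_high_watermark": leader_end - high_watermark,
        "follower_lag": {b: leader_end - end for b, end in followers.items()},
    }


# Log line copied from the report quoted below.
line = (
    "[Partition my-topic-one-120 broker=7] Shrinking ISR from 7,8,9 to 7. "
    "Leader: (highWatermark: 82153383556, endOffset: 82153383565). Out of sync "
    "replicas: (brokerId: 8, endOffset: 82153383556) (brokerId: 9, endOffset: "
    "82153383561)."
)
print(parse_shrink_isr(line))
```

For this line, the leader holds 9 records above the high watermark; broker 8 is 9 records behind the leader and broker 9 is 4 behind.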

On Mon, Aug 12, 2024 at 2:03 AM Sabit Nepal  wrote:

> Hello,
>
> We experienced a network partition in our kafka cluster which left 1 broker
> unable to be reached by other brokers, but it could still reach our
> zookeeper cluster. When this occurred, a number of topic-partitions shrunk
> their ISR to just the impaired broker itself, halting progress on those
> partitions. As we had to take the broker instance offline and provision a
> replacement, the partitions were unavailable until the replacement instance
> came back up and resumed acting as the broker.
>
> However, reviewing our broker and producer settings, I'm not sure why it's
> possible for the leader to have accepted some writes that were not able to
> be replicated to the followers. Our topics use min.insync.replicas=2 and
> our producers use acks=all configuration. In this scenario, with the
> changes not being replicated to other followers, I'd expect the records to
> have failed to be written. We are however on an older version of kafka -
> 2.6.1 - so I'm curious if maybe future versions have improved the behavior
> here?
>
> Some relevant logs:
>
> [Partition my-topic-one-120 broker=7] Shrinking ISR from 7,8,9 to 7.
> Leader: (highWatermark: 82153383556, endOffset: 82153383565). Out of sync
> replicas: (brokerId: 8, endOffset: 82153383556) (brokerId: 9, endOffset:
> 82153383561).
> [Partition my-topic-one-120 broker=7] ISR updated to [7] and zkVersion
> updated to [1367]
> [ReplicaFetcher replicaId=9, leaderId=7, fetcherId=5] Error in response for
> fetch request (type=FetchRequest, replicaId=9, maxWait=500, minBytes=1,
> maxBytes=10485760, fetchData={my-topic-one-120=(fetchOffset=75987953095,
> logStartOffset=75983970457, maxBytes=1048576,
> currentLeaderEpoch=Optional[772]),
> my-topic-one-84=(fetchOffset=87734453342,
> logStartOffset=87730882175, maxBytes=1048576,
> currentLeaderEpoch=Optional[776]),
> my-topic-one-108=(fetchOffset=72037212609,
> logStartOffset=72034727231, maxBytes=1048576,
> currentLeaderEpoch=Optional[776]),
> my-topic-one-72=(fetchOffset=83006080094,
> logStartOffset=83002240584, maxBytes=1048576,
> currentLeaderEpoch=Optional[768]),
> my-topic-one-96=(fetchOffset=79250375295,
> logStartOffset=79246320254, maxBytes=1048576,
> currentLeaderEpoch=Optional[763])}, isolationLevel=READ_UNCOMMITTED,
> toForget=, metadata=(sessionId=965270777, epoch=725379656), rackId=)
> [Controller id=13 epoch=611] Controller 13 epoch 611 failed to change state
> for partition my-topic-one-120 from OnlinePartition to OnlinePartition
> kafka.common.StateChangeFailedException: Failed to elect leader for
> partition my-topic-one-120 under strategy
> ControlledShutdownPartitionLeaderElectionStrategy
> (later)
> kafka.common.StateChangeFailedException: Failed to elect leader for
> partition my-topic-one-120 under strategy
> OfflinePartitionLeaderElectionStrategy(false)
>
> Configuration for this topic:
>
> Topic: my-topic-one PartitionCount: 250 ReplicationFactor: 3 Configs:
> min.insync.replicas=2,segment.bytes=536870912,retention.ms=180,unclean.leader.election.enable=false
>
> Outside of this topic, we also had a topic with a replication factor of 5
> impacted, and also the __consumer_offsets topic which we set to an RF of 5.
>
> [Partition my-topic-two-204 broker=7] Shrinking ISR from 10,9,7,11,8 to 7.
> Leader: (highWatermark: 86218167, endOffset: 86218170). Out of sync
> replicas: (brokerId: 10, endOffset: 86218167) (brokerId: 9, endOffset:
> 86218167) (brokerId: 11, endOffset: 86218167) (brokerId: 8, endOffset:
> 86218167).
> Configuration:
> Topic: my-topic-two PartitionCount: 500 ReplicationFactor: 5 Configs:
> min.insync.replicas=2,segment.jitter.ms=360,cleanup.policy=compact,segment.bytes=1048576,max.compaction.lag.ms=900,min.compaction.lag.ms=450,unclean.leader.election.enable=false,delete.retention.ms=8640,segment.ms=2160
>
> [Partition __consumer_offsets-18 broker=7] Shrinking ISR from 10,9,7,11,8
> to 7. Leader: (highWatermark: 4387657484, endOffset: 4387657485). Out of
> sync replicas: (brokerId: 9, endOffset: 4387657484) (brokerId: 8,
> endOffset: 4387657484) (brokerId: 10, endOffset: 4387657484) (brokerId: 11,
> endOffset: 4387657484).
> Configuration:
> Topic: __consumer_offsets PartitionCount: 50 ReplicationFactor: 5 Configs:
> compression.type=producer,min.insync.replicas=2,cleanup.policy=compact,segment.bytes=104857600,unclean.leader.election.enable=false
>
> Other configurations:
> zookeeper.connection.timeout.ms=6000
> replica.lag.time.max.ms=8000
> zookeeper.session.timeout.ms=6000
> Producer request.timeout.ms=8500
> Produce

Re: How do we usually handle Node disconnected issue for kafka producer

2024-05-04 Thread Kamal Chandraprakash
Hi Sachin,

Why do you want to change the default settings? If the connection is open
and unused,
then it is fair to close the connection after the timeout and reopen it
when required.

On Fri, May 3, 2024 at 1:06 PM Sachin Mittal  wrote:

> Hi,
> I am using a Kafka producer java client by vert.x framework.
>
> https://vertx.io/docs/apidocs/io/vertx/kafka/client/producer/KafkaProducer.html
>
> There is a producer setting in kafka:
> connections.max.idle.ms = 54
>
> So if there are no records to produce then after 9 minutes I get this in my
> logs:
> [kafka-producer-network-thread | RecordProducer] [NetworkClient.java:977] -
> [Producer clientId=RecordProducer] Node -1 disconnected.
>
> What it looks like is the Kafka producer object I had created has lost its
> connection due to this setting.
>
> What are my options to ensure that Kafka producer client does not close
> idle connections or reconnects or keeps alive even when no records to
> produce arrive for a long time?
>
> Thanks
> Sachin
>


Re: Kafka followers with higher leader epoch than leader

2024-03-31 Thread Kamal Chandraprakash
Hi,

The follower is not able to sync up with the leader because the epochs have
diverged between the leader and the follower.
To confirm this, you can enable the request logger and check the
diverging-epoch field in the fetch response:

https://sourcegraph.com/github.com/apache/kafka@a640a81040f6ef6f85819b60194f0394f5f2194e/-/blob/clients/src/main/resources/common/message/FetchResponse.json?L76

This issue can happen when the leader-epoch-checkpoint file is corrupted in
the leader node. To mitigate the issue, you have to:

1. Stop the leader broker
2. Remove the `leader-epoch-checkpoint` file for that affected partition
3. Recover the partition by deleting the partition entry from the
checkpoint files: `log-start-offset-checkpoint`,
`replication-offset-checkpoint`, `recovery-point-offset-checkpoint`, and
`cleaner-offset-checkpoint`. Note that when removing the entry, you also
have to update the number of entries in those files in Line 2.
4. Remove the `.kafka_cleanshutdown` marker file.
5. Start the node and trigger preferred leader election to elect back the
same node as leader
6. Then, the follower will be able to sync up with the leader.
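
Step 3 above can be sketched as follows. This is only an illustration under an assumption about the file layout: line 1 holds the version, line 2 holds the number of entries, and each remaining line is `topic partition offset`. The function name is hypothetical, and it works on the file contents as a string so you can inspect the result before writing anything back. Back up the checkpoint files first and only edit them while the broker is stopped.

```python
def remove_checkpoint_entry(text, topic, partition):
    """Drop one 'topic partition offset' entry and fix the entry count on line 2.

    Assumed layout: line 1 = version, line 2 = entry count, then one entry per line.
    """
    lines = text.splitlines()
    version, entries = lines[0], lines[2:]
    prefix = f"{topic} {partition} "
    # Keep every non-empty entry that does not belong to the affected partition.
    kept = [e for e in entries if e.strip() and not e.startswith(prefix)]
    return "\n".join([version, str(len(kept)), *kept]) + "\n"


# Made-up sample contents, for illustration only.
sample = "0\n3\n__consumer_offsets 18 10\n__consumer_offsets 19 42\nmy-topic 0 7\n"
print(remove_checkpoint_entry(sample, "__consumer_offsets", 18))
```

The same helper would be applied to each of the four checkpoint files listed in step 3.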

--
Kamal

On Tue, Mar 19, 2024 at 6:06 PM Karl Sorensen 
wrote:

> Hi,
>
> I have an unusual situation where I have a cluster running Kafka 3.5.1 in
> strimzi where 4 of the __consumer_offset partitions have dropped under min
> isr.
>
> Everything else appears to be working fine.
> Upon investigating, i've found that the partition followers appear to be
> out of sync with the leader in terms of leader epoch
>
> For example the leader-epoch-checkpoint file on the leader partition is
> 0
> 4
> 0 0
> 1 4
> 4 6
> 27 10
>
> while the followers are
> 0
> 5
> 0 0
> 1 4
> 4 6
> 5 7
> 6 9
>
> which appears to me like the followers are 2 elections ahead of the leader
> and i'm not sure how they got to this situation.
> I've attempted to force a new leader election via kafka-leader-elections
> but it refused for both PREFERRED and UNCLEAN.
> I've also tried a manual partition assignment to move the leader to another
> broker but it wont do it.
>
> What is even more strange is that if i watch the leader-epoch-checkpoint
> file on one of the followers I can see it constantly changing as it tries
> to sort itself out.
> [kafka@internal-001-kafka-0 __consumer_offsets-18]$ cat
> leader-epoch-checkpoint
> 0
> 3
> 0 0
> 1 4
> 4 6
> [kafka@internal-001-kafka-0 __consumer_offsets-18]$ cat
> leader-epoch-checkpoint
> 0
> 5
> 0 0
> 1 4
> 4 6
> 5 7
> 6 9
>
> I have tried to manually remove the followers partition files on disk in an
> attempt to get it to sync from the leader but it keeps returning to the
> inconsistent state.
>
> Restarting the broker with the partition leader on it doesn't seem to move
> leadership either.
>
> The follower keeps logging the following constantly
> 2024-03-19 09:23:11,169 INFO [ReplicaFetcher replicaId=2, leaderId=1,
> fetcherId=0] Truncating partition __consumer_offsets-18 with
> TruncationState(offset=7, completed=true) due to leader epoch and offset
> EpochEndOffset(errorCode=0, partition=18, leaderEpoch=4, endOffset=10)
> (kafka.server.ReplicaFetcherThread) [ReplicaFetcherThread-0-1]
> 2024-03-19 09:23:11,169 INFO [UnifiedLog partition=__consumer_offsets-18,
> dir=/var/lib/kafka/data-0/kafka-log2] Truncating to offset 7
> (kafka.log.UnifiedLog) [ReplicaFetcherThread-0-1]
> 2024-03-19 09:23:11,174 INFO [UnifiedLog partition=__consumer_offsets-18,
> dir=/var/lib/kafka/data-0/kafka-log2] Loading producer state till offset 7
> with message format version 2 (kafka.log.UnifiedLog$)
> [ReplicaFetcherThread-0-1]
> 2024-03-19 09:23:11,174 INFO [UnifiedLog partition=__consumer_offsets-18,
> dir=/var/lib/kafka/data-0/kafka-log2] Reloading from producer snapshot and
> rebuilding producer state from offset 7 (kafka.log.UnifiedLog$)
> [ReplicaFetcherThread-0-1]
> 2024-03-19 09:23:11,174 INFO [ProducerStateManager
> partition=__consumer_offsets-18]Loading producer state from snapshot file
> 'SnapshotFile(offset=7,
>
> file=/var/lib/kafka/data-0/kafka-log2/__consumer_offsets-18/0007.snapshot)'
> (org.apache.kafka.storage.internals.log.ProducerStateManager)
> [ReplicaFetcherThread-0-1]
> 2024-03-19 09:23:11,175 INFO [UnifiedLog partition=__consumer_offsets-18,
> dir=/var/lib/kafka/data-0/kafka-log2] Producer state recovery took 1ms for
> snapshot load and 0ms for segment recovery from offset 7
> (kafka.log.UnifiedLog$) [ReplicaFetcherThread-0-1]
> 2024-03-19 09:23:11,175 WARN [UnifiedLog partition=__consumer_offsets-18,
> dir=/var/lib/kafka/data-0/kafka-log2] Non-monotonic update of high
> watermark from (offset=10segment=[0:4083]) to (offset=7segment=[0:3607])
> (kafka.log.UnifiedLog) [ReplicaFetcherThread-0-1]
>
> Any ideas of how to look at this further?
> Thanks
> Karl
>

Re: Script to delete list of topics

2024-01-23 Thread Kamal Chandraprakash
# Delete every topic listed in /tmp/topics.txt (one topic name per line)
for topic in `cat /tmp/topics.txt`; do
  echo "$topic"
  sh kafka-topics.sh --bootstrap-server localhost:9092 --topic "$topic" --delete
done

# You can also delete topics by wildcard (quote the pattern so the shell does not expand it)
sh kafka-topics.sh --bootstrap-server localhost:9092 --topic 'abc.*' --delete

On Tue, Jan 23, 2024 at 11:50 AM sunil chaudhari <
sunilmchaudhar...@gmail.com> wrote:

> Hi,
> anybody there who has a shell script to delete topics from cluster.
> I have list of elegible topics to be deleted.
> Script should accept list of files as an input.
> Delete topics one by one.
> please share.
> Thanks in advance.
>
> regards,
> Sunil.
>


Re: [VOTE] 3.6.0 RC2

2023-10-02 Thread Kamal Chandraprakash
+1 (non-binding)

1. Built the source from 3.6 branch in scala 2.12 and 2.13
2. Ran all the unit and integration tests.
3. Ran quickstart and verified the produce-consume on a 3 node cluster.
4. Verified the tiered storage functionality with local-tiered storage.

Thanks,
Kamal

On Mon, Oct 2, 2023 at 6:07 PM Divij Vaidya  wrote:

> + 1 (non-binding)
>
> Verifications:
> 1. I ran a produce-consume workload with plaintext auth, JDK17, zstd
> compression using an open messaging benchmark and found 3.6 to be better
> than or equal to 3.5.1 across all dimensions. Notably, 3.6 had consistently
> 6-7% lower CPU utilization, lesser spikes on P99 produce latencies and
> overall lower P99.8 latencies.
>
> 2. I have verified that detached signature is correct using
> https://www.apache.org/info/verification.html and the release manager
> public keys are available at
> https://keys.openpgp.org/search?q=F65DC3423D4CD7B9
>
> 3. I have verified that all metrics emitted in 3.5.1 (with Zk) are also
> being emitted in 3.6.0 (with Zk).
>
> Problems (but not blockers):
> 1. Metrics added in
>
> https://github.com/apache/kafka/commit/2f71708955b293658cec3b27e9a5588d39c38d7e
> aren't available in the documentation (cc: Justine). I don't consider this
> as a release blocker but we should add it as a fast follow-up.
>
> 2. Metric added in
>
> https://github.com/apache/kafka/commit/a900794ace4dcf1f9dadee27fbd8b63979532a18
> isn't available in documentation (cc: David). I don't consider this as a
> release blocker but we should add it as a fast follow-up.
>
> --
> Divij Vaidya
>
>
>
> On Mon, Oct 2, 2023 at 9:50 AM Federico Valeri 
> wrote:
>
> > Hi Satish, I did the following to verify the release:
> >
> > - Built from source with Java 17 and Scala 2.13
> > - Ran all unit and integration tests
> > - Spot checked documentation
> > - Ran custom client applications using staging artifacts on a 3-nodes
> > cluster
> > - Tested tiered storage with one of the available RSM implementations
> >
> > +1 (non binding)
> >
> > Thanks
> > Fede
> >
> > On Mon, Oct 2, 2023 at 8:50 AM Luke Chen  wrote:
> > >
> > > Hi Satish,
> > >
> > > I verified with:
> > > 1. Ran quick start in KRaft for scala 2.12 artifact
> > > 2. Making sure the checksum are correct
> > > 3. Browsing release notes, documents, javadocs, protocols.
> > > 4. Verified the tiered storage feature works well.
> > >
> > > +1 (binding).
> > >
> > > Thanks.
> > > Luke
> > >
> > >
> > >
> > > On Mon, Oct 2, 2023 at 5:23 AM Jakub Scholz  wrote:
> > >
> > > > +1 (non-binding). I used the Scala 2.13 binaries and the staged Maven
> > > > artifacts and run my tests. Everything seems to work fine for me.
> > > >
> > > > Thanks
> > > > Jakub
> > > >
> > > > On Fri, Sep 29, 2023 at 8:17 PM Satish Duggana <
> > satish.dugg...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hello Kafka users, developers and client-developers,
> > > > >
> > > > > This is the third candidate for the release of Apache Kafka 3.6.0.
> > > > > Some of the major features include:
> > > > >
> > > > > * KIP-405 : Kafka Tiered Storage
> > > > > * KIP-868 : KRaft Metadata Transactions
> > > > > * KIP-875: First-class offsets support in Kafka Connect
> > > > > * KIP-898: Modernize Connect plugin discovery
> > > > > * KIP-938: Add more metrics for measuring KRaft performance
> > > > > * KIP-902: Upgrade Zookeeper to 3.8.1
> > > > > * KIP-917: Additional custom metadata for remote log segment
> > > > >
> > > > > Release notes for the 3.6.0 release:
> > > > >
> https://home.apache.org/~satishd/kafka-3.6.0-rc2/RELEASE_NOTES.html
> > > > >
> > > > > *** Please download, test and vote by Tuesday, October 3, 12pm PT
> > > > >
> > > > > Kafka's KEYS file containing PGP keys we use to sign the release:
> > > > > https://kafka.apache.org/KEYS
> > > > >
> > > > > * Release artifacts to be voted upon (source and binary):
> > > > > https://home.apache.org/~satishd/kafka-3.6.0-rc2/
> > > > >
> > > > > * Maven artifacts to be voted upon:
> > > > >
> > https://repository.apache.org/content/groups/staging/org/apache/kafka/
> > > > >
> > > > > * Javadoc:
> > > > > https://home.apache.org/~satishd/kafka-3.6.0-rc2/javadoc/
> > > > >
> > > > > * Tag to be voted upon (off 3.6 branch) is the 3.6.0-rc2 tag:
> > > > > https://github.com/apache/kafka/releases/tag/3.6.0-rc2
> > > > >
> > > > > * Documentation:
> > > > > https://kafka.apache.org/36/documentation.html
> > > > >
> > > > > * Protocol:
> > > > > https://kafka.apache.org/36/protocol.html
> > > > >
> > > > > * Successful Jenkins builds for the 3.6 branch:
> > > > > There are a few runs of unit/integration tests. You can see the
> > latest
> > > > > at https://ci-builds.apache.org/job/Kafka/job/kafka/job/3.6/. We
> > will
> > > > > continue running a few more iterations.
> > > > > System tests:
> > > > > We will send an update once we have the results.
> > > > >
> > > > > Thanks,
> > > > > Satish.
> > > > >
> > > >
> >
>


Re: Release plan required for version 3.5.1

2023-07-25 Thread Kamal Chandraprakash
Hi Sahil,

Apache Kafka 3.5.1 is already released: https://kafka.apache.org/downloads

On Wed, Jul 26, 2023 at 9:08 AM Sahil Sharma D
 wrote:

> Gentle reminder-2
>
> -Original Message-
> From: Sahil Sharma D
> Sent: 12 July 2023 09:51 AM
> To: users@kafka.apache.org
> Subject: RE: Release plan required for version 3.5.1
>
> Gentle reminder!
>
> -Original Message-
> From: Sahil Sharma D
> Sent: 03 July 2023 04:39 PM
> To: users@kafka.apache.org
> Subject: RE: Release plan required for version 3.5.1
>
> Hi,
>
> That means below vulnerabilities are not appliable for kafka, right?
> CVE-2022-42003
> CVE-2022-42004
> CVE-2023-34454
> CVE-2023-34453
> CVE-2023-35116
>
> Regards,
> Sahil
>
> -Original Message-
> From: Josep Prat 
> Sent: 03 July 2023 02:02 PM
> To: users@kafka.apache.org
> Subject: Re: Release plan required for version 3.5.1
>
> Hi Sahil,
> Thanks for caring about Apache Kafka's security. One can fix this
> situation by replacing the affected jar file with the one containing the
> fix for the vulnerabilities. We plan to add a write up under Apache Kafka's
> CVE page.
> Mind that Apache Kafka  doesn't typically do emergency releases for CVEs
> discovered in their dependencies unless affectation in Kafka itself is
> major.
>
> That being said, if you take a look at the `dev` mailing list, you'll see
> that a maintainer already volunteered to be the release manager for 3.5.1:
> https://lists.apache.org/thread/q8rxv7wo8mwvzs3d25hzy987xph7f7nr
> If you want to be up-to-date with the release plan of 3.5.1 (contents,
> estimated timings and such) please check the `dev` mailing list as this
> information is usually shared there. The `user` mailing list usually gets
> notified when release candidates or new versions are created.
>
> Best,
>
> On Mon, Jul 3, 2023 at 9:46 AM Sahil Sharma D 
> 
> wrote:
>
> > Gentle reminder!
> >
> > From: Sahil Sharma D
> > Sent: 26 June 2023 08:18 PM
> > To: users@kafka.apache.org
> > Subject: Release plan required for version 3.5.1
> > Importance: High
> >
> > Hi Team,
> >
> > There is an vulnerability on snappy-java-1.1.8.4.jar, are we impacted
> > due to this if we are using only client jar and kafka server.
> >
> > Below are the vulnerabilities that still open and we unable to find
> > any detail of these CVEs on jira. In which version these CVEs are
> > planned to be resolved?
> > CVE-2022-42003
> > CVE-2022-42004
> > CVE-2023-34454
> > CVE-2023-34453
> > CVE-2023-35116
> >
> > Kindly share the release plan for version 3.5.1.
> >
> > Regards,
> > Sahil
> >
>
>
> --
> *Josep Prat*
> Open Source Engineering Director, *Aiven*
> josep.p...@aiven.io   |   +491715557497
> aiven.io
> *Aiven Deutschland GmbH*
> Alexanderufer 3-7, 10117 Berlin
> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen Amtsgericht
> Charlottenburg, HRB 209739 B
>
>


Re: Offset commit consistently timing out in transaction

2021-01-08 Thread Kamal Chandraprakash
The timeout in the offset commit request was fixed recently. This issue
looks more like KAFKA-8334.

On Mon, Jan 4, 2021 at 3:53 PM Kindernay Oliver
 wrote:

> Hello,
>
> we are experiencing problems with offset commit timing out on brokers.
> This started to happen when we started using transactions.
> send_offsets_to_transactions periodically (every few minutes) times out. We
> have cluster of three brokers, and topics with 15 partitions. We use one
> transactional producer per partition and commit the transaction at least
> each second or more. The time out seemingly happens randomly, each time for
> a different producer instance and different broker.
>
>
>   1.  We used to get REQUEST_TIMED_OUT after 5 seconds. I understand that
> error came from a broker.
>   2.  We tried to raise offsets.commit.timeout.ms on broker to 60 seconds
>   3.  After the change, we are getting transaction operations timeout
> after 60s, with same periodicity. This is now a client error since the
> kafka broker takes the full minute, after which we would probably see the
> same error message from broker as previsously.
>  *'cimpl.KafkaException'>/KafkaError{code=_TIMED_OUT,val=-185,str="Transactional
> operation timed out"}
>  *   %5|1609747761.211|REQTMOUT|rdkafka#producer-10| [thrd:
> 10.48.111.102:9092/bootstrap]: 10.48.111.102:9092/2: Timed out
> TxnOffsetCommitRequest in flight (after 59943ms, timeout #0)
> %4|1609747761.211|REQTMOUT|rdkafka#producer-10| [thrd:
> 10.48.111.102:9092/bootstrap]: 10.48.111.102:9092/2: Timed out 1
> in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
> %7|1609747761.212|FAIL|rdkafka#producer-10| [thrd:
> 10.48.111.102:9092/bootstrap]: 10.48.111.102:9092/2: 1 request(s) timed
> out: disconnect (after 1544886ms in state UP) (_TIMED_OUT)
> %3|1609747761.212|FAIL|rdkafka#producer-10| [thrd:
> 10.48.111.102:9092/bootstrap]: 10.48.111.102:9092/2: 1 request(s) timed
> out: disconnect (after 1544886ms in state UP)
> %7|1609747761.212|STATE|rdkafka#producer-10| [thrd:
> 10.48.111.102:9092/bootstrap]: 10.48.111.102:9092/2: Broker changed state
> UP -> DOWN
> %7|1609747761.212|STATE|rdkafka#producer-10| [thrd:
> 10.48.111.102:9092/bootstrap]: 10.48.111.102:9092/2: Broker changed state
> DOWN -> INIT
> %7|1609747761.212|REQERR|rdkafka#producer-10| [thrd:main]:
> 10.48.111.102:9092/2: MetadataRequest failed: Local: Timed out: actions
> Retry
> %7|1609747761.312|STATE|rdkafka#producer-10| [thrd:
> 10.48.111.102:9092/bootstrap]: 10.48.111.102:9092/2: Broker changed state
> INIT -> TRY_CONNECT
> %7|1609747761.312|RETRY|rdkafka#producer-10| [thrd:
> 10.48.111.102:9092/bootstrap]: 10.48.111.102:9092/2: Moved 1 retry
> buffer(s) to output queue
>
>
>
>   1.  Broker system metrics (disk, network, CPU, memory) did not indicate
> any bottleneck, Anyway, we tried to upgrade the broker VMs to larger size,
> with no change to the behaviour.
>   2.  This happens on our test cluster, and on our prod cluster. It seems
> the frequency by which this happens in less on the test cluster (it has
> lower traffic and lower resources)
>   3.  We use python's confluent_kafka 1.5.0 - based on librdkafka 1.5.0
>   4.  Broker package version is confluent-kafka-2.11-2.0.1
>   5.  I enabled TRACE log level for everything on the test cluster
>
> This is a trace log from the broker. Client logs indicate that the
> timed-out operation started at 08:08:21:
>
> 2021-01-04 08:08:15,413] DEBUG TransactionalId mu-data-generator-7
> complete transition from PrepareCommit to
> TxnTransitMetadata(producerId=10450009, producerEpoch=6501,
> txnTimeoutMs=6, txnState=CompleteCommit, topicPartitions=Set(),
> txnStartTimestamp=1609747693580, txnLastUpdateTimestamp=1609747694751)
> (kafka.coordinator.transaction.TransactionMetadata)
> [2021-01-04 08:08:15,413] DEBUG [Transaction State Manager 2]: Updating
> mu-data-generator-7's transaction state to
> TxnTransitMetadata(producerId=10450009, producerEpoch=6501,
> txnTimeoutMs=6, txnState=CompleteCommit, topicPartitions=Set(),
> txnStartTimestamp=1609747693580, txnLastUpdateTimestamp=1609747694751) with
> coordinator epoch 375 for mu-data-generator-7 succeeded
> (kafka.coordinator.transaction.TransactionStateManager)
> [2021-01-04 08:08:15,413] TRACE [Transaction Marker Channel Manager 2]:
> Completed transaction for mu-data-generator-7 with coordinator epoch 375,
> final state after commit: CompleteCommit
> (kafka.coordinator.transaction.TransactionMarkerChannelManager)
> [2021-01-04 08:08:21,097] DEBUG TransactionalId mu-data-generator-7
> prepare transition from CompleteCommit to
> TxnTransitMetadata(producerId=10450009, producerEpoch=6501,
> txnTimeoutMs=6, txnState=Ongoing,
> topicPartitions=Set(__consumer_offsets-45),
> txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747701097)
> (kafka.coordinator.transaction.TransactionMetadata)
> [2021-01-04 08:08:21,098] TRACE [Transaction State Manager 2]: 

Re: Max poll interval and timeouts

2020-03-25 Thread Kamal Chandraprakash
With the group coordination protocol, you only have to increase
`max.poll.interval.ms` or tune `max.poll.records`.
Ignore my earlier messages: consumer heartbeats are handled in a separate
thread.
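
A minimal sketch of that setting, assuming a 45-minute gap between polls (the upper end of what the original question asks for). `LongPollIntervalConfig` is a hypothetical helper; the two keys are the standard consumer configs, and the values are illustrative only:

```java
import java.util.Properties;

// Hypothetical helper, not part of the Kafka client.
final class LongPollIntervalConfig {
    // Builds consumer properties that tolerate up to maxGapMinutes between poll() calls.
    static Properties build(int maxGapMinutes, int maxPollRecords) {
        Properties props = new Properties();
        // Upper bound on the time between two poll() calls before the
        // consumer is considered failed and the group rebalances.
        props.setProperty("max.poll.interval.ms",
                Long.toString(maxGapMinutes * 60_000L));
        // Caps how many records a single poll() call returns.
        props.setProperty("max.poll.records", Integer.toString(maxPollRecords));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build(45, 500);
        System.out.println(props.getProperty("max.poll.interval.ms"));
    }
}
```

The resulting `Properties` would be passed to the consumer together with the usual bootstrap and deserializer settings.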

On Wed, Mar 25, 2020 at 2:35 PM Kamal Chandraprakash <
kamal.chandraprak...@gmail.com> wrote:

> Yes, with `assign` you'll lose the group coordination. You can still use
> the `subscribe` mode and update the above-mentioned configs.
> Your ask is a kind of delay queue, which the Kafka consumer doesn't support.
> You have to manually `sleep` between the poll calls.
>
> On Tue, Mar 24, 2020 at 11:56 PM Ryan Schachte 
> wrote:
>
>> Don't I lose consumer group coordination with assign?
>>
>> On Mon, Mar 23, 2020 at 11:49 PM Kamal Chandraprakash <
>> kamal.chandraprak...@gmail.com> wrote:
>>
>> > Hi Ryan,
>> >
>> > The maxPollInterval waits for at-most the given time duration and
>> returns
>> > ASAP even if a single record is available.
>> > If you want to collect data once 30-45 minutes,  better to use the
>> Consumer
>> > with `assign` mode and poll for records
>> > once in 30 minutes.
>> >
>> > If you're using the consumer with `subscribe` mode, then you have to
>> update
>> > the following configs:
>> > 1. session.timeout.ms
>> > 2. heartbeat.interval.ms and
>> > 3. group.max.session.timeout.ms in the broker configs.
>> >
>> > Increasing the session timeout will lead to delay in detecting the
>> consumer
>> > failures, I would suggest to go with `assign` mode.
>> >
>> >
>> > On Tue, Mar 24, 2020 at 4:45 AM Ryan Schachte <
>> coderyanschac...@gmail.com>
>> > wrote:
>> >
>> > > Hey guys, I'm getting a bit overwhelmed by the different variables
>> used
>> > to
>> > > help enable batching for me.
>> > >
>> > > I have some custom batching logic that processes when either N records
>> > have
>> > > been buffered or my max timeout has been hit. It was working decently
>> > well,
>> > > but I hit this error:
>> > >
>> > > *This means that the time between subsequent calls to poll() was
>> longer
>> > > than the configured max.poll.interval.ms <http://max.poll.interval.ms
>> >,
>> > > which typically implies that the poll loop is spending too much time
>> > > message processing.*
>> > >
>> > > I ultimately want to wait for the buffer to fill up or sit and collect
>> > data
>> > > continuously for 30-45 mins at a time. Do I need to do anything with
>> > > heartbeat or session timeout as well?
>> > >
>> > > So now my question is.. Can I just bump my maxPollInterval to
>> something
>> > > like:
>> > >
>> > > maxPollInterval: '270',
>> > >
>> >
>>
>


Re: Max poll interval and timeouts

2020-03-25 Thread Kamal Chandraprakash
Yes, with `assign` you'll lose the group coordination. You can still use
the `subscribe` mode and update the above-mentioned configs.
Your ask is a kind of delay queue, which the Kafka consumer doesn't support.
You have to manually `sleep` between the poll calls.
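
The batching described in the quoted question (flush when N records are buffered or a maximum wait has elapsed) can be sketched without any Kafka dependency. `BatchBuffer` is a hypothetical helper, and the clock value is passed in by the caller, which is an assumption made to keep the logic testable:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of the Kafka client): buffers records and
// reports when a batch is due, either by size or by age.
final class BatchBuffer<T> {
    private final int maxRecords;
    private final long maxWaitMs;
    private final List<T> buffer = new ArrayList<>();
    private long firstAddedAtMs = -1L; // -1 means the buffer is empty

    BatchBuffer(int maxRecords, long maxWaitMs) {
        this.maxRecords = maxRecords;
        this.maxWaitMs = maxWaitMs;
    }

    // Adds one record; nowMs is supplied by the caller instead of a real clock.
    void add(T record, long nowMs) {
        if (buffer.isEmpty()) {
            firstAddedAtMs = nowMs;
        }
        buffer.add(record);
    }

    // True when the batch is full or the oldest buffered record has waited long enough.
    boolean shouldFlush(long nowMs) {
        if (buffer.isEmpty()) {
            return false;
        }
        return buffer.size() >= maxRecords || nowMs - firstAddedAtMs >= maxWaitMs;
    }

    // Returns the buffered records and resets the buffer.
    List<T> drain() {
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        firstAddedAtMs = -1L;
        return batch;
    }
}
```

One way to use it, under the same assumptions: call `add` for each record a poll returns, check `shouldFlush` after every poll, and process and commit the result of `drain()` when it returns true.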

On Tue, Mar 24, 2020 at 11:56 PM Ryan Schachte 
wrote:

> Don't I lose consumer group coordination with assign?
>
> On Mon, Mar 23, 2020 at 11:49 PM Kamal Chandraprakash <
> kamal.chandraprak...@gmail.com> wrote:
>
> > Hi Ryan,
> >
> > The maxPollInterval waits for at-most the given time duration and returns
> > ASAP even if a single record is available.
> > If you want to collect data once 30-45 minutes,  better to use the
> Consumer
> > with `assign` mode and poll for records
> > once in 30 minutes.
> >
> > If you're using the consumer with `subscribe` mode, then you have to
> update
> > the following configs:
> > 1. session.timeout.ms
> > 2. heartbeat.interval.ms and
> > 3. group.max.session.timeout.ms in the broker configs.
> >
> > Increasing the session timeout will lead to delay in detecting the
> consumer
> > failures, I would suggest to go with `assign` mode.
> >
> >
> > On Tue, Mar 24, 2020 at 4:45 AM Ryan Schachte <
> coderyanschac...@gmail.com>
> > wrote:
> >
> > > Hey guys, I'm getting a bit overwhelmed by the different variables used
> > to
> > > help enable batching for me.
> > >
> > > I have some custom batching logic that processes when either N records
> > have
> > > been buffered or my max timeout has been hit. It was working decently
> > well,
> > > but I hit this error:
> > >
> > > *This means that the time between subsequent calls to poll() was longer
> > > than the configured max.poll.interval.ms <http://max.poll.interval.ms
> >,
> > > which typically implies that the poll loop is spending too much time
> > > message processing.*
> > >
> > > I ultimately want to wait for the buffer to fill up or sit and collect
> > data
> > > continuously for 30-45 mins at a time. Do I need to do anything with
> > > heartbeat or session timeout as well?
> > >
> > > So now my question is.. Can I just bump my maxPollInterval to something
> > > like:
> > >
> > > maxPollInterval: '270',
> > >
> >
>


Re: MirrorMaker2 - uneven loadbalancing

2020-03-24 Thread Kamal Chandraprakash
Hi Peter,

Not sure this is what you're looking for -
https://issues.apache.org/jira/browse/KAFKA-9352

On Mon, Mar 23, 2020 at 11:37 PM Ryanne Dolan  wrote:

> Thanks Peter for running this experiment. That looks sorta normal. It looks
> like Connect is deciding to use 10 total tasks and doesn't care which ones
> do what. Ideally you'd see the MirrorSourceConnector tasks evenly divided,
> since they do the bulk of the work -- but that doesn't seem to be the case
> with your selection of parameters.
>
> I'd recommend bumping up the tasks.max a lot higher than 4 in order to
> achieve finer-grained workloads and a more even balance.
>
> Ryanne
>
> On Mon, Mar 23, 2020 at 9:58 AM Péter Sinóros-Szabó
>  wrote:
>
> > so I made some tests with tasks.max = 4
> >
> > with 2 instances:
> > - instance 1: 4 MirrorSourceConnector, 1 MirrorHeartbeatConnector tasks
> > - instance 2: 4 MirrorCheckpointConnector, 1 MirrorHeartbeatConnector
> tasks
> >
> > with 3 instances:
> > - instance 1: 3 MirrorCheckpointConnector tasks
> > - instance 2: 3 MirrorSourceConnector tasks, 1 MirrorHeartbeatConnector
> > task
> > - instance 3: 1 MirrorSourceConnector, 1 MirrorCheckpointConnector task,
> > 1 MirrorHeartbeatConnector task
> >
> > with 4 instances:
> > - instance 1: 3 MirrorCheckpointConnector tasks
> > - instance 2: 2 MirrorSourceConnector tasks, 1 MirrorHeartbeatConnector
> > task
> > - instance 3: 1 MirrorSourceConnector task, 1 MirrorCheckpointConnector
> > task
> > - instance 4: 1 MirrorSourceConnector task, 1 MirrorHeartbeatConnector
> task
> >
> > So it seems that it is not well balanced, but can be scaled somewhat, not
> > ideal.
> > Is this how it should work?
> >
> > Peter
> >
> > On Fri, 20 Mar 2020 at 20:58, Ryanne Dolan 
> wrote:
> >
> > > Peter, what happens when you add an additional node? Usually Connect
> will
> > > detect it and rebalance tasks accordingly. I'm wondering if that
> > mechanism
> > > isn't working for you.
> > >
> > > Ryanne
> > >
> > > On Fri, Mar 20, 2020 at 2:40 PM Péter Sinóros-Szabó
> > >  wrote:
> > >
> > > > Well, I don't know much about herders. If you can give some idea how
> to
> > > > check it, I will try.
> > > >
> > > > Peter
> > > >
> > > > On Fri, 20 Mar 2020 at 17:47, Ryanne Dolan 
> > > wrote:
> > > >
> > > > > Hmm, that's weird. I'd expect the type of tasks to be evenly
> > > distributed
> > > > as
> > > > > well. Is it possible one of the internal topics are misconfigured
> > s.t.
> > > > the
> > > > > Herders aren't functioning correctly?
> > > > >
> > > > > Ryanne
> > > > >
> > > > > On Fri, Mar 20, 2020 at 11:17 AM Péter Sinóros-Szabó
> > > > >  wrote:
> > > > >
> > > > > > I use tasks.max = 4.
> > > > > >
> > > > > > > I see 4 tasks of MirrorSourceConnectors on MM2 instance A.
> > > > > > I see 4 tasks of MirrorCheckpointConnector and 1 task of
> > > > > > MirrorHeartbeatConnector on MM2 instance B.
> > > > > >
> > > > > > The number of tasks are well distributed, but the type of tasks
> are
> > > > not.
> > > > > > According to Connect documentation I expected 1-3 or 2-2 tasks of
> > > > > > the MirrorSourceConnectors on the two MM2 instances.
> > > > > >
> > > > > > So is this a bug or an expected behaviour?
> > > > > >
> > > > > > Thanks,
> > > > > > Peter
> > > > > >
> > > > > > On Fri, 20 Mar 2020 at 15:26, Ryanne Dolan <
> ryannedo...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > Peter, in Connect the Connectors are only run on the leader
> node.
> > > > Most
> > > > > of
> > > > > > > the work is done in the Tasks, which should be divided across
> > > nodes.
> > > > > Make
> > > > > > > sure you have tasks.max set to something higher than the
> default
> > of
> > > > 1.
> > > > > > >
> > > > > > > Ryanne
> > > > > > >
> > > > > > > On Fri, Mar 20, 2020, 8:53 AM Péter Sinóros-Szabó
> > > > > > >  wrote:
> > > > > > >
> > > > > > > > Hey,
> > > > > > > >
> > > > > > > > I am using MM2 to mirror A cluster to B with tasks.max = 4.
> > > > > > > >
> > > > > > > > I started two instances of MM2 and noticed that all
> > > > > > > MirrorSourceConnectors
> > > > > > > > were running in one instance and the rest of the connectors
> in
> > > the
> > > > > > other.
> > > > > > > >
> > > > > > > > This results in a very uneven resource utilization and also
> it
> > > did
> > > > > not
> > > > > > > > > really spread the mirroring load between the two nodes.
> > > > > > > >
> > > > > > > > I assumed that MM2 will run 2-2 of those connectors in each
> > > > instance.
> > > > > > > >
> > > > > > > > Is this current behaviour as expected or did I miss something
> > on
> > > > how
> > > > > to
> > > > > > > > configure it better?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Peter
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: Max poll interval and timeouts

2020-03-23 Thread Kamal Chandraprakash
Hi Ryan,

The maxPollInterval waits for at most the given time duration and returns
ASAP even if a single record is available.
If you want to collect data once every 30-45 minutes, it is better to use the
Consumer with `assign` mode and poll for records
once in 30 minutes.

If you're using the consumer with `subscribe` mode, then you have to update
the following configs:
1. session.timeout.ms
2. heartbeat.interval.ms and
3. group.max.session.timeout.ms in the broker configs.

Increasing the session timeout will delay the detection of consumer
failures, so I would suggest going with `assign` mode.
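
The three configs above constrain each other, and a small check can make that explicit. This is only a sketch: `TimeoutCheck` is a hypothetical helper, and the rules it encodes are the documented ones. The session timeout must fall within the broker's `group.min.session.timeout.ms` / `group.max.session.timeout.ms` range, and the heartbeat interval must be lower than the session timeout, typically no more than one third of it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the Kafka client.
final class TimeoutCheck {
    // Returns a human-readable list of problems; an empty list means the values are consistent.
    static List<String> problems(long sessionTimeoutMs, long heartbeatIntervalMs,
                                 long groupMinSessionTimeoutMs, long groupMaxSessionTimeoutMs) {
        List<String> out = new ArrayList<>();
        // The broker rejects session timeouts outside its configured range.
        if (sessionTimeoutMs < groupMinSessionTimeoutMs
                || sessionTimeoutMs > groupMaxSessionTimeoutMs) {
            out.add("session.timeout.ms is outside the broker's allowed range");
        }
        if (heartbeatIntervalMs >= sessionTimeoutMs) {
            out.add("heartbeat.interval.ms must be lower than session.timeout.ms");
        } else if (heartbeatIntervalMs * 3 > sessionTimeoutMs) {
            out.add("heartbeat.interval.ms is usually at most 1/3 of session.timeout.ms");
        }
        return out;
    }

    public static void main(String[] args) {
        // Example values only: a 45-minute session timeout against a 30-minute broker maximum.
        System.out.println(problems(2_700_000L, 900_000L, 6_000L, 1_800_000L));
    }
}
```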


On Tue, Mar 24, 2020 at 4:45 AM Ryan Schachte 
wrote:

> Hey guys, I'm getting a bit overwhelmed by the different variables used to
> help enable batching for me.
>
> I have some custom batching logic that processes when either N records have
> been buffered or my max timeout has been hit. It was working decently well,
> but I hit this error:
>
> *This means that the time between subsequent calls to poll() was longer
> than the configured max.poll.interval.ms ,
> which typically implies that the poll loop is spending too much time
> message processing.*
>
> I ultimately want to wait for the buffer to fill up or sit and collect data
> continuously for 30-45 mins at a time. Do I need to do anything with
> heartbeat or session timeout as well?
>
> So now my question is.. Can I just bump my maxPollInterval to something
> like:
>
> maxPollInterval: '270',
>


Re: [ANNOUNCE] New committer: John Roesler

2019-11-12 Thread Kamal Chandraprakash
Congrats John!

On Wed, Nov 13, 2019 at 7:57 AM Dong Lin  wrote:

> Congratulations John!
>
> On Tue, Nov 12, 2019 at 1:56 PM Guozhang Wang  wrote:
>
> > Hi Everyone,
> >
> > The PMC of Apache Kafka is pleased to announce a new Kafka committer,
> John
> > Roesler.
> >
> > John has been contributing to Apache Kafka since early 2018. His main
> > contributions are primarily around Kafka Streams, but have also included
> > improving our test coverage beyond Streams as well. Besides his own code
> > contributions, John has also actively participated on community
> discussions
> > and reviews including several other contributors' big proposals like
> > foreign-key join in Streams (KIP-213). He has also been writing,
> presenting
> > and evangelizing Apache Kafka in many venues.
> >
> > Congratulations, John! And look forward to more collaborations with you
> on
> > Apache Kafka.
> >
> >
> > Guozhang, on behalf of the Apache Kafka PMC
> >
>


Re: list of pattern processed topic list

2019-08-19 Thread Kamal Chandraprakash
You can use the KafkaConsumer#assignment() method to get all the
topic-partitions assigned to that consumer instance.
However, you have to call poll() periodically to pick up the latest
assignment, and those calls may also return records.
This shortcoming is being actively discussed in the threads below.

https://mail-archives.apache.org/mod_mbox/kafka-dev/201908.mbox/<08030a68-3f0b-42db-9b79-dfcd3200cf25%40www.fastmail.com>

https://cwiki.apache.org/confluence/display/KAFKA/KIP-505%3A+Add+new+public+method+to+only+update+assignment+metadata+in+consumer
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97551484
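
For the quoted question of narrowing the full `listTopics()` result to the subscribed pattern, the filtering itself needs only `java.util.regex`. A minimal sketch follows. `TopicFilter` is a hypothetical helper, and matching the whole topic name (rather than a substring) is an assumption about how the pattern should be applied:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper, not part of the Kafka client.
final class TopicFilter {
    // Keeps only the topic names that the pattern matches in full.
    static List<String> matching(Pattern pattern, Iterable<String> topicNames) {
        List<String> out = new ArrayList<>();
        for (String name : topicNames) {
            if (pattern.matcher(name).matches()) {
                out.add(name);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Example topic names are made up for illustration.
        List<String> all = List.of("orders-eu", "payments", "orders-us");
        System.out.println(matching(Pattern.compile("orders-.*"), all));
    }
}
```

With a real consumer, the names would come from the key set of the map returned by `listTopics()`, filtered with the same `Pattern` that was passed to `subscribe`.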


On Mon, Aug 19, 2019 at 6:33 PM M. Manna  wrote:

> Kafka doesn’t have regex subscription. You could get a list of available
> topic using admin client. From there, you can decide what topics you would
> like to subscribe to (or process using your logic).
>
> Check documentation for AdminClient.listTopics and ListTopicsResult holder.
>
> I hope this helps.
>
> Regards,
>
> On Mon, 19 Aug 2019 at 13:22, Upendra Yadav  wrote:
>
> > Hi,
> > I have initialised kafka consumer:
> > KafkaConsumer consumer = new KafkaConsumer > byte[]>(consumerConfig);
> >
> > and subscribed with topic pattern:
> > consumer.subscribe(Pattern.compile(topicRegex), listener);
> >
> > now, I'm trying to get the list of topics.
> > Map<String, List<PartitionInfo>> topicsPartitions = consumer.listTopics();
> >
> > here topic list is not following given pattern. it is giving all topic
> > list.
> > Is this expected behaviour? Is there any API to get subscribed topic list
> > with consumer instance?
> > or Do I need to write simple prog to process this list with same pattern?
> >
>


Re: [VOTE] 2.3.0 RC3

2019-06-21 Thread Kamal Chandraprakash
+1 (non-binding)

* Ran unit and integration test on 2.11 and 2.12
* Verified quick start
* Ran internal apps on the 3 node cluster

On Thu, Jun 20, 2019 at 3:33 AM Colin McCabe  wrote:

> Hi all,
>
> We discovered some problems with the second release candidate (RC2) of
> 2.3.0.  Specifically, KAFKA-8564.  I've created a new RC which includes the
> fix for this issue.
>
> Check out the release notes for the 2.3.0 release here:
> https://home.apache.org/~cmccabe/kafka-2.3.0-rc3/RELEASE_NOTES.html
>
> The vote will go until Saturday, June 22nd, or until we create another RC.
>
> * Kafka's KEYS file containing PGP keys we use to sign the release can be
> found here:
> https://kafka.apache.org/KEYS
>
> * The release artifacts to be voted upon (source and binary) are here:
> https://home.apache.org/~cmccabe/kafka-2.3.0-rc3/
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/org/apache/kafka/
>
> * Javadoc:
> https://home.apache.org/~cmccabe/kafka-2.3.0-rc3/javadoc/
>
> * The tag to be voted upon (off the 2.3 branch) is the 2.3.0 tag:
> https://github.com/apache/kafka/releases/tag/2.3.0-rc3
>
> best,
> Colin
>
> C.
>


Re: Kafka delaying message

2019-05-26 Thread Kamal Chandraprakash
If you have a dedicated topicPartition for delayed messages, you can pause
that partition for 15 min to avoid blocking the polling thread.
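
The timing decision behind the pause can be sketched on its own. `DelayGate` is a hypothetical helper; it assumes the record timestamp is the creation time in epoch milliseconds and that the caller supplies the current time:

```java
// Hypothetical helper, not part of the Kafka client.
final class DelayGate {
    // Milliseconds still to wait before a record with the given timestamp
    // may be processed; 0 means the record is already old enough.
    static long remainingDelayMs(long recordTimestampMs, long nowMs, long delayMs) {
        long ageMs = nowMs - recordTimestampMs;
        return Math.max(0L, delayMs - ageMs);
    }

    public static void main(String[] args) {
        long fifteenMinutesMs = 15 * 60_000L;
        // A record created 5 minutes ago still has 10 minutes to wait.
        System.out.println(remainingDelayMs(0L, 5 * 60_000L, fifteenMinutesMs));
    }
}
```

With a real consumer, a positive result would be the cue to `pause` the partition, `seek` back to that record's offset, and `resume` after the remaining delay has passed, while poll() keeps being called in the meantime. That wiring is not shown here.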

On Thu, May 23, 2019 at 12:16 PM Adrien Ruffie  wrote:

> Thank a lot ! That's what I thought, out of the box ... but it's clearly
> logic
> I will take your solution in the consumer, but I will refactor my Java code
> part, because, it's a spring message listener with annotation. So, I
> think, I couldn't do it simply with spring.
>
> thank again,
>
> best regards
> Adrian
>
> On Wed, May 22, 2019 at 11:23 PM Jonathan Santilli wrote:
>
> > Maybe you could:
> >
> > 1.- Disable auto.commit on the consumer side.
> > 2.- Consume messages, one by one in each poll.
> > 3.- Check the record timestamp
> > a.- If the timestamp is >= desired window (time slot), process it and
> > commit offset
> > b.- If the timestamp is < desired window (time slot), do not process
> it
> > and do not commit offset, in the next poll the consumer will get the
> > message again.
> >
> > Hope that helps,
> > --
> > Jonathan
> >
> >
> >
> >
> > On Wed, May 22, 2019 at 9:25 PM Peter Bukowinski 
> wrote:
> >
> > > I’d suggest using separate topics for messages that require delay and
> > ones
> > > that do not. If you are limited to a single topic, then I’d use some
> > other
> > > metadata to differentiate messages that require delayed processing from
> > > ones that do not. If you do not want to block the polling thread,
> you’ll
> > > need to route messages into a buffer of some sort to process them
> > > asynchronously.
> > >
> > > —
> > > Peter
> > >
> > > > On May 22, 2019, at 1:10 PM, Pavel Molchanov <
> > > pavel.molcha...@infodesk.com> wrote:
> > > >
> > > > This solution will block receiving polling thread for 15 minutes. Not
> > > good.
> > > >
> > > > What should we do if a topic has messages that should be processed
> > > > immediately and delayed messages at the same time?
> > > >
> > > > *Pavel Molchanov*
> > > >
> > > >
> > > >
> > > > On Wed, May 22, 2019 at 2:41 PM Peter Bukowinski 
> > > wrote:
> > > >
> > > >> There is no out-of-the-box way to tell a consumer to not consume an
> > > offset
> > > >> until it is x minutes old. Your best bet is encode the creation time
> > > into
> > > >> the message themselves and add some processing logic into your
> > consumer.
> > > >> Let’s assume your topic has a single partition or your partitions
> are
> > > keyed
> > > >> to guarantee message order. Your consumer could work like this in
> > > >> pseudo-code:
> > > >>
> > > >> consumer loop:
> > > >>consume message
> > > >>if (current time - message.timestamp) >= 15 minutes
> > > >>process message
> > > >>else
> > > >>sleep 15 minutes - (current time - message.timestamp)
> > > >>process message
> > > >>
> > > >> Since the messages enter the topic in the order they were published,
> > > >> pausing on the current offset should never cause a bottleneck on the
> > > later
> > > >> messages. If you fall behind, the greater than or equal to logic
> will
> > > >> prevent your consumer from pausing until it has caught up to your
> > > desired
> > > >> delay.
> > > >>
> > > >> This is a simplified scenario that may or may not map to your
> > production
> > > >> use case, though.
> > > >>
> > > >> —
> > > >> Peter
> > > >>
> > > >>
> > > >>> On May 22, 2019, at 11:12 AM, Pavel Molchanov <
> > > >> pavel.molcha...@infodesk.com> wrote:
> > > >>>
> > > >>> Andrien,
> > > >>>
> > > >>> Thank you for asking this question! I have the same problem and
> > wanted
> > > to
> > > >>> ask the same question. I hope that someone will answer soon.
> > > >>>
> > > >>> *Pavel Molchanov*
> > > >>>
> > > >>>
> > > >>>
> > > >>> On Wed, May 22, 2019 at 9:54 AM Adrien Ruffie 
> > > >> wrote:
> > > >>>
> > > >>>> Hello all,
> > > >>>>
> > > >>>> I have a specific need and I don't know if a generic solution exists ...
> > > >>>> maybe you can enlighten me
> > > >>>>
> > > >>>> I need to delay each sent message by about 15 mins.
> > > >>>> Example
> > > >>>> Message with offset 1 created at 2:41PM by the producer and received by the
> > > >>>> consumer at 2:56PM
> > > >>>> Message with offset 2 created at 2:46PM by the producer and received by the
> > > >>>> consumer at 3:01PM
> > > >>>> Message with offset 3 created at 2:46PM by the producer and received by the
> > > >>>> consumer at 3:01PM
> > > >>>> Message with offset 4 created at 3:01PM by the producer and received by the
> > > >>>> consumer at 3:16PM
> > > >>>> and so forth ...
> > > >>>>
> > > >>>> Any option, mechanism, producer/consumer implementations already exist?
> > > >>>>
> > > >>>> Thanks a lot and best regards
> > > >>>>
> > > >>>> Adrian
> > > >>>>
> > >

Re: KeeperException

2019-05-14 Thread Kamal Chandraprakash
Those are not errors. Note that the messages are logged at INFO level.

https://stackoverflow.com/a/48067058/3209010

On Wed, May 15, 2019 at 10:44 AM Gagan Sabharwal  wrote:

> Hi team,
>
> Any pointers on the same ?
>
> Regards
> Gagan
>
> On Mon, May 13, 2019 at 11:24 AM Gagan Sabharwal 
> wrote:
>
> > Hi team,
> >
> > I have just started to learn kafka following
> > https://kafka.apache.org/quickstart
> >
> > When I started my zookeeper server and kafka server, I got a few
> > exceptions in the log which are all "KeeperException". Even when I created a
> > topic with name test I got the same exception. Can anyone throw some light
> > on the same.
> >
> >
> > Here is an excerpt from log
> > [2019-05-13 11:01:48,010] INFO Got user-level KeeperException when
> > processing sessionid:0x181a14b type:create cxid:0x1 zxid:0x1f
> > txntype:-1 reqpath:n/a Error Path:/consumers Error:KeeperErrorCode =
> > NodeExists for /consumers
> (org.apache.zookeeper.server.PrepRequestProcessor)
> > [2019-05-13 11:01:48,073] INFO Got user-level KeeperException when
> > processing sessionid:0x181a14b type:create cxid:0x2 zxid:0x20
> > txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode =
> > NodeExists for /brokers/ids
> > (org.apache.zookeeper.server.PrepRequestProcessor)
> > [2019-05-13 11:01:48,122] INFO Got user-level KeeperException when
> > processing sessionid:0x181a14b type:create cxid:0x3 zxid:0x21
> > txntype:-1 reqpath:n/a Error Path:/brokers/topics Error:KeeperErrorCode =
> > NodeExists for /brokers/topics
> > (org.apache.zookeeper.server.PrepRequestProcessor)
> > [2019-05-13 11:01:48,161] INFO Got user-level KeeperException when
> > processing sessionid:0x181a14b type:create cxid:0x4 zxid:0x22
> > txntype:-1 reqpath:n/a Error Path:/config/changes Error:KeeperErrorCode =
> > NodeExists for /config/changes
> > (org.apache.zookeeper.server.PrepRequestProcessor)
> > [2019-05-13 11:01:48,190] INFO Got user-level KeeperException when
> > processing sessionid:0x181a14b type:create cxid:0x5 zxid:0x23
> > txntype:-1 reqpath:n/a Error Path:/admin/delete_topics
> > Error:KeeperErrorCode = NodeExists for /admin/delete_topics
> > (org.apache.zookeeper.server.PrepRequestProcessor)
> > [2019-05-13 11:01:48,215] INFO Got user-level KeeperException when
> > processing sessionid:0x181a14b type:create cxid:0x6 zxid:0x24
> > txntype:-1 reqpath:n/a Error Path:/brokers/seqid Error:KeeperErrorCode =
> > NodeExists for /brokers/seqid
> > (org.apache.zookeeper.server.PrepRequestProcessor)
> > [2019-05-13 11:01:48,245] INFO Got user-level KeeperException when
> > processing sessionid:0x181a14b type:create cxid:0x7 zxid:0x25
> > txntype:-1 reqpath:n/a Error Path:/isr_change_notification
> > Error:KeeperErrorCode = NodeExists for /isr_change_notification
> > (org.apache.zookeeper.server.PrepRequestProcessor)
> > [2019-05-13 11:01:48,270] INFO Got user-level KeeperException when
> > processing sessionid:0x181a14b type:create cxid:0x8 zxid:0x26
> > txntype:-1 reqpath:n/a Error Path:/latest_producer_id_block
> > Error:KeeperErrorCode = NodeExists for /latest_producer_id_block
> > (org.apache.zookeeper.server.PrepRequestProcessor)
> > [2019-05-13 11:01:48,300] INFO Got user-level KeeperException when
> > processing sessionid:0x181a14b type:create cxid:0x9 zxid:0x27
> > txntype:-1 reqpath:n/a Error Path:/log_dir_event_notification
> > Error:KeeperErrorCode = NodeExists for /log_dir_event_notification
> > (org.apache.zookeeper.server.PrepRequestProcessor)
> > [2019-05-13 11:01:48,337] INFO Got user-level KeeperException when
> > processing sessionid:0x181a14b type:create cxid:0xa zxid:0x28
> > txntype:-1 reqpath:n/a Error Path:/config/topics Error:KeeperErrorCode =
> > NodeExists for /config/topics
> > (org.apache.zookeeper.server.PrepRequestProcessor)
> > [2019-05-13 11:01:48,366] INFO Got user-level KeeperException when
> > processing sessionid:0x181a14b type:create cxid:0xb zxid:0x29
> > txntype:-1 reqpath:n/a Error Path:/config/clients Error:KeeperErrorCode =
> > NodeExists for /config/clients
> > (org.apache.zookeeper.server.PrepRequestProcessor)
> > [2019-05-13 11:01:48,393] INFO Got user-level KeeperException when
> > processing sessionid:0x181a14b type:create cxid:0xc zxid:0x2a
> > txntype:-1 reqpath:n/a Error Path:/config/users Error:KeeperErrorCode =
> > NodeExists for /config/users
> > (org.apache.zookeeper.server.PrepRequestProcessor)
> > [2019-05-13 11:01:48,422] INFO Got user-level KeeperException when
> > processing sessionid:0x181a14b type:create cxid:0xd zxid:0x2b
> > txntype:-1 reqpath:n/a Error Path:/config/brokers Error:KeeperErrorCode =
> > NodeExists for /config/brokers
> > (org.apache.zookeeper.server.PrepRequestProcessor)
> >
> > Regards
> > Gagan
> >
>


Re: Kafka transaction between 2 kafka clusters

2019-05-10 Thread Kamal Chandraprakash
MirrorMaker 2.0 stores the offsets of one cluster in another. So, you can
read the offsets from the same cluster once this KIP is implemented.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0#KIP-382:MirrorMaker2.0-RemoteClusterUtils

On Fri, May 10, 2019 at 12:29 PM Emmanuel  wrote:

> Thanks! I wonder if this is a bit far-fetched as no one seems to do this at
> the moment.
>
> On Fri, May 10, 2019 at 12:50 AM Guozhang Wang  wrote:
>
> > Hello Emmanuel,
> >
> > Yes I think it is do-able technically. Note that it means the offsets of
> > cluster A would be stored on cluster B and hence upon restarting one need
> > to talk to cluster B in order to get the committed position in cluster A.
> >
> >
> > Guozhang
> >
> >
> > On Thu, May 9, 2019 at 11:58 AM Emmanuel  wrote:
> >
> > > Hello,
> > >
> > > I would like to know if there is a Java client that would allow me to
> > > consume from topics on a cluster A and produce to topics on a cluster B
> > > with exactly-once semantics. My understanding of the Kafka transactions
> > is
> > > that on the paper it could work, but the kafka java client assumes both
> > are
> > > running on the same cluster. Is it something do-able? If it is and
> > nothing
> > > exists yet, would it be an interesting contribution?
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


Re: Kafka upgrade process details

2019-05-10 Thread Kamal Chandraprakash
Hi,

In Kafka v2.1.0, the OffsetCommit Request/Response schema version is
changed to v4 for the *__consumer_offsets* topic.
If you upgrade Kafka to v2.1.0 or higher and change
inter.broker.protocol.version to 2.1, then you cannot revert
to older versions, as they don't know how to read the *__consumer_offsets* topic
with the latest offset message schema.

Indeed, if you want to revert to the older versions, you have to
delete the *__consumer_offsets* topic. Note: Once that
topic is deleted, all your consumer groups will start to read from the
latest/earliest available message as per your
`auto.offset.reset` configuration. Please correct me if I am wrong.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets

Note: The `__consumer_offsets` topic is read by the broker after each
restart to load the last committed offsets of the consumer groups.
So, once you change `inter.broker.protocol.version` to 2.1, the
OffsetCommit schema version changes to v4.

On Thu, May 9, 2019 at 4:21 PM Chylek, Artur  wrote:

> Thanks for prompt response.
> I am not sure I understand correctly, but I am still confused why
> switching inter.broker.protocol.version in the last step would make the
> process irreversible.
> If we agree that log conversion to a new format is applied when a new
> value of log.message.format.version or broker's default one is applied then
> according to documentation switching back to old version of broker will
> make no harm. So either a broker is able to read a newer versions of log
> and dismiss parts he doesn’t understand or the conversion itself occurs
> only at certain combination of inter.broker.protocol.version and
> log.message.format.version. Or maybe I am completely wrong with my
> assumptions.
>
> Regards,
> Artur
>
> -Original Message-
> From: M. Manna [mailto:manme...@gmail.com]
> Sent: Thursday, May 09, 2019 12:19 PM
> To: Kafka Users 
> Subject: Re: Kafka upgrade process details
>
> Artur,
>
> The upgrade process is such that
>
> 1) You ensure that there is a hard-check on protocol version if not exists
> already. As you have already mentioned above, in #3 - it's to ensure that
> min version for msg formats are being adhered to before upgrade.
> 2) broker protocol version is to ensure that when you do rolling upgrade
> there is a minimum compatibility between inter-broker comms (if that makes
> sense).
>
> API versions are there for a specific reason (to ensure compatibility is
> maintained first before the protocol/msg formats are fixed). Also, it
> ensures that a regular upgrade (e.g. from bugfix->major release) happens
> seamlessly via rolling restart. If you want to get technical about it, you
> can look at kafka Table load and config load in github codebase.
>
> Once you've initiated a rolling restart by setting
> inter.broker.protocol.version and log.message.format.version there is no way
> back - but i am happy to be proven wrong as I have only done rolling
> restart successfully (never needed to go back :) )
>
> On Thu, 9 May 2019 at 09:54, Chylek, Artur  wrote:
>
> > Hi,
> > I read the documentation about upgrading Kafka(
> > https://urldefense.proofpoint.com/v2/url?u=http-3A__kafka.apache.org_21_documentation.html-23upgrade-5F2-5F1-5F0&d=DwIBaQ&c=FXJfUb1oWgygD0uNz-ujnA&r=bVI5M04li2b_AW9E6XWAZb5H4NuzOzdzPeKTA_sjdMg&m=3g5HRSTfPHijn_jhhzYIB88jScofj68jB1AmrcMPBC0&s=qiYRoeea_IxCjRGx1Vi7ylwk2vuCiYGGoPhhGYmiwos&e=)
> > but I have
> > questions that I believe the documentation doesn't cover. I am planning to
> > upgrade Kafka from 2.0.0 to 2.1.0 and would like to make sure what to do
> > when something goes wrong - I am mostly interested in reverting back to the
> > older version. I admit that I am confused about API versions, log message
> > versions and inter broker protocol versions.
> >
> > According to the documentation these are the steps I need to follow:
> >
> >   1.  Set inter.broker.protocol.version to 2.0.0
> >   2.  I don't have currently log.message.format.version set to 2.0.0, so
> > according to documentation I don't need to specify it
> >   3.  Upgrade broker code and restart it. This is where I am not sure what
> > happens next. Since I don't have log.message.format.version set then
> > broker's default value will be used - I guess 2.1.1. Does it mean that:
> >  *   New messages that arrive from producers will be saved in a log
> > file with a new format?
> >  *   Old messages that already exist in log files will be converted to
> > the n

Re: Best Practice Scaling Consumers

2019-05-06 Thread Kamal Chandraprakash
1. Yes, you may have to overprovision the number of partitions to handle
the load peaks. Refer to this document to choose the no. of partitions.
2. KIP-429 is proposed to reduce the time taken by the consumer rebalance
protocol when a consumer instance is added/removed from the group.
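
The sizing constraint in point 1 can be made concrete with a little arithmetic. This is a simplified model, assuming a single topic and an assignor that spreads partitions as evenly as possible; `GroupSizing` is a hypothetical helper:

```java
// Hypothetical helper; models one topic consumed by one consumer group.
final class GroupSizing {
    // Consumers that receive at least one partition.
    static int activeConsumers(int partitions, int consumers) {
        return Math.min(partitions, consumers);
    }

    // Consumers left without any partition because the group outnumbers the partitions.
    static int idleConsumers(int partitions, int consumers) {
        return Math.max(0, consumers - partitions);
    }

    // Largest number of partitions held by any single consumer under an even split.
    static int maxPartitionsPerConsumer(int partitions, int consumers) {
        int active = activeConsumers(partitions, consumers);
        return active == 0 ? 0 : (partitions + active - 1) / active;
    }

    public static void main(String[] args) {
        // Scaling a service to 16 instances on a 12-partition topic leaves 4 of them idle.
        System.out.println(idleConsumers(12, 16));
    }
}
```

Under this model, the partition count sets the ceiling on useful consumer instances, which is why it has to be sized for the expected peak rather than the average load.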

On Mon, May 6, 2019 at 7:47 PM Moritz Petersen 
wrote:

> Hi all,
>
> I’m new to Kafka and have a very basic question:
>
> We build a cloud-scale platform and evaluate if we can use Kafka for
> pub-sub messaging between our services. Most of our services scale
> dynamically based on load (number of requests, CPU load etc.). In our
> current architecture, services are both, producers and consumers since all
> services listen to some kind of events.
>
> With Kafka, I assume we have two restrictions or issues:
>
>   1.  Number of consumers is restricted to the number of partitions of a
> topic. Changing the number of partitions is a relatively expensive
> operation (at least compared to scaling services). Is it necessary to
> overprovision on the number of partitions in order to be prepared for load
> peaks?
>   2.  Adding or removing consumers halts processing of the related
> partition for a short period of time. Is it possible to avoid or
> significantly minimize this lag?
>
> Are there any additional best practices to implement Kafka consumers on a
> cloud scale environment?
>
> Thanks,
> Moritz
>
>


Re: Restart process after adding control.plane.listener.name config in Kafka 2.2.0

2019-05-06 Thread Kamal Chandraprakash
The migration plan for this change is described in KIP-291.
Take a look at it:

> Migration plan: 2 rounds of rolling upgrades are needed to pick up the
> proposed changes in this KIP. The goal of the first round is to add the
> controller endpoint, without adding the "control.plane.listener.name"
> config. Specifically, an endpoint with the controller listener name should
> be added to the "listeners" config, e.g. "CONTROLLER://192.1.1.8:9091";
> if the "advertised.listeners" config is explicitly configured and is not
> getting its value from "listeners", the new endpoint for controller should
> also be added to "advertised.listeners". After the first round is
> completed, controller to brokers communication should still behave in the
> same way that uses the inter-broker-listener-name, since the
> "control.plane.listener.name" is not set yet. In the 2nd round, the
> "control.plane.listener.name" config should be set to the corresponding
> listener name, e.g. "CONTROLLER". During rolling upgrade of the 2nd round,
> controller to brokers traffic will start using the
> "control.plane.listener.name", and go through the proposed changes in this
> KIP.


On Fri, May 3, 2019 at 5:28 PM Jonathan Santilli 
wrote:

> Hello, hope you all are great,
>
> I would like to know what's the process to update the Kafka Broker
> configuration in order to use the new config: *control.plane.listener.name
>  in Kafka version 2.2.0*
>
> The documentation says: *Name of listener used for communication between
> controller and brokers.*
>
> If I change it just in one Broker and restart it, will this cause a
> communication issue since the others Brokers still are not using the new
> config/port?
>
>
> Cheers!
> --
> Santilli Jonathan
>


Re: measuring incoming bytes

2019-02-02 Thread Kamal Chandraprakash
In Yammer metrics, the FifteenMinuteRate provided by the Meter is not a pure
average over the last 15 minutes. It's an exponentially weighted moving
average (EWMA): roughly 36% of the value's weight comes from the history
between broker start and 15 minutes ago, and roughly 64% from the last
15 minutes.
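
The 36% / 64% split follows from the decay constant of the moving average. Here is a minimal Java sketch, assuming the meter ticks every 5 seconds and smooths the 15-minute rate with alpha = 1 - exp(-5/900), which is how the Yammer/Dropwizard EWMA is usually described. The class and method names are made up for illustration and are not part of any library:

```java
// Illustrative only: EwmaWeights is a hypothetical class, not a Yammer or Kafka API.
final class EwmaWeights {
    static final double TICK_SECONDS = 5.0;        // assumed tick interval of the meter
    static final double WINDOW_SECONDS = 15 * 60;  // time constant of the 15-minute rate

    // Smoothing factor applied to the newest sample at every tick.
    static double alpha() {
        return 1.0 - Math.exp(-TICK_SECONDS / WINDOW_SECONDS);
    }

    // Fraction of the current value that still comes from history older
    // than the given number of seconds.
    static double weightOfHistoryOlderThan(double seconds) {
        long ticks = Math.round(seconds / TICK_SECONDS);
        return Math.pow(1.0 - alpha(), ticks);
    }

    public static void main(String[] args) {
        double old = weightOfHistoryOlderThan(WINDOW_SECONDS);
        System.out.printf("older than 15 min: %.1f%%%n", old * 100);
        System.out.printf("last 15 min:       %.1f%%%n", (1 - old) * 100);
    }
}
```

Under these assumptions the weight of everything older than 15 minutes is exp(-1), about 37%, which is why summing FifteenMinuteRate samples does not give a byte total.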

> I initially thought this metric is the total volume of bytes that came in
> for that topic in last 15 mins. So, if I collect this metric every 15 mins,
> then I will have total volume over time by calculating the sum of data
> points in the time interval.

You can use `Count` instead of `FifteenMinuteRate` to find the total volume
of bytes that came into a topic over the last 15 minutes: collect it every
15 minutes and take the difference between two consecutive samples.
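
The difference-of-samples idea can be sketched in Java as follows. `ByteVolume` and its methods are hypothetical helpers, the sample values are made up, and the handling of a counter reset (for example after a broker restart) is an assumption of this sketch rather than something stated in this thread:

```java
// Illustrative only: ByteVolume is a hypothetical helper, not a Kafka API.
final class ByteVolume {
    // Bytes received between two samples of the meter's cumulative count.
    // Assumption: a smaller later sample means the counter was reset, so
    // the later sample itself is used as the best available estimate.
    static long bytesBetween(long earlierCount, long laterCount) {
        return laterCount >= earlierCount ? laterCount - earlierCount : laterCount;
    }

    // Total volume over a series of samples taken at regular intervals.
    static long totalBytes(long[] samples) {
        long total = 0;
        for (int i = 1; i < samples.length; i++) {
            total += bytesBetween(samples[i - 1], samples[i]);
        }
        return total;
    }

    public static void main(String[] args) {
        // Made-up counts; the last value simulates a reset.
        long[] samples = {1_000L, 4_500L, 9_000L, 2_000L};
        System.out.println(totalBytes(samples)); // prints 10000
    }
}
```

Because the count is cumulative, the collection interval does not have to be exactly 15 minutes; the difference is the volume for whatever interval separates the two samples.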



On Sat, Feb 2, 2019 at 8:39 PM Amitav Mohanty 
wrote:

> Hi
>
> I am trying to measure incoming bytes over time. I am trying collect the
> following metric and apply integral function over a set of data points on a
> time series.
>
> kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=my_topic
> FifteenMinuteRate
>
> It seems that the number I am getting is much less than expected. So, I am
> suspecting that my understanding of this metric is wrong.
>
> I initially thought this metric is the total volume of bytes that came in
> for that topic in last 15 mins. So, if I collect this metric every 15 mins,
> then I will have total volume over time by calculating the sum of data
> points in the time interval.
>
> Please confirm if my understanding is correct.
>
> Regards,
> Amitav
>


Re: Very long consumer rebalances

2018-08-09 Thread Kamal Chandraprakash
In v0.10.0.1, the consumer's background heartbeat thread is not available.
A lot of users faced similar errors, so KIP-62 was proposed. You have to
upgrade your Kafka version.

I highly recommend upgrading Kafka to a version where the background
heartbeat thread is implemented (v0.10.1.0 or later). If you don't have any
option to upgrade, you have to heartbeat the coordinator manually from your
consumer. You can use this code snippet for reference.




On Thu, Aug 9, 2018 at 3:03 PM M. Manna  wrote:

> In the simplest way, how have you implemented your consumer?
>
> 1) Does your consumers join a designated group, process messages, and then
> closes all connection? Or does it stay open perpetually until server
> shutdown?
> 2) Have you configured the session timeouts for client and zookeeper
> accordingly?
>
> Regards,
>
> On 9 August 2018 at 08:00, Shantanu Deshmukh 
> wrote:
>
> >  I am facing too many problems these days. Now one of our consumer groups
> > is rebalancing every now and then. And rebalance takes very long, more
> > than 5-10 minutes. Even after re-balancing I see that only half of the
> > consumers are active/receive assignment. It's all going haywire.
> >
> > I am seeing these logs in kafka consumer logs. Can anyone help me
> > understand what is going on here? It is a very long piece of log, but
> > someone please help me. I have been desperately looking for a solution
> > for more than 2 months now, but to no avail.
> >
> > [2018-08-09 11:39:51] :: DEBUG ::
> > AbstractCoordinator$HeartbeatResponseHandler:694 - Received successful
> > heartbeat response for group bulk-email-consumer
> > [2018-08-09 11:39:53] :: DEBUG ::
> > ConsumerCoordinator$OffsetCommitResponseHandler:640 - Group
> > bulk-email-consumer committed offset 25465113 for partition bulk-email-8
> > [2018-08-09 11:39:53] :: DEBUG :: ConsumerCoordinator$4:539 - Completed
> > autocommit of offsets {bulk-email-8=OffsetAndMetadata{offset=25465113,
> > metadata=''}} for group bulk-email-consumer
> > [2018-08-09 11:39:53] :: DEBUG ::
> > ConsumerCoordinator$OffsetCommitResponseHandler:640 - Group
> > bulk-email-consumer committed offset 25463566 for partition bulk-email-6
> > [2018-08-09 11:39:53] :: DEBUG :: ConsumerCoordinator$4:539 - Completed
> > autocommit of offsets {bulk-email-6=OffsetAndMetadata{offset=25463566,
> > metadata=''}} for group bulk-email-consumer
> > [2018-08-09 11:39:53] :: DEBUG ::
> > ConsumerCoordinator$OffsetCommitResponseHandler:640 - Group
> > bulk-email-consumer committed offset 2588 for partition bulk-email-9
> > [2018-08-09 11:39:53] :: DEBUG :: ConsumerCoordinator$4:539 - Completed
> > autocommit of offsets {bulk-email-9=OffsetAndMetadata{offset=2588,
> > metadata=''}} for group bulk-email-consumer
> > [2018-08-09 11:39:54] :: DEBUG ::
> > AbstractCoordinator$HeartbeatResponseHandler:694 - Received successful
> > heartbeat response for group bulk-email-consumer
> > [2018-08-09 11:39:54] :: DEBUG ::
> > AbstractCoordinator$HeartbeatResponseHandler:694 - Received successful
> > heartbeat response for group bulk-email-consumer
> > [2018-08-09 11:39:54] :: DEBUG ::
> > AbstractCoordinator$HeartbeatResponseHandler:694 - Received successful
> > heartbeat response for group bulk-email-consumer
> > [2018-08-09 11:39:54] :: DEBUG ::
> > AbstractCoordinator$HeartbeatResponseHandler:694 - Received successful
> > heartbeat response for group bulk-email-consumer
> > [2018-08-09 11:39:54] :: DEBUG ::
> > AbstractCoordinator$HeartbeatResponseHandler:694 - Received successful
> > heartbeat response for group bulk-email-consumer
> > [2018-08-09 11:39:54] :: DEBUG ::
> > AbstractCoordinator$HeartbeatResponseHandler:694 - Received successful
> > heartbeat response for group bulk-email-consumer
> > [2018-08-09 11:39:54] :: DEBUG ::
> > AbstractCoordinator$HeartbeatResponseHandler:694 - Received successful
> > heartbeat response for group bulk-email-consumer
> > [2018-08-09 11:39:54] :: DEBUG ::
> > AbstractCoordinator$HeartbeatResponseHandler:694 - Received successful
> > heartbeat response for group bulk-email-consumer
> > [2018-08-09 11:39:56] :: DEBUG ::
> > ConsumerCoordinator$OffsetCommitResponseHandler:640 - Group
> > bulk-email-consumer committed offset 25463566 for partition bulk-email-6
> > [2018-08-09 11:39:56] :: DEBUG ::
> > ConsumerCoordinator$OffsetCommitResponseHandler:640 - Group
> > bulk-email-consumer committed offset 2588 for partition bulk-email-9
> > [2018-08-09 11:39:56] :: DEBUG ::
> > ConsumerCoordinator$OffsetCommitResponseHandler:640 - Group
> > bulk-email-consumer committed offset 25465113 for partition bulk-email-8
> > [2018-08-09 11:39:56] :: DEBUG :: ConsumerCoordinator$4:539 - Completed
> > autocommit of offsets {b

Re: How set properly infinite retention

2018-07-30 Thread Kamal Chandraprakash
log.retention.ms = 9223372036854775807 (Long.MAX_VALUE)


On Mon, Jul 30, 2018 at 3:04 PM David Espinosa  wrote:

> Hi thanks a lot for the reply.
>
> The thing is that I need compaction to delete some messages (for GDPR
> purposes), and for that I need the log cleaner to be enabled (with
> policy=compact).
>
> David
>
>
> On Mon, Jul 30, 2018 at 11:27, M. Manna wrote:
>
> > I believe you can simply disable log cleaner.
> >
> > On Mon, 30 Jul 2018, 10:07 David Espinosa,  wrote:
> >
> > > Hi all,
> > > I would like to set infinite retention for all topics created in the
> > > cluster by default.
> > > I have tried with:
> > >
> > > *log.retention.ms=-1* in *server.properties*
> > >
> > > But messages get deleted approx after 10 days.
> > >
> > > Which configuration at broker level should I use for infinite
> > > retention?
> > >
> > > Thanks in advance,
> > > David
> > >
> >
>


Re: Facing Duplication in consumer

2018-05-28 Thread Kamal Chandraprakash
If the consumer is idle (not committing any offsets) for more than a day,
its committed offsets will be wiped out.

Refer to the offsets.retention.minutes property in the Kafka documentation.
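
As a rough illustration of the expiry rule, here is a minimal Java sketch. `OffsetExpiry` is a hypothetical helper, not a Kafka API, and the 1440-minute value is an assumption chosen to match the one-day behaviour described above; check the broker's actual offsets.retention.minutes setting. Once the committed offsets are gone, a consumer with "auto.offset.reset" = "earliest" starts again from the beginning of the partition, which would explain the duplicates.

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative only: OffsetExpiry is a hypothetical helper, not a Kafka API.
final class OffsetExpiry {
    // Assumed broker setting: 1440 minutes = one day.
    static final long OFFSETS_RETENTION_MINUTES = 1440;

    // True when the last commit is older than the retention period, i.e.
    // the broker is allowed to discard the committed offsets.
    static boolean offsetsExpired(Instant lastCommit, Instant now, long retentionMinutes) {
        return Duration.between(lastCommit, now).toMinutes() > retentionMinutes;
    }

    public static void main(String[] args) {
        Instant lastCommit = Instant.parse("2018-05-27T00:00:00Z"); // made-up timestamp
        Instant restart = lastCommit.plus(Duration.ofHours(25));    // consumer bounced 25h later
        System.out.println(offsetsExpired(lastCommit, restart, OFFSETS_RETENTION_MINUTES));
    }
}
```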

On Tue, May 29, 2018 at 9:49 AM, Shantanu Deshmukh 
wrote:

> Which Kafka version?
>
> On Mon, May 28, 2018 at 9:09 PM Dinesh Subramanian <
> dsubraman...@apptivo.co.in> wrote:
>
> > Hi,
> >
> > Whenever we bounce the consumer in tomcat node,  I am facing duplication.
> > It is consumed from the beginning. I have this property in consumer
> > "auto.offset.reset" =  "earliest". if it is new consumer means it will
> > consume from the beginning, but it is consumed from the beginning for the
> > consumer that we used for last 4 months and consumer offset is committed
> > already in partition wise. Any help will be appreciated.
> >
> >
> > *Thanks & Regards,*
> >
> > *Dinesh S*
> >
>


Re: Reply: [ANNOUNCE] New Committer: Dong Lin

2018-03-28 Thread Kamal Chandraprakash
Congratulations, Dong!

On Thu, Mar 29, 2018 at 7:14 AM, Manikumar 
wrote:

> Congrats, Dong!
>
> On Thu, Mar 29, 2018 at 6:45 AM, Tao Feng  wrote:
>
> > Congrats Dong!
> >
> > On Wed, Mar 28, 2018 at 5:15 PM Dong Lin  wrote:
> >
> > > Thanks everyone!!
> > >
> > > It is my great pleasure to be part of the Apache Kafka community and
> > > help make Apache Kafka more useful to its users. I am super excited to
> > > be a Kafka committer and I am hoping to contribute more to its design,
> > > implementation and review etc in the future.
> > >
> > > Thanks!
> > > Dong
> > >
> > > On Wed, Mar 28, 2018 at 4:04 PM, Hu Xi  wrote:
> > >
> > > > Congrats, Dong Lin!
> > > >
> > > >
> > > > 
> > > > From: Matthias J. Sax
> > > > Sent: March 29, 2018 6:37
> > > > To: users@kafka.apache.org; d...@kafka.apache.org
> > > > Subject: Re: [ANNOUNCE] New Committer: Dong Lin
> > > >
> > > > Congrats!
> > > >
> > > > On 3/28/18 1:16 PM, James Cheng wrote:
> > > > > Congrats, Dong!
> > > > >
> > > > > -James
> > > > >
> > > > >> On Mar 28, 2018, at 10:58 AM, Becket Qin wrote:
> > > > >>
> > > > >> Hello everyone,
> > > > >>
> > > > >> The PMC of Apache Kafka is pleased to announce that Dong Lin has
> > > > >> accepted our invitation to be a new Kafka committer.
> > > > >>
> > > > >> Dong started working on Kafka about four years ago, since which he
> > > > >> has contributed numerous features and patches. His work on Kafka
> > > > >> core has been consistent and important. Among his contributions,
> > > > >> most noticeably, Dong developed JBOD (KIP-112, KIP-113) to handle
> > > > >> disk failures and to reduce overall cost, added deleteDataBefore()
> > > > >> API (KIP-107) to allow users actively remove old messages. Dong has
> > > > >> also been active in the community, participating in KIP discussions
> > > > >> and doing code reviews.
> > > > >>
> > > > >> Congratulations and looking forward to your future contribution,
> > > > >> Dong!
> > > > >> Jiangjie (Becket) Qin, on behalf of Apache Kafka PMC
> > > > >
> > > >
> > > >
> > >
> >
>


Re: kafka user

2017-11-01 Thread Kamal Chandraprakash
Please follow the instructions listed here https://kafka.apache.org/contact

You have to send a mail to 'users-subscr...@kafka.apache.org' to subscribe
to the users list.

On Tue, Oct 31, 2017 at 1:06 PM, Karthigeyan 
wrote:

> Hi ,
>
> pls add to the user group.
>
> Thanks ,
>
> Karthigeyan
>
>


KTable-KTable Join Semantics on NULL Key

2017-09-08 Thread Kamal Chandraprakash
Hi Kafka Users,

KTable-KTable join semantics are explained in detail [here][1]. But
it's not clear why, when the input record is , sometimes the output
records are generated  and in some cases they are not.

It would be helpful if someone could explain how the output records are
generated for all 3 types of joins on receiving a record with a NULL
value.

[1]: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-KTable-KTableJoin

-- Kamal


Re: Reduce Kafka Client logging

2017-09-08 Thread Kamal Chandraprakash
Add this line at the end of your log4j.properties:

log4j.logger.org.apache.kafka.clients.producer=WARN

On Thu, Sep 7, 2017 at 5:27 PM, Raghav  wrote:

> Hi Viktor
>
> Can you please share the log4j config snippet that I should use? My Java
> code's current log4j looks like this. How should I add this new entry that
> you mentioned? Thanks.
>
>
> log4j.rootLogger=INFO, STDOUT
>
> log4j.appender.STDOUT=org.apache.log4j.ConsoleAppender
> log4j.appender.STDOUT.layout=org.apache.log4j.PatternLayout
> log4j.appender.STDOUT.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}:%L %m%n
>
> log4j.appender.file=org.apache.log4j.RollingFileAppender
> log4j.appender.file.File=logfile.log
> log4j.appender.file.layout=org.apache.log4j.PatternLayout
> log4j.appender.file.layout.ConversionPattern=%d{dd-MM- HH:mm:ss} %-5p %c{1}:%L - %m%n
>
> On Thu, Sep 7, 2017 at 2:34 AM, Viktor Somogyi 
> wrote:
>
> > Hi Raghav,
> >
> > I think it is enough to raise the logging level
> > of org.apache.kafka.clients.producer.ProducerConfig to WARN in log4j.
> > Also I'd like to mention that if possible, don't recreate the Kafka
> > producer each time. The protocol is designed for long-living connections
> > and recreating the connection each time puts pressure on the TCP layer
> > (the connection is expensive) and also on Kafka as well which may result
> > in broker failures (typically exceeding the maximum allowed number of
> > file descriptors).
> >
> > HTH,
> > Viktor
> >
> > On Thu, Sep 7, 2017 at 7:35 AM, Raghav  wrote:
> >
> > > Due to the nature of code, I have to open a connection to a different
> > > Kafka broker each time, and send one message. We have several Kafka
> > > brokers. So my client log is full with the following logs. What log
> > > settings should I use in log4j just for Kafka producer logs ?
> > >
> > >
> > > 17/09/07 04:44:04 INFO producer.ProducerConfig:180 ProducerConfig values:
> > > acks = all
> > > batch.size = 16384
> > > block.on.buffer.full = false
> > > bootstrap.servers = [10.10.10.5:]
> > > buffer.memory = 33554432
> > > client.id =
> > > compression.type = none
> > > connections.max.idle.ms = 54
> > > interceptor.classes = null
> > > key.serializer = class
> > > org.apache.kafka.common.serialization.StringSerializer
> > > linger.ms = 1
> > > max.block.ms = 5000
> > > max.in.flight.requests.per.connection = 5
> > > max.request.size = 1048576
> > > metadata.fetch.timeout.ms = 6
> > > metadata.max.age.ms = 30
> > > metric.reporters = []
> > > metrics.num.samples = 2
> > > metrics.sample.window.ms = 3
> > > partitioner.class = class
> > > org.apache.kafka.clients.producer.internals.DefaultPartitioner
> > > receive.buffer.bytes = 32768
> > > reconnect.backoff.ms = 50
> > > request.timeout.ms = 5000
> > > retries = 0
> > > retry.backoff.ms = 100
> > > sasl.kerberos.kinit.cmd = /usr/bin/kinit
> > > sasl.kerberos.min.time.before.relogin = 6
> > > sasl.kerberos.service.name = null
> > > sasl.kerberos.ticket.renew.jitter = 0.05
> > > sasl.kerberos.ticket.renew.window.factor = 0.8
> > > sasl.mechanism = GSSAPI
> > > security.protocol = PLAINTEXT
> > > send.buffer.bytes = 131072
> > > ssl.cipher.suites = null
> > > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> > > ssl.endpoint.identification.algorithm = null
> > > ssl.key.password = null
> > > ssl.keymanager.algorithm = SunX509
> > > ssl.keystore.location = null
> > > ssl.keystore.password = null
> > > ssl.keystore.type = JKS
> > > ssl.protocol = TLS
> > > ssl.provider = null
> > > ssl.secure.random.implementation = null
> > > ssl.trustmanager.algorithm = PKIX
> > > ssl.truststore.location = null
> > > ssl.truststore.password = null
> > > ssl.truststore.type = JKS
> > > timeout.ms = 3
> > > value.serializer = class
> > > org.apache.kafka.common.serialization.StringSerializer
> > >
> > > On Wed, Sep 6, 2017 at 9:37 PM, Jaikiran Pai wrote:
> > >
> > > > Can you post the exact log messages that you are seeing?
> > > >
> > > > -Jaikiran
> > > >
> > > >
> > > >
> > > > On 07/09/17 7:55 AM, Raghav wrote:
> > > >
> > > >> Hi
> > > >>
> > > >> My Java code prints the Kafka config every time it does a send, which
> > > >> makes the log very verbose.
> > > >>
> > > >> How can I reduce the Kafka client (producer) logging in my Java
> > > >> code?
> > > >>
> > > >> Thanks for your help.
> > > >>
> > > >>
> > > >
> > >
> > >
> > > --
> > > Raghav
> > >
> >
>
>
>
> --
> Raghav
>


Re: Why does kafka-consumer-groups show the topics written to too in its describe

2017-08-21 Thread Kamal Chandraprakash
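`through` = `to` + `stream` operation: it writes the records to the topic
and then reads them back from that topic as a new stream. So the
consumer-groups command shows the "fname-stream" topic, because your
application also consumes from it.

Use `to` if you just want to write the output to the topic.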

-- Kamal

On Mon, Aug 21, 2017 at 12:05 PM, Sachin Mittal  wrote:

> Folks any thoughts on this.
> Basically I want to know on what topics does consumer group command reports
> on.
>
> I always thought it would only be the topics streams application consumes
> from and not write to.
>
> Any inputs or any part of code I can look at to understand this better
> would be helpful.
>
> Thanks
> Sachin
>
>
> On Sun, Aug 6, 2017 at 5:57 PM, Sachin Mittal  wrote:
>
> > Hi,
> > I am executing following command
> > bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server
> > localhost:9092 --describe --group new-part-advice
> >
> > It gives output like
> >
> > GROUP            TOPIC          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  OWNER
> > new-part-advice  advice-stream  8          114548551       114548853       302  84-StreamThread-3-consumer_/10.9.56.84
> > new-part-advice  fname-stream   1          584             610             26   84-StreamThread-4-consumer_/10.9.56.84
> > .
> >
> > My pipeline is:
> > KStream input = builder.stream(Serdes.String(),
> > beaconSerde, "advice-stream");
> >
> > input.
> >  
> >  foreach();
> >
> >
> > input.
> >  
> >  .through(Serdes.String(), valueSerde, "fname-stream");
> >
> > So I don't understand why it is showing topic partitions from
> > fname-stream in describe, as the process is just writing to that topic
> > and not consuming from it.
> > Also what does lag in the case mean?
> >
> > Thanks
> > Sachin
> >
> >
>