Manual offset control and topic compaction

2017-11-03 Thread Stig Rohde Døssing
Hi,

I'm working on the Kafka connector for Apache Storm, which pulls messages
from Kafka and emits them into a Storm topology. The connector uses manual
offset control since message processing happens asynchronously to pulling
messages from Kafka, and we hit an issue a while back related to topic
compaction. I think we can solve it, but I'd like confirmation that the way
we're going about it isn't wrong.

The connector keeps track of which offsets have been emitted into the
topology, along with other information such as how many times they've been
retried. When an offset needs to be retried, the connector fetches the
message from Kafka again (it is not kept in memory once emitted). We only
clean up the state for an offset once it has been fully processed.

The issue we hit is that if topic compaction is enabled, we need to know
that the offset is no longer available so we can delete the corresponding
state. Would the approach described here
https://issues.apache.org/jira/browse/STORM-2546?focusedCommentId=16151172&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16151172
be reasonable for this, or is there another way to check if an offset has been
deleted?

Thanks.


Re: Manual offset control and topic compaction

2017-11-09 Thread Stig Rohde Døssing
I should probably have been a little clearer about what I'm asking, so here's
an example.

Let's say that I have a consumer on a topic partition, and I'm doing manual
commits. Because I'm doing manual commits and the messages are processed
asynchronously from the consumer poll loop, I keep track of which offsets I
have received from Kafka, and mark them as done once they have been
processed. I don't commit offset X until all messages X-1, X-2... have been
marked as done.
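
In code, the bookkeeping looks roughly like this (a simplified sketch for
illustration only, not the actual connector code; the class and method names
are made up):

import java.util.TreeSet;

// Simplified sketch of the offset bookkeeping described above.
// Illustration only; not the actual connector code.
class OffsetTracker {
    private final TreeSet<Long> done = new TreeSet<>(); // offsets whose processing has finished
    private long committed = -1L;                        // highest offset already covered by a commit

    void markDone(long offset) {
        done.add(offset);
    }

    // The value to pass to commitSync(): one past the longest contiguous run of
    // finished offsets. Offset X is only covered once X-1, X-2, ... are all done.
    long nextCommitOffset() {
        long next = committed + 1;
        while (done.contains(next)) {
            done.remove(next);
            next++;
        }
        committed = next - 1;
        return next;
    }
}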

Say that I've previously received offsets 0-10 and realize that I need to
retry offset 2. I seek the consumer to offset 2 on that partition. I poll
once and check the resulting records, and find that the earliest received
message has offset 5. Is it now correct to assume that offsets 2-4 must have
been compacted away, so I should forget about those messages, stop trying
to fetch them, and mark them as done in my offset tracker?
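
In consumer code, that check would look something like this (a minimal
sketch rather than the actual connector code; the consumer instance, topic,
and record types are assumed):

import java.time.Duration;
import java.util.Iterator;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Sketch of the check from the example above; not the actual connector code.
class CompactionGapCheck {

    // Seeks to the offset we want to retry and returns the offset of the first
    // record the broker actually returns, or -1 if the poll returned nothing
    // for this partition.
    static long firstReturnedOffset(KafkaConsumer<String, String> consumer,
                                    TopicPartition tp, long retryOffset) {
        consumer.seek(tp, retryOffset);
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        Iterator<ConsumerRecord<String, String>> it = records.records(tp).iterator();
        return it.hasNext() ? it.next().offset() : -1L;
    }
}

If firstReturnedOffset(consumer, tp, 2) comes back as 5, the question is
whether it is then safe to treat offsets 2-4 as compacted away and mark them
as done.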


Follower node receiving records out of order

2020-07-30 Thread Stig Rohde Døssing
Hi,

We are expanding a 3-node cluster to a 5-node cluster and have encountered
an issue where a follower node is receiving records with out-of-order
offsets. We are on Kafka 2.4.0.

We've used the kafka-reassign-partitions tool. Several partitions are
affected. Picking an example partition (11), it was configured to go from
replicas [1,2,3] to [3,4,5], without enabling throttling. Below is the
current state of that partition:

Topic: some-topic Partition: 11 Leader: 3 Replicas: 3,4,5,1,2 Isr: 2,3,1

What we are seeing is that follower 4 is getting an exception when fetching
offsets from 3.

kafka.common.OffsetsOutOfOrderException: Out of order offsets found in append to some-topic: ArrayBuffer(, 1091513, 745397, 1110822, 1127988, )
    at kafka.log.Log.$anonfun$append$2(Log.scala:1096)
    at kafka.log.Log.maybeHandleIOException(Log.scala:2316)
    at kafka.log.Log.append(Log.scala:1032)
    at kafka.log.Log.appendAsFollower(Log.scala:1012)
    at kafka.cluster.Partition.$anonfun$doAppendRecordsToFollowerOrFutureReplica$1(Partition.scala:910)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
    at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:259)
    at kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica(Partition.scala:903)
    at kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica(Partition.scala:917)
    at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:161)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:317)
    at scala.Option.foreach(Option.scala:437)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:306)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:305)
    at scala.collection.immutable.List.foreach(List.scala:305)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5(AbstractFetcherThread.scala:305)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:305)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:133)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:132)
    at scala.Option.foreach(Option.scala:437)
    at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:132)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:114)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)

[2020-07-30 11:25:37,676] WARN [ReplicaFetcher replicaId=4, leaderId=3,
fetcherId=0] Partition some-topic-11 marked as failed
(kafka.server.ReplicaFetcherThread)

As far as we can tell, it is a single offset in the sequence that is out of
order. We are not sure what to look for to debug this; if anyone has seen
something similar, advice would be welcome. The reassignment is stuck
because this error keeps recurring.

Broker configuration that may or may not be relevant follows:

listeners=PLAINTEXT://localhost:9092
replica.fetch.max.bytes=104857600
log.segment.bytes=134217728
message.max.bytes=104857600
compression.type=producer
log.dirs=/data/kafka-data
log.retention.check.interval.ms=30
unclean.leader.election.enable=false


Getting the suggested partition assignment via the Admin client

2020-10-05 Thread Stig Rohde Døssing
Hi,

Kafka can recommend an assignment via the ReassignPartitionsCommand (with
--generate), and it is also possible to get this assignment directly by
calling AdminUtils.assignReplicasToBrokers.

We would like our application to be able to get the suggested partition
assignment and apply it to a topic. We're currently doing this by calling
AdminUtils directly and passing the result to
Admin.alterPartitionReassignments.
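
For reference, the apply step currently looks roughly like this (a sketch;
the topic, bootstrap address, and literal replica lists are placeholders,
and in our code the lists come from AdminUtils.assignReplicasToBrokers):

import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.common.TopicPartition;

// Sketch of applying a target assignment via the Admin client. The literal
// replica lists below are placeholders for the AdminUtils result.
public class ApplyAssignment {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments = Map.of(
                    new TopicPartition("some-topic", 0),
                    Optional.of(new NewPartitionReassignment(Arrays.asList(3, 4, 5))),
                    new TopicPartition("some-topic", 1),
                    Optional.of(new NewPartitionReassignment(Arrays.asList(4, 5, 1))));

            // Kick off the reassignment and wait for the request to be accepted.
            admin.alterPartitionReassignments(reassignments).all().get();
        }
    }
}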

Is there a way to do this solely using the kafka-clients Admin client? We'd
like to avoid depending directly on the Kafka server jar.


Re: Getting the suggested partition assignment via the Admin client

2020-10-05 Thread Stig Rohde Døssing
Thank you.

You are right about the limitations of Kafka's own assignments. Thank you
for letting me know about Cruise Control.

I think that, given Kafka's own assignment is considered useful enough to be
exposed via the kafka-reassign-partitions command, it should also be exposed
on the Admin client. It is likely sufficient for our use case.

I will look into creating a KIP for this.

On Mon, Oct 5, 2020 at 11:34, Tom Bentley wrote:

> Hi Stig,
>
> AdminUtils is not a public API, so you're right to not want to depend on
> it. Unfortunately the Admin client API doesn't offer functionality
> equivalent to `kafka-reassign-partitions.sh --generate`. That is, while you
> can get the broker to generate the assignments for you when you create a
> topic or add partitions, you can't say "reassign these existing topics as
> if they were new".
>
> One of the problems with the naive assignments generated by
> AdminUtils.assignReplicasToBrokers is that the assignments it returns can
> result in an unbalanced cluster, where some brokers have much higher load
> than others. Many people running Kafka use external tools, such as Cruise
> Control, to manage replica assignments so that the cluster is balanced.
>
> You could try to open a KIP to provide the functionality. I can see it
> might be useful for clusters where it doesn't matter if the brokers have
> similar load. The risk of adding this functionality is that people end up
> using it inappropriately and wind up with very unbalanced clusters in
> production.
>
> Kind regards,
>
> Tom
>


Semantics of acks=all

2020-12-11 Thread Stig Rohde Døssing
Hi,

We have a topic with min.insync.replicas = 2 where each partition is
replicated to 3 nodes.

When we send a produce request with acks=all, the request should fail if
the records don't make it to at least 2 nodes.
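
For context, the producer side is set up roughly like this (a sketch; the
broker address, topic, and serializers are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Sketch of the producer setup in question; names are placeholders. The topic
// itself is created with replication.factor=3 and min.insync.replicas=2.
public class AcksAllProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // require acknowledgement from all in-sync replicas

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The returned future completes exceptionally if the broker cannot
            // satisfy min.insync.replicas (once retries are exhausted).
            producer.send(new ProducerRecord<>("some-topic", "key", "value"));
            producer.flush();
        }
    }
}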

If the produce request fails, what does the partition leader do with the
records it has written to its local log? Are they deleted, or will the
producer's retry cause duplication?


Re: Semantics of acks=all

2020-12-11 Thread Stig Rohde Døssing
Thanks.

On Fri, Dec 11, 2020 at 13:52, Fabio Pardi wrote:

>
>
> Hi,
>
> the record is not committed to the filesystem and an error is returned to
> the producer.
>
> regards,
>
> fabio pardi
>
>
>


Handling "uneven" network partitions

2020-12-11 Thread Stig Rohde Døssing
Hi,

We have a topic with min.insync.replicas = 2 where each partition is
replicated to 3 nodes. We write to it using acks=all.

We experienced a network malfunction where leader node 1 could not reach
replicas 2 and 3, and vice versa. Nodes 2 and 3 could reach each other. The
controller broker could reach all nodes, and external services could reach
all nodes.

What we saw was the ISR shrink to only node 1. Looking at the code, the ISR
shrinks when a replica has not caught up to the leader's log end offset
(LEO) and has not fetched for a while. My guess is that the leader had
messages that were not yet replicated by the other nodes.

Shouldn't min.insync.replicas = 2 and acks=all prevent the ISR from
shrinking to this size, since new writes should not be accepted unless they
are replicated by at least 2 nodes?


Re: `java.lang.NoSuchFieldError: DEFAULT_SASL_ENABLED_MECHANISMS` after upgrading `Kafka-clients` from 2.5.0 to 3.0.0

2021-09-24 Thread Stig Rohde Døssing
I've had something similar on a different embedded Kafka project. Most
likely your issue is that you are putting kafka-clients 3.0.0 on the
classpath alongside the Kafka server in version 2.7.1, which is the version
brought in by your spring-kafka-test dependency. Since the Kafka server
itself depends on kafka-clients, upgrading kafka-clients without also
upgrading the server on the same classpath can cause mismatches like this.

I think you need to wait for a new version of spring-kafka-test. You can
try bumping the org.apache.kafka:kafka_2.13 dependency to 3.0.0, but
there's no guarantee it will work.

On Fri, Sep 24, 2021 at 09:24, Bruno Cadonna wrote:

> Hi Bruce,
>
> I do not know the specific root cause of your errors, but what I found is
> that spring-kafka 2.7.x is compatible with kafka-clients 2.7.0 and 2.8.0,
> not with 3.0.0 or 2.8.1:
>
> https://spring.io/projects/spring-kafka
>
> Best.
> Bruno
>
> On 24.09.21 00:25, Chang Liu wrote:
> > Hi Kafka users,
> >
> > I started running into the following error after upgrading `Kafka-clients`
> > from 2.5.0 to 3.0.0, and I see the same error with 2.8.1. I don't see a
> > working solution by searching on Google:
> > https://stackoverflow.com/questions/46914225/kafka-cannot-create-embedded-kafka-server
> >
> > This looks like a backward incompatibility in `Kafka-clients`. Do you happen
> > to know a solution for this?
> >
> > ```
> > java.lang.NoSuchFieldError: DEFAULT_SASL_ENABLED_MECHANISMS
> >
> >   at kafka.server.Defaults$.<init>(KafkaConfig.scala:242)
> >   at kafka.server.Defaults$.<clinit>(KafkaConfig.scala)
> >   at kafka.server.KafkaConfig$.<init>(KafkaConfig.scala:961)
> >   at kafka.server.KafkaConfig$.<clinit>(KafkaConfig.scala)
> >   at kafka.server.KafkaConfig.LogDirProp(KafkaConfig.scala)
> >   at org.springframework.kafka.test.EmbeddedKafkaBroker.afterPropertiesSet(EmbeddedKafkaBroker.java:298)
> >   at org.springframework.kafka.test.rule.EmbeddedKafkaRule.before(EmbeddedKafkaRule.java:113)
> >   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:50)
> >   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> >   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> >   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> >   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> >   at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
> >   at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
> >   at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221)
> >   at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
> > ```
> >
> > I got a suggestion to upgrade the Spring library.
> >
> > This is the `pom.xml` that defines all my dependencies. I only upgraded
> > `Kafka-clients` in production:
> > https://github.com/opensearch-project/security/blob/main/pom.xml#L84
> >
> > The dependency for test still remains:
> > https://github.com/opensearch-project/security/blob/main/pom.xml#L503
> >
> > Is this the Spring library that should be upgraded?
> > https://github.com/opensearch-project/security/blob/main/pom.xml#L495
> >
> > But even though I upgraded the Spring library to 2.7.7
> > (https://github.com/opensearch-project/security/blob/main/pom.xml#L496),
> > I got another error:
> >
> > `java.lang.NoClassDefFoundError:
> > org/apache/kafka/common/record/BufferSupplier`
> >
> > Any suggestions to help me out of this would be highly appreciated!
> >
> > Thanks,
> > Bruce
> >
>