Re: Kafka Streams app process records until certain date

2021-12-08 Thread Matthias J. Sax

Hard to achieve.

I guess a naive approach would be to use `flatTransform()` to 
implement a filter that drops all records that are not in the desired 
time range.
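
A rough, untested sketch of that idea (topic names, serdes, and the cutoff 
timestamps below are placeholders) could look like this, using `flatTransform()` 
and the record timestamp exposed via the ProcessorContext:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;

import java.util.Collections;
import java.util.Properties;

public class TimeRangeFilterSketch {

    public static void main(String[] args) {
        long startMs = 0L;            // placeholder: start of the desired range
        long endMs = 1614556800000L;  // placeholder: e.g. 2021-03-01T00:00:00Z

        TransformerSupplier<String, String, Iterable<KeyValue<String, String>>> rangeFilter =
            () -> new Transformer<String, String, Iterable<KeyValue<String, String>>>() {
                private ProcessorContext context;

                @Override
                public void init(ProcessorContext context) {
                    this.context = context;
                }

                @Override
                public Iterable<KeyValue<String, String>> transform(String key, String value) {
                    long ts = context.timestamp();  // timestamp of the current record
                    if (ts >= startMs && ts < endMs) {
                        return Collections.singletonList(KeyValue.pair(key, value));
                    }
                    return Collections.emptyList();  // drop records outside the range
                }

                @Override
                public void close() { }
            };

        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("input-topic")   // placeholder topic names
               .flatTransform(rangeFilter)
               .to("output-topic");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "time-range-filter");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        new KafkaStreams(builder.build(), props).start();
    }
}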


pause() and resume() are not available in Kafka Streams, but only on the 
KafkaConsumer (the Spring docs you cite are also about the consumer, not 
Kafka Streams).
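
For reference, a compact, untested sketch of the consumer-level pause()/resume() 
usage (plain KafkaConsumer, not Kafka Streams; the topic name and the pause 
condition are placeholders):

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class PauseResumeSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "pause-resume-demo");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("input-topic")); // placeholder topic
            boolean shouldPause = false; // placeholder for an application-specific condition
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                records.forEach(r -> { /* process each record */ });
                if (shouldPause) {
                    consumer.pause(consumer.assignment());   // stop fetching, but keep calling poll() to stay in the group
                } else {
                    consumer.resume(consumer.assignment());  // fetch again once processing should continue
                }
            }
        }
    }
}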



-Matthias

On 11/24/21 11:05 AM, Miguel González wrote:

Hello

For my use case I need to work with a chunk of records, let's say per
month... We have over two years of data... and we are testing if we can
deploy it to production, but we need to test in small batches.

I have built a Kafka Streams app that processes two input topics and outputs
to one topic.

I would like to process the first two months of data. Is that possible?

- I have tried blocking the consumer thread using .map on the two KStreams I
have, comparing the timestamp on the message against a timestamp I get from
another system that tells me until what time I should process. I also
increased MAX_POLL_INTERVAL_MS_CONFIG, but I have noticed that the messages
that are in range do not get processed and sent to the output topic.
- I have also seen a Spring Cloud library apparently offer a
pause-resume feature.

https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.5/reference/html/spring-cloud-stream-binder-kafka.html#_binding_visualization_and_control_in_kafka_streams_binder
- I have also seen that implementing a transformer or processor could
work, but in this case the state store would possibly need to hold years of
data. That is something I would like to avoid.


Any help is appreciated.

regards
- Miguel



AccessDeniedException in Kafka on Windows

2021-12-08 Thread de Bruijn, M. (Martijn)
We are upgrading our Spring Boot applications to Spring Boot 2.6.1. Spring 
Boot 2.6.1 upgrades our Kafka dependency from 2.7.1 to 3.0.0.
After upgrading I'm getting an AccessDeniedException in all my tests using 
@EmbeddedKafka.

Caused by: java.nio.file.AccessDeniedException: C:\Temp\spring.kafka.ae934565-33bc-4073-ab06-59ce265490369531329615041832669
    at java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:89)
    at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
    at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:108)
    at java.base/sun.nio.fs.WindowsFileSystemProvider.newFileChannel(WindowsFileSystemProvider.java:121)
    at java.base/java.nio.channels.FileChannel.open(FileChannel.java:298)
    at java.base/java.nio.channels.FileChannel.open(FileChannel.java:357)
    at org.apache.kafka.common.utils.Utils.flushDir(Utils.java:953)
    at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:941)
    at kafka.server.BrokerMetadataCheckpoint.liftedTree1$1(BrokerMetadataCheckpoint.scala:214)
    at kafka.server.BrokerMetadataCheckpoint.write(BrokerMetadataCheckpoint.scala:204)

I've been searching the Kafka source code for the cause of the error, and it 
seems Utils.flushDir is causing trouble on Windows.
In several places, FileChannel.open(path, StandardOpenOption.READ) is called, 
which will always fail on Windows when path is a directory. See: 
https://mail.openjdk.java.net/pipermail/nio-dev/2013-February/002123.html

Another example of code failing on Windows is the 
kafka.log.LogManager.createAndValidateLogDirs method, which calls 
Utils.flushDir(dir.toPath.toAbsolutePath.normalize.getParent) with a directory 
as parameter. This will always fail on Windows.
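
A minimal sketch that reproduces this failure mode on Windows, mirroring what 
Utils.flushDir does internally (the directory path is just an example):

import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class FlushDirRepro {
    public static void main(String[] args) throws Exception {
        Path dir = Paths.get("C:\\Temp"); // any existing directory
        // Open the directory and fsync it, as Utils.flushDir does.
        // On Linux/macOS this works; on Windows the open() call itself throws
        // java.nio.file.AccessDeniedException because a directory cannot be
        // opened for READ via FileChannel.
        try (FileChannel channel = FileChannel.open(dir, StandardOpenOption.READ)) {
            channel.force(true);
        }
    }
}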

How can this be solved? (I can't find a related Kafka Jira issue)


More related reports:
https://stackoverflow.com/questions/70182684/failed-to-start-kafka-server-in-window/70183556


Regards,
Martijn de Bruijn
Software engineer


Custom plugin to filter on kafka server side

2021-12-08 Thread Christian Schneider
We share topics between different tenants. Would it be possible to
implement filtering on the Kafka side that allows a consumer to filter a topic
for a certain key? The idea is that this consumer only gets messages with the
specified key, to save network bandwidth as well as (possibly) disk I/O on the
Kafka broker side.

Christian


-- 
-- 
Christian Schneider
http://www.liquid-reality.de

Computer Scientist
http://www.adobe.com


Re: Use case: Per tenant deployments talking to multi tenant kafka cluster

2021-12-08 Thread Christian Schneider
Hi Luke,

thanks for the hints. This helps a lot already.

We already use assign as we manage offsets on the consumer side. Currently
we only have one partition and simply assign a stored offset on partition 0.
For multiple partitions, is it the correct behaviour to simply assign to one
partition and seek to its offset, or do I have to provide offsets for the other
partitions too? I only want to listen to one partition.
You mentioned a custom producer partitioner. We currently use a random
consumer group name for each consumer, as we want each consumer to receive
all messages of the environment. In this case, do we still need a custom
producer partitioner, or is it enough to simply use assign as described above?
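
Concretely, the single-partition assign-and-seek pattern in question looks 
roughly like this untested sketch (topic name, partition number, and the 
offset source are placeholders):

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SinglePartitionConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // no group.id needed when partitions are assigned manually and offsets are managed externally

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("shared-topic", 0);   // only the partition we care about
            consumer.assign(Collections.singletonList(tp));              // nothing is assigned for other partitions
            consumer.seek(tp, loadStoredOffset());                       // resume from the externally stored offset
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                records.forEach(r -> System.out.printf("%s -> %s%n", r.key(), r.value()));
            }
        }
    }

    static long loadStoredOffset() { return 0L; } // placeholder for the offset stored on the consumer side
}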

Christian

On Wed, Dec 8, 2021 at 11:19 AM Luke Chen wrote:

> Hi Christian,
> Answering your question below:
>
> > Let's assume we just have one topic with 10 partitions for simplicity.
> We can now use the environment id as a key for the messages to make sure
> the messages of each environment arrive in order while sharing the load on
> the partitions.
>
> > Now we want each environment to only read the minimal number of messages
> while consuming. Ideally we would like to only consume its own messages.
> Can we somehow filter to only
> receive messages with a certain key? Can we maybe only listen to a certain
> partition at least?
>
>
> Unfortunately, Kafka doesn't have a feature to filter the messages on the
> broker before sending them to the consumer.
> But for your 2nd question:
> > Can we maybe only listen to a certain partition at least?
>
> Actually, yes. Kafka has a way to just fetch data from a certain partition
> of a topic. You can use the Consumer#assign API to achieve that. So, to do
> that, I think you also need to have a custom producer partitioner for your
> purpose. Let's say, in your example, you have 10 partitions and 10
> environments. Your partitioner should send to the specific partition based
> on the environment ID, e.g. env ID 1 -> partition 1, env ID 2 -> partition
> 2. So, in your consumer, you can just assign to the partition containing
> its environment ID.
>
> And for the idea of encrypting the messages to achieve isolation, it's
> interesting! I've never thought about it! :)
>
> Hope it helps.
>
> Thank you.
> Luke
>
>
> On Wed, Dec 8, 2021 at 4:48 PM Christian Schneider <
> ch...@die-schneider.net>
> wrote:
>
> > We have a single tenant application that we deploy to a kubernetes
> cluster
> > in many instances.
> > Every customer has several environments of the application. Each
> > application lives in a separate namespace and should be isolated from
> other
> > applications.
> >
> > We plan to use kafka to communicate inside an environment (between the
> > different pods).
> > As setting up one kafka cluster per such environment is a lot of overhead
> > and cost we would like to just use a single multi tenant kafka cluster.
> >
> > Let's assume we just have one topic with 10 partitions for simplicity.
> > We can now use the environment id as a key for the messages to make sure
> > the messages of each environment arrive in order while sharing the load
> on
> > the partitions.
> >
> > Now we want each environment to only read the minimal number of messages
> > while consuming. Ideally we would like to only consume its own
> messages.
> > Can we somehow filter to only
> > receive messages with a certain key? Can we maybe only listen to a
> certain
> > partition at least?
> >
> > Additionally we ideally would like to have enforced isolation. So each
> > environment can only see its own messages even if it might receive
> messages
> > of other environments from the same partition.
> > I think in the worst case we can make this happen by encrypting the messages,
> > but it would be great if we could filter on the broker side.
> >
> > Christian
> >
> > --
> > --
> > Christian Schneider
> > http://www.liquid-reality.de
> >
> > Computer Scientist
> > http://www.adobe.com
> >
>


-- 
-- 
Christian Schneider
http://www.liquid-reality.de

Computer Scientist
http://www.adobe.com


Key space behavior when adding new partitions into a kafka topic dynamically

2021-12-08 Thread Mazen Ezzeddine
Dear all,

Kafka supports adding new partitions to a topic dynamically. So suppose that 
initially I have a topic T with two partitions P0, P1 and a key space of three 
keys K0, K1, K2. Suppose further that I am using some kind of hash partitioner 
modulo 2 (the number of partitions) at the producer that maps (K0) to P0, and 
(K1, K2) to P1. Let's further assume that I have two consumers, C0 for P0 and 
C1 for P1. For simplicity, the consumers are doing some basic key-based 
aggregation (stored in some KV store).

After some time, suppose that I add a new partition P2 to T. So now my 
producer will automatically (once it detects the new number of partitions) use 
the same hash partitioner modulo 3. So let's assume that the key-to-partition 
mapping now becomes (K0, P0), (K1, P1), (K2, P2).
1. In such cases, what would happen to the K2 values already written into P1 
(before the addition of the new partition P2)? And what if I would like to have 
all records with the same key mapped to the same partition always, even when a 
new topic partition is added? That is, I do not want my K2 records to be spread 
between partitions P1 and P2. Does the Kafka framework provide such a guarantee 
in some way, or must the application handle this requirement?
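
For reference, a small sketch (using Kafka's own Utils class) of the 
key-to-partition computation this scenario assumes, i.e. the default 
partitioner's murmur2 hash modulo the partition count. It only prints where 
each key would land with two versus three partitions; the actual partitions 
depend on the hash, so they need not match the mapping assumed above:

import org.apache.kafka.common.utils.Utils;

import java.nio.charset.StandardCharsets;

public class KeyMappingDemo {
    // Same computation Kafka's default partitioner uses for keyed records:
    // murmur2 hash of the serialized key, modulo the number of partitions.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        for (String key : new String[]{"K0", "K1", "K2"}) {
            System.out.printf("%s: 2 partitions -> P%d, 3 partitions -> P%d%n",
                    key, partitionFor(key, 2), partitionFor(key, 3));
        }
    }
}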

2. When the new partition P2 is added and the key-to-partition mapping becomes 
(K0, P0), (K1, P1), (K2, P2), a rebalancing process will be triggered. As a 
result of rebalancing, let's suppose that P2 is assigned to C0. How could C0 
get the most recent aggregated value for K2 out of the KV store in C1, instead 
of restarting from scratch? In such a scenario, is state reshuffling to the 
appropriate consumers guaranteed by Kafka, or should it be handled by the 
application?

Thank you.



Re: Use case: Per tenant deployments talking to multi tenant kafka cluster

2021-12-08 Thread Luke Chen
Hi Christian,
Answering your question below:

> Let's assume we just have one topic with 10 partitions for simplicity.
We can now use the environment id as a key for the messages to make sure
the messages of each environment arrive in order while sharing the load on
the partitions.

> Now we want each environment to only read the minimal number of messages
while consuming. Ideally we would like to only consume its own messages.
Can we somehow filter to only
receive messages with a certain key? Can we maybe only listen to a certain
partition at least?


Unfortunately, Kafka doesn't have a feature to filter the messages on the
broker before sending them to the consumer.
But for your 2nd question:
> Can we maybe only listen to a certain partition at least?

Actually, yes. Kafka has a way to just fetch data from a certain partition
of a topic. You can use the Consumer#assign API to achieve that. So, to do
that, I think you also need to have a custom producer partitioner for your
purpose. Let's say, in your example, you have 10 partitions and 10
environments. Your partitioner should send to the specific partition based
on the environment ID, e.g. env ID 1 -> partition 1, env ID 2 -> partition
2. So, in your consumer, you can just assign to the partition containing
its environment ID.
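
A minimal, untested sketch of such a producer partitioner (it assumes the 
record key is the numeric environment ID; the class name is a placeholder):

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class EnvironmentPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        int envId = Integer.parseInt(key.toString()); // assumes the record key is the numeric environment ID
        return envId % numPartitions;                 // env ID 1 -> partition 1, env ID 2 -> partition 2, ...
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}

It would be registered on the producer via the partitioner.class configuration.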

And for the idea of encrypting the messages to achieve isolation, it's
interesting! I've never thought about it! :)

Hope it helps.

Thank you.
Luke


On Wed, Dec 8, 2021 at 4:48 PM Christian Schneider 
wrote:

> We have a single tenant application that we deploy to a kubernetes cluster
> in many instances.
> Every customer has several environments of the application. Each
> application lives in a separate namespace and should be isolated from other
> applications.
>
> We plan to use kafka to communicate inside an environment (between the
> different pods).
> As setting up one kafka cluster per such environment is a lot of overhead
> and cost we would like to just use a single multi tenant kafka cluster.
>
> Let's assume we just have one topic with 10 partitions for simplicity.
> We can now use the environment id as a key for the messages to make sure
> the messages of each environment arrive in order while sharing the load on
> the partitions.
>
> Now we want each environment to only read the minimal number of messages
> while consuming. Ideally we would like to only consume its own messages.
> Can we somehow filter to only
> receive messages with a certain key? Can we maybe only listen to a certain
> partition at least?
>
> Additionally we ideally would like to have enforced isolation. So each
> environment can only see its own messages even if it might receive messages
> of other environments from the same partition.
> I think in the worst case we can make this happen by encrypting the messages,
> but it would be great if we could filter on the broker side.
>
> Christian
>
> --
> --
> Christian Schneider
> http://www.liquid-reality.de
>
> Computer Scientist
> http://www.adobe.com
>


Use case: Per tenant deployments talking to multi tenant kafka cluster

2021-12-08 Thread Christian Schneider
We have a single tenant application that we deploy to a kubernetes cluster
in many instances.
Every customer has several environments of the application. Each
application lives in a separate namespace and should be isolated from other
applications.

We plan to use kafka to communicate inside an environment (between the
different pods).
As setting up one kafka cluster per such environment is a lot of overhead
and cost we would like to just use a single multi tenant kafka cluster.

Let's assume we just have one topic with 10 partitions for simplicity.
We can now use the environment id as a key for the messages to make sure
the messages of each environment arrive in order while sharing the load on
the partitions.

Now we want each environment to only read the minimal number of messages
while consuming. Ideally we would like to only consume its own messages.
Can we somehow filter to only
receive messages with a certain key? Can we maybe only listen to a certain
partition at least?

Additionally we ideally would like to have enforced isolation. So each
environment can only see its own messages even if it might receive messages
of other environments from the same partition.
I think in the worst case we can make this happen by encrypting the messages,
but it would be great if we could filter on the broker side.

Christian

-- 
-- 
Christian Schneider
http://www.liquid-reality.de

Computer Scientist
http://www.adobe.com