Hi Sachin
Here is a possible general approach which might work for you in the absence of
any broadcast variables, with everything being local state, and if you can
adjust your process to do everything before the aggregation.
Basically the idea here is to use a custom topology with a custom processor.
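A rough sketch of what I mean (just an illustration against a newer
Processor API; topic, processor, and sink names here are made up):

import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

Topology topology = new Topology();
topology.addSource("source", "input-topic");
topology.addProcessor("pre-aggregate",
    () -> new Processor<String, String, String, String>() {
        private ProcessorContext<String, String> context;

        @Override
        public void init(ProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(Record<String, String> record) {
            // Do all per-record work here, before any aggregation happens.
            context.forward(record);
        }
    },
    "source");
topology.addSink("sink", "output-topic", "pre-aggregate");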
There is no built-in support for broadcast variables. Loops are also
not supported -- the dataflow graph must be a DAG.
You could use an additional topic, plus an extra (user-instantiated)
producer and consumer (using manual topic partition assignment!).
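Something like this for the extra consumer side (a rough sketch; topic
name and configs are placeholders). Each instance assigns itself all
partitions of the side topic instead of subscribing via a consumer
group, so every instance sees the complete "broadcast" data:

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// Manual assignment: take ALL partitions of the side topic (no group.id
// subscription, so there is no rebalancing across instances).
List<TopicPartition> partitions = new ArrayList<>();
for (PartitionInfo info : consumer.partitionsFor("broadcast-topic")) {
    partitions.add(new TopicPartition(info.topic(), info.partition()));
}
consumer.assign(partitions);
consumer.seekToBeginning(partitions);

while (true) {
    for (ConsumerRecord<String, String> record :
             consumer.poll(Duration.ofMillis(100))) {
        // Update the local "broadcast" state from record.key()/record.value().
    }
}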
Hi,
On the subject of deleting values from a list, I have the following topology:
aggregate(old list) = new list -> left join (another list) -> output sink.
While processing the output list I know which values in the list are old
and can be removed.
Is there any way to pass that information from downstream back to the
upstream aggregation?
Sachin,
my comment about deleting was about deleting values from the list, not
about deleting the whole key/value record.
If you want to delete a whole key/value record when there is no update
for it for some time, you can combine compaction with retention.
Hi Sachin,
You can achieve what you want by setting the correct cleanup.policy on
these topics.
In this case you want cleanup.policy=compact,delete - you'll also want to
set retention.ms and/or retention.bytes.
The topic will then be compacted, but it will also delete any segments
based on the retention settings.
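For example, via the AdminClient that ships with newer Kafka versions
(topic name, partition/replica counts, and retention are placeholders):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (AdminClient admin = AdminClient.create(props)) {
    Map<String, String> configs = new HashMap<>();
    configs.put("cleanup.policy", "compact,delete");
    configs.put("retention.ms", String.valueOf(7L * 24 * 60 * 60 * 1000)); // 7 days
    NewTopic topic = new NewTopic("my-changelog-topic", 4, (short) 3)
        .configs(configs);
    // In real code, handle/declare the checked exceptions from get().
    admin.createTopics(Collections.singleton(topic)).all().get();
}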
Hi,
As per Eno's suggestion I have pre-created the internal changelog topics with
an increased max.message.bytes config to handle big messages that grow
over time.
As Matthias has pointed out, we cannot use the retention.ms setting to delete
older message data after a given time; is there a way...
My two cents:
Changelog topics are compacted topics, thus they do not have a
retention time (there is an exception for windowed KTable changelog
topics, which are compacted and do have a retention time though).
However, I do not understand how changing...
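To illustrate the windowed exception: the changelog for a windowed
store is created with cleanup.policy=compact,delete and a retention
derived from the window; in newer releases (2.1+) you can also set it
explicitly via Materialized#withRetention (names below are made up):

import java.time.Duration;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.WindowStore;

// Given some KStream<String, String> stream ...
KTable<Windowed<String>, Long> counts = stream
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
    .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("hourly-counts")
        // Retention for the window store and its changelog topic:
        .withRetention(Duration.ofDays(2)));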
Hi Sachin,
One option right now would be to pre-create all internal topics in Kafka, and
only after that start the Kafka Streams application. This would require you
to know the internal names of the topics (in this case you probably already
know them, but I agree that in general this is a bit cumbersome).
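The internal changelog names follow the pattern
<application.id>-<storeName>-changelog, so pre-creating one could look
like this (application id, store name, and sizes are assumptions; admin
is an AdminClient as in the earlier sketch):

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.admin.NewTopic;

String applicationId = "my-streams-app";
String storeName = "aggregated-table"; // the state store backing the KTable
String changelogTopic = applicationId + "-" + storeName + "-changelog";

NewTopic topic = new NewTopic(changelogTopic, 4, (short) 3)
    .configs(Map.of(
        "cleanup.policy", "compact",
        "max.message.bytes", "10485760")); // 10 MB instead of the broker default
admin.createTopics(Collections.singleton(topic)).all().get();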
Per-message payload size. The basic question is how can I control the
internal changelog topic parameters so as to avoid these errors.
On Tue, Nov 8, 2016 at 11:37 PM, R Krishna wrote:
> Are you talking about total messages and therefore size, or per-message
> payload size?
Are you talking about total messages and therefore size, or per-message
payload size?
On Tue, Nov 8, 2016 at 10:00 AM, Sachin Mittal wrote:
> Message size itself increases over time.
>
> The message is something like
> key=[list of objects]
>
> This increases with time, and at some point Kafka is not able to add any
> message to the topic because the message size is greater than
> max.message.bytes.
Message size itself increases over time.
The message is something like
key=[list of objects]
This increases with time, and at some point Kafka is not able to add any
message to the topic because the message size is greater than
max.message.bytes.
Since this is an internal topic based off a table, I d...
Hi Sachin,
Could you clarify what you mean by "message size increases"? Are messages going
to the changelog topic increasing in size? Or is the changelog topic getting
full?
Thanks
Eno
> On 8 Nov 2016, at 16:49, Sachin Mittal wrote:
>
> Hi,
> We are using aggregation by key on a KStream to create a KTable.
Hi,
We are using aggregation by key on a KStream to create a KTable.
As I read from
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams%3A+Internal+Data+Management
it creates an internal changelog topic.
However, as the streaming application runs over time, the message size
increases and eventually exceeds max.message.bytes.
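Roughly, the pattern is something like this (a simplified sketch with
placeholder names, written against the newer StreamsBuilder DSL):

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("events");

// Each key accumulates a list of values, so the aggregate (and hence each
// changelog message) only ever grows, until one record exceeds
// max.message.bytes. A Serde for List<String> would also be needed via
// Materialized#withValueSerde in real code.
KTable<String, List<String>> table = stream
    .groupByKey()
    .aggregate(
        ArrayList::new,
        (key, value, list) -> { list.add(value); return list; },
        Materialized.<String, List<String>, KeyValueStore<Bytes, byte[]>>as("events-by-key"));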