Azkarra Streams v0.8 is now available!

2020-09-25 Thread Florian Hussonnois
Hi everyone,

I'm pleased to announce the release of Azkarra Streams 0.8
(https://www.azkarrastreams.io/).

Azkarra is an open-source micro-framework that takes your Kafka
Streams apps to the next level!

The project is dedicated to making the development of streaming
microservices based on Apache Kafka simple and fast.
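
For readers new to the project, here is a condensed sketch of what an
Azkarra application can look like, in the spirit of the project's
quickstart (see the website for the authoritative version; the package
names and annotations here are indicative only, and the topic names are
placeholders):

import io.streamthoughts.azkarra.api.annotations.Component;
import io.streamthoughts.azkarra.api.streams.TopologyProvider;
import io.streamthoughts.azkarra.streams.AzkarraApplication;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;

public class SimpleStreamsApp {

    public static void main(String[] args) {
        // Boots the Azkarra runtime, which discovers the TopologyProvider
        // component below and runs it as a managed KafkaStreams instance.
        AzkarraApplication.run(SimpleStreamsApp.class, args);
    }

    @Component
    public static class NormalizeTopology implements TopologyProvider {

        @Override
        public Topology topology() {
            StreamsBuilder builder = new StreamsBuilder();
            builder.<String, String>stream("input-topic")   // placeholder
                   .mapValues(v -> v.toLowerCase())
                   .to("output-topic");                     // placeholder
            return builder.build();
        }

        @Override
        public String version() {
            return "1.0";
        }
    }
}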

To get a high-level overview of some of the most exciting changes in this
release: What's new in Azkarra Streams v0.8?
<https://medium.com/streamthoughts/whats-new-in-azkarra-streams-0-8-3ed1a4ab803d?source=friends_link&sk=b8aa22a671ba815f8ead394a4acd3a56>

Please feel free to join the Azkarra community on Slack
<https://communityinviter.com/apps/azkarra-streams/azkarra-streams-community>
for questions or comments about the project.

-- 
Florian HUSSONNOIS
Co-founder StreamThoughts.io | Senior Software Engineer
Confluent Community Catalyst


Azkarra Streams 0.7.0 released

2020-05-13 Thread Florian Hussonnois
Hi everyone,

Yesterday, StreamThoughts.io released Azkarra Streams 0.7.0
(https://www.azkarrastreams.io/).

For those not familiar, Azkarra Streams is an open-source project dedicated
to making the development of streaming microservices, based on Kafka
Streams, simple and fast.

To get a high-level overview of some of the most exciting changes in this
release:
https://medium.com/streamthoughts/introducing-azkarra-streams-0-7-4e521c8223ad

Please feel free to join the Azkarra community on Slack for questions or
comments about the project.

-- 
Florian HUSSONNOIS
Co-founder streamthoughts | Senior Software Engineer


Yet another CLI to manage Kafka Connect/SchemaRegistry.

2017-04-24 Thread Florian Hussonnois
Hi folks,

I would like to share with you a CLI that I developed for a project's
needs - https://github.com/fhussonnois/kafkacli

We use it to execute common tasks on Kafka Connect and the Schema Registry
during development.

-- 
Florian HUSSONNOIS


Re: [DISCUSS] KIP 130: Expose states of active tasks to KafkaStreams public API

2017-03-28 Thread Florian Hussonnois
Hi all,

I've updated the KIP and the PR to reflect your suggestions.
https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
https://github.com/apache/kafka/pull/2612

Also, I've exposed the StreamThread#state property as a string through the
new ThreadMetadata class.
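
For illustration, a sketch of how the new API could be consumed once
merged (written against the method names discussed in this thread; the
final signatures and packages may differ):

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;

public final class ThreadMetadataDump {

    // Prints the state of every local stream thread and its active tasks
    // for a running KafkaStreams instance.
    static void dumpLocalThreads(KafkaStreams streams) {
        for (ThreadMetadata thread : streams.localThreadsMetadata()) {
            System.out.println("thread=" + thread.threadName()
                    + " state=" + thread.threadState());
            for (TaskMetadata task : thread.activeTasks()) {
                System.out.println("  task=" + task.taskId()
                        + " partitions=" + task.topicPartitions());
            }
        }
    }
}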

Thanks,

2017-03-27 23:40 GMT+02:00 Florian Hussonnois <fhussonn...@gmail.com>:

> Hi Guozhang, Matthias,
>
> It's a great idea to add sub-topology descriptions. This would help
> developers better understand the topology concept.
>
> I agree that it is not really user-friendly to have to check whether
> `StreamsMetadata#streamThreads` returns null.
>
> The method name localThreadsMetadata looks good. In addition, it's
> simpler to build ThreadMetadata instances from the `StreamTask` class
> than from the `StreamPartitionAssignor` class.
>
> I will work on the modifications. As I understand it, I have to add a
> subTopologyId property to the TaskMetadata class - am I right?
>
> Thanks,
>
> 2017-03-26 0:25 GMT+01:00 Guozhang Wang <wangg...@gmail.com>:
>
>> Re 1): this is a good point. Maybe we can move
>> `StreamsMetadata#streamThreads` to `KafkaStreams#localThreadsMetadata`?
>>
>> 3): a minor suggestion about naming: rename `assignedPartitions` to
>> `topicPartitions`, to be consistent with `StreamsMetadata`?
>>
>>
>> Guozhang
>>
>> On Thu, Mar 23, 2017 at 4:30 PM, Matthias J. Sax <matth...@confluent.io>
>> wrote:
>>
>>> Thanks for the progress on this KIP. I think we are on the right path!
>>>
>>> Couple of comments/questions:
>>>
>>> (1) Why do we not consider the "rejected alternative" to add the method
>>> to KafkaStreams? The comment on #streamThreads() says:
>>>
>>> "Note this method will return null if called on {@link
>>> StreamsMetadata} which represent a remote application."
>>>
>>> Thus, if we cannot get any remote metadata, it seems more
>>> straightforward to add it to KafkaStreams directly -- this would avoid
>>> invalid calls and `null` return values in the first place.
>>>
>>> I like the idea of exposing sub-topologies:
>>>
>>> (2a) I would recommend to rename `topicsGroupId` to `subTopologyId` :)
>>>
>>> (2b) We could add this to KIP-120 already. However, I would not just
>>> link both via name, but leverage KIP-120 directly, and add a
>>> "Subtopology" member to the TaskMetadata class.
>>>
>>>
>>> Overall, I like the distinction of KIP-120 only exposing "static"
>>> information that can be determined before the topology gets started,
>>> while this KIP allows access to runtime information.
>>>
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 3/22/17 12:42 PM, Guozhang Wang wrote:
>>> > Thanks for the updated KIP, and sorry for the late replies!
>>> >
>>> > I thought a little bit more about KIP-130, and I feel that if we are
>>> > going to deprecate the `toString` function in favor of the proposed
>>> > ones (it is not explicitly said in the KIP, so I'm not sure if you
>>> > plan to keep `KafkaStreams#toString` as is or to replace it with the
>>> > proposed APIs), it may be okay. More specifically, after both KIP-120
>>> > and KIP-130:
>>> >
>>> > 1. users can use the `#describe` function to check the generated
>>> > topology before calling `KafkaStreams#start`, which is static
>>> > information.
>>> > 2. users can use the `StreamsMetadata -> ThreadMetadata ->
>>> > TaskMetadata` chain programmatically after calling
>>> > `KafkaStreams#start` to get the dynamically changeable information.
>>> >
>>> > One thing I'm still not sure about, though, is that in `TaskMetadata`
>>> > we only have the TaskId and assigned partitions, whereas the
>>> > "TopologyDescription" introduced in KIP-120 will simply describe the
>>> > whole topology, possibly composed of multiple sub-topologies. So it
>>> > is hard for users to tell which sub-topology is executed under which
>>> > task on-the-fly.
>>> >
>>> > Hence I'm thinking we can expose the "sub-topology-id" (named
>>> > topicsGroupId internally) in TopologyDescription#Subtopology, and
>>> > then, from the task id, which is essentially "sub-topology-id DASH
>>> > partition-group-id", users can make the link, though it is still not
>>> > that straight-forward.

Re: [DISCUSS] KIP 130: Expose states of active tasks to KafkaStreams public API

2017-03-27 Thread Florian Hussonnois
Hi Guozhang, Matthias,

It's a great idea to add sub-topology descriptions. This would help
developers better understand the topology concept.

I agree that it is not really user-friendly to have to check whether
`StreamsMetadata#streamThreads` returns null.

The method name localThreadsMetadata looks good. In addition, it's simpler
to build ThreadMetadata instances from the `StreamTask` class than from
the `StreamPartitionAssignor` class.

I will work on the modifications. As I understand it, I have to add a
subTopologyId property to the TaskMetadata class - am I right?

Thanks,

2017-03-26 0:25 GMT+01:00 Guozhang Wang <wangg...@gmail.com>:

> Re 1): this is a good point. Maybe we can move
> `StreamsMetadata#streamThreads` to `KafkaStreams#localThreadsMetadata`?
>
> 3): a minor suggestion about naming: rename `assignedPartitions` to
> `topicPartitions`, to be consistent with `StreamsMetadata`?
>
>
> Guozhang
>
> On Thu, Mar 23, 2017 at 4:30 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> Thanks for the progress on this KIP. I think we are on the right path!
>>
>> Couple of comments/questions:
>>
>> (1) Why do we not consider the "rejected alternative" to add the method
>> to KafkaStreams? The comment on #streamThreads() says:
>>
>> "Note this method will return null if called on {@link
>> StreamsMetadata} which represent a remote application."
>>
>> Thus, if we cannot get any remote metadata, it seems more
>> straightforward to add it to KafkaStreams directly -- this would avoid
>> invalid calls and `null` return values in the first place.
>>
>> I like the idea of exposing sub-topologies:
>>
>> (2a) I would recommend to rename `topicsGroupId` to `subTopologyId` :)
>>
>> (2b) We could add this to KIP-120 already. However, I would not just
>> link both via name, but leverage KIP-120 directly, and add a
>> "Subtopology" member to the TaskMetadata class.
>>
>>
>> Overall, I like the distinction of KIP-120 only exposing "static"
>> information that can be determined before the topology gets started,
>> while this KIP allows access to runtime information.
>>
>>
>>
>> -Matthias
>>
>>
>> On 3/22/17 12:42 PM, Guozhang Wang wrote:
>> > Thanks for the updated KIP, and sorry for the late replies!
>> >
>> > I thought a little bit more about KIP-130, and I feel that if we are
>> > going to deprecate the `toString` function in favor of the proposed
>> > ones (it is not explicitly said in the KIP, so I'm not sure if you
>> > plan to keep `KafkaStreams#toString` as is or to replace it with the
>> > proposed APIs), it may be okay. More specifically, after both KIP-120
>> > and KIP-130:
>> >
>> > 1. users can use the `#describe` function to check the generated
>> > topology before calling `KafkaStreams#start`, which is static
>> > information.
>> > 2. users can use the `StreamsMetadata -> ThreadMetadata ->
>> > TaskMetadata` chain programmatically after calling
>> > `KafkaStreams#start` to get the dynamically changeable information.
>> >
>> > One thing I'm still not sure about, though, is that in `TaskMetadata`
>> > we only have the TaskId and assigned partitions, whereas the
>> > "TopologyDescription" introduced in KIP-120 will simply describe the
>> > whole topology, possibly composed of multiple sub-topologies. So it
>> > is hard for users to tell which sub-topology is executed under which
>> > task on-the-fly.
>> >
>> > Hence I'm thinking we can expose the "sub-topology-id" (named
>> > topicsGroupId internally) in TopologyDescription#Subtopology, and
>> > then, from the task id, which is essentially "sub-topology-id DASH
>> > partition-group-id", users can make the link, though it is still not
>> > that straight-forward.
>> >
>> > Thoughts?
>> >
>> > Guozhang
>> >
>> >
>> >
>> > On Wed, Mar 15, 2017 at 3:16 PM, Florian Hussonnois
>> > <fhussonn...@gmail.com> wrote:
>> >
>> > Thanks Guozhang for pointing me to KIP-120.
>> >
>> > I've made some modifications to the KIP. I also proposed a new PR
>> > (there are still some tests to write).
>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API

Brokers / Best practices to set log.flush.interval.*

2016-09-04 Thread Florian Hussonnois
Hi all,

I would like to know how to configure the following parameters:

log.flush.interval.messages
log.flush.interval.ms
log.flush.scheduler.interval.ms

The Kafka 0.8.x documentation indicates that it is not recommended to set
these parameters, as doing so can have a major impact on performance.

But since Kafka 0.9.x/0.10.x, these cautions are no longer present.

In addition, the LinkedIn configuration given on the Apache Kafka website
is:
log.flush.interval.ms=1
log.flush.interval.messages=2
log.flush.scheduler.interval.ms=2000

When do we need to set these parameters? What can be the impact of using
the default settings on a machine with a large amount of memory (page
cache)?
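
For reference, the semantics of these three knobs in server.properties;
the values below are purely illustrative, not recommendations (by default
all three are effectively disabled and flushing is left entirely to the
OS page cache):

# fsync a partition's log after this many messages have accumulated
log.flush.interval.messages=20000
# fsync a message once it has sat unflushed in the log for this long
log.flush.interval.ms=10000
# how often the background flusher checks whether a flush is due
log.flush.scheduler.interval.ms=2000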

Thanks,

-- 
Florian


Re: Contribution : KafkaStreams CEP library

2016-07-11 Thread Florian Hussonnois
Hi,

It would be great if you could link my repo. Thanks very much.

2016-07-11 18:26 GMT+02:00 Guozhang Wang <wangg...@gmail.com>:

> Thanks Florian!
>
>
> Do you mind if I link your repo in the Kafka Streams ecosystem page?
>
>
> Guozhang
>
> On Mon, Jul 11, 2016 at 1:37 AM, Michael Noll <mich...@confluent.io>
> wrote:
>
> > Thanks for sharing, Florian!
> >
> > -Michael
> >
> >
> > On Fri, Jul 8, 2016 at 6:48 PM, Florian Hussonnois <
> fhussonn...@gmail.com>
> > wrote:
> >
> > > Hi All,
> > >
> > > For a few weeks now I've been working, for fun, on a CEP library on
> > > top of Kafka Streams. There has been some progress, and I think the
> > > project is starting to look like something - or at least I hope so ;)
> > >
> > > https://github.com/fhussonnois/kafkastreams-cep
> > >
> > > So I'm pleased to share it with you (I already shared it with the dev
> > > mailing list, but I just realised that I'd forgotten to add the user
> > > list ^^).
> > >
> > > Currently, I'm looking to test my library against real use cases. If
> > > some of you test it, I would appreciate any feedback.
> > >
> > > Any contribution is welcome. I'm sure this project can be improved in
> > > many ways.
> > >
> > > Thanks in advance,
> > >
> > > --
> > > Florian HUSSONNOIS
> > >
> >
>
>
>
> --
> -- Guozhang
>



-- 
Florian HUSSONNOIS


Contribution : KafkaStreams CEP library

2016-07-08 Thread Florian Hussonnois
Hi All,

For a few weeks now I've been working, for fun, on a CEP library on top of
Kafka Streams. There has been some progress, and I think the project is
starting to look like something - or at least I hope so ;)

https://github.com/fhussonnois/kafkastreams-cep

So I'm pleased to share it with you (I already shared it with the dev
mailing list, but I just realised that I'd forgotten to add the user list
^^).

Currently, I'm looking to test my library against real use cases. If some
of you test it, I would appreciate any feedback.

Any contribution is welcome. I'm sure this project can be improved in many
ways.

Thanks in advance,

-- 
Florian HUSSONNOIS


How to gracefully shutdown Kafka Connector

2016-06-14 Thread Florian Hussonnois
Hi everyone,

I would like to know if there is a way to shut down a connector
programmatically?

On my project, we have developed a sink connector that writes messages
into GZIP files for testing purposes. We would like to stop the connector
once no message has been received for a given amount of time.
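
One workaround is to call the Connect worker's REST interface. A sketch in
plain Java (the worker address localhost:8083 and the connector name
"gzip-sink" are placeholders):

import java.net.HttpURLConnection;
import java.net.URL;

public class StopConnector {

    public static void main(String[] args) throws Exception {
        // DELETE /connectors/{name} removes the connector and its tasks;
        // PUT /connectors/{name}/pause (Kafka 0.10+) pauses it instead.
        URL url = new URL("http://localhost:8083/connectors/gzip-sink");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("DELETE");
        int code = conn.getResponseCode(); // expect 204 No Content on success
        System.out.println("DELETE returned HTTP " + code);
        conn.disconnect();
    }
}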

Thanks,

-- 
Florian HUSSONNOIS


KafkaStreams / Processor API - How to retrieve offsets

2016-05-22 Thread Florian Hussonnois
Hi everyone,

Is there any way to retrieve the offset attached to each message with the
Kafka Streams APIs? Currently the Processor interface only exposes the
method process(K key, V value).

It would be useful to get the offsets in order to implement DSL extensions
with methods that need to identify each message.
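
(For later readers: the Processor API does expose this through the
ProcessorContext, which describes the record currently being processed. A
minimal sketch, assuming a Kafka Streams version in which ProcessorContext
exposes topic()/partition()/offset():)

import org.apache.kafka.streams.processor.AbstractProcessor;

public class OffsetTaggingProcessor extends AbstractProcessor<String, String> {

    @Override
    public void process(String key, String value) {
        // The context describes the record currently being processed.
        String topic = context().topic();
        int partition = context().partition();
        long offset = context().offset();
        System.out.println(topic + "-" + partition + "@" + offset + " key=" + key);
        context().forward(key, value); // pass the record downstream unchanged
    }
}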

Thanks in advance

-- 
Florian HUSSONNOIS


Re: Consumer Client - How to simulate heartbeats ?

2016-04-18 Thread Florian Hussonnois
Yes, but the ConsumerRebalanceListener is optional, and by default the
KafkaConsumer uses a NoOpConsumerRebalanceListener if none is provided.

I think the seek() is already done internally when a consumer joins or
quits the group. I'm not sure that line is actually needed.
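
To make this concrete, a sketch of an explicit listener that replays the
committed offsets (method names as in the 0.9/0.10 consumer; as noted
above, the consumer already does this on its own, so the explicit seek
really only matters when offsets are stored outside Kafka):

import java.util.Collection;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public final class SeekingListenerExample {

    static void subscribeWithSeek(KafkaConsumer<String, String> consumer,
                                  List<String> topics) {
        consumer.subscribe(topics, new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                consumer.commitSync(); // flush progress before the partitions move
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                for (TopicPartition tp : partitions) {
                    OffsetAndMetadata committed = consumer.committed(tp);
                    if (committed != null) {
                        // Redundant with the default behaviour when offsets
                        // are committed to Kafka, as discussed above.
                        consumer.seek(tp, committed.offset());
                    }
                }
            }
        });
    }
}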

2016-04-18 15:31 GMT+02:00 Kamal C <kamaltar...@gmail.com>:

> When a new consumer joins the group, it should start to read data
> from where the other consumer left off.
>
> --Kamal
>
> On Mon, Apr 18, 2016 at 6:58 PM, Florian Hussonnois <fhussonn...@gmail.com
> >
> wrote:
>
> > Thank you very much, the example is really helpful.
> >
> > My last question is: Why is it necessary to seek the consumer offsets
> > in the onPartitionsAssigned method?
> >
> > https://github.com/omkreddy/kafka-examples/blob/master/consumer/src/main/java/kafka/examples/consumer/advanced/AdvancedConsumer.java#L120
> >
> > 2016-04-15 15:06 GMT+02:00 Kamal C <kamaltar...@gmail.com>:
> >
> > > Hi Florian,
> > >
> > > This may be helpful
> > >
> > >
> >
> https://github.com/omkreddy/kafka-examples/blob/master/consumer/src/main/java/kafka/examples/consumer/advanced/AdvancedConsumer.java
> > >
> > > --Kamal
> > >
> > > On Fri, Apr 15, 2016 at 2:57 AM, Jason Gustafson <ja...@confluent.io>
> > > wrote:
> > >
> > > > Hi Florian,
> > > >
> > > > It's actually OK if processing takes longer than the heartbeat
> > interval,
> > > > but it does need to finish before the session timeout expires or the
> > > > consumer will be kicked out of the group (which typically is revealed
> > by
> > > > commit failures). If the problem is just that the consumer is
> handling
> > > too
> > > > many messages at once, then Kafka 0.10 has an option to tune the
> number
> > > of
> > > > messages returned from poll() (max.poll.records), which may be
> helpful.
> > > We
> > > > also have a pause/resume API which allows you to call poll() without
> > > > consuming any data. That's the best option at the moment for 0.9
> > > consumers.
> > > >
> > > > For what it's worth, we've considered several times adding a
> > heartbeat()
> > > > API, but the challenge is figuring out how to handle rebalancing.
> > > > Underneath the covers, we use heartbeats to find out when the group
> is
> > > > rebalancing, so a heartbeat() option would probably have to return a
> > flag
> > > > indicating whether a rebalance was needed. If the group has begun
> > > > rebalancing, then you would need to call poll() before the expiration
> > of
> > > > the session timeout so that the consumer can join the rebalance.
> > > > Alternatively, we could let heartbeat() complete the rebalance
> itself,
> > > but
> > > > then you'd have to be prepared to abort processing from the rebalance
> > > > callback. That's not really different from calling poll() after
> pausing
> > > > partitions though. The main problem in any case is that once a
> > rebalance
> > > > begins, you have the duration of the session timeout to stop
> processing
> > > and
> > > > join the rebalance. We're seeing this problem pop up pretty much
> > > everywhere
> > > > that the consumer is used, so we're trying to think of some better
> > > options
> > > > to handle it.
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > > >
> > > > On Thu, Apr 14, 2016 at 12:32 PM, Florian Hussonnois <
> > > > fhussonn...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi everyone,
> > > > >
> > > > > I have a use case where a message can take longer than
> > > > > 'heartbeat.interval.ms' to be processed by my application. As I
> > > > > understand it, the consumer's heartbeats are sent while the poll
> > > > > method is invoked.
> > > > >
> > > > > I would like to instantiate a worker thread to process the
> > > > > messages, but I need to wait for their completion before polling
> > > > > again.
> > > > >
> > > > > Is there a way to force the consumer to send a heartbeat without
> > > > > polling for new messages?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > --
> > > > > Florian HUSSONNOIS
> > > > >
> > > >
> > >
> >
> >
> >
> > --
> > Florian HUSSONNOIS
> >
>



-- 
Florian HUSSONNOIS


Re: Consumer Client - How to simulate heartbeats ?

2016-04-18 Thread Florian Hussonnois
Thank you very much, the example is really helpful.

My last question is: Why is it necessary to seek the consumer offsets in
the onPartitionsAssigned method?

https://github.com/omkreddy/kafka-examples/blob/master/consumer/src/main/java/kafka/examples/consumer/advanced/AdvancedConsumer.java#L120

2016-04-15 15:06 GMT+02:00 Kamal C <kamaltar...@gmail.com>:

> Hi Florian,
>
> This may be helpful
>
> https://github.com/omkreddy/kafka-examples/blob/master/consumer/src/main/java/kafka/examples/consumer/advanced/AdvancedConsumer.java
>
> --Kamal
>
> On Fri, Apr 15, 2016 at 2:57 AM, Jason Gustafson <ja...@confluent.io>
> wrote:
>
> > Hi Florian,
> >
> > It's actually OK if processing takes longer than the heartbeat interval,
> > but it does need to finish before the session timeout expires or the
> > consumer will be kicked out of the group (which typically is revealed by
> > commit failures). If the problem is just that the consumer is handling
> too
> > many messages at once, then Kafka 0.10 has an option to tune the number
> of
> > messages returned from poll() (max.poll.records), which may be helpful.
> We
> > also have a pause/resume API which allows you to call poll() without
> > consuming any data. That's the best option at the moment for 0.9
> consumers.
> >
> > For what it's worth, we've considered several times adding a heartbeat()
> > API, but the challenge is figuring out how to handle rebalancing.
> > Underneath the covers, we use heartbeats to find out when the group is
> > rebalancing, so a heartbeat() option would probably have to return a flag
> > indicating whether a rebalance was needed. If the group has begun
> > rebalancing, then you would need to call poll() before the expiration of
> > the session timeout so that the consumer can join the rebalance.
> > Alternatively, we could let heartbeat() complete the rebalance itself,
> but
> > then you'd have to be prepared to abort processing from the rebalance
> > callback. That's not really different from calling poll() after pausing
> > partitions though. The main problem in any case is that once a rebalance
> > begins, you have the duration of the session timeout to stop processing
> and
> > join the rebalance. We're seeing this problem pop up pretty much
> everywhere
> > that the consumer is used, so we're trying to think of some better
> options
> > to handle it.
> >
> > Thanks,
> > Jason
> >
> >
> > On Thu, Apr 14, 2016 at 12:32 PM, Florian Hussonnois <
> > fhussonn...@gmail.com>
> > wrote:
> >
> > > Hi everyone,
> > >
> > > I have a use case where a message can take longer than
> > > 'heartbeat.interval.ms' to be processed by my application. As I
> > > understand it, the consumer's heartbeats are sent while the poll
> > > method is invoked.
> > >
> > > I would like to instantiate a worker thread to process the messages,
> > > but I need to wait for their completion before polling again.
> > >
> > > Is there a way to force the consumer to send a heartbeat without
> > > polling for new messages?
> > >
> > > Thanks,
> > >
> > > --
> > > Florian HUSSONNOIS
> > >
> >
>



-- 
Florian HUSSONNOIS
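
For readers landing on this thread later, a minimal sketch of the
pause/resume pattern Jason describes, assuming the Collection-based
pause/resume signatures of the 0.10 client (the 0.9 client took varargs).
It deliberately ignores rebalance handling, which the discussion above
explains is the hard part; the topic and group names are placeholders:

import java.util.Collections;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PauseResumeWorker {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "slow-processing-group"); // placeholder
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        ExecutorService worker = Executors.newSingleThreadExecutor();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic")); // placeholder
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                if (records.isEmpty()) {
                    continue;
                }
                Set<TopicPartition> assigned = consumer.assignment();
                consumer.pause(assigned);            // stop fetching...
                Future<?> done = worker.submit(() -> // ...while a worker processes
                        records.forEach(r -> { /* slow processing here */ }));
                while (!done.isDone()) {
                    consumer.poll(0); // empty while paused, but keeps heartbeats flowing
                }
                consumer.resume(assigned);
            }
        }
    }
}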


Consumer Client - How to simulate heartbeats ?

2016-04-14 Thread Florian Hussonnois
Hi everyone,

I have a use case where a message can take longer than
'heartbeat.interval.ms' to be processed by my application. As I understand
it, the consumer's heartbeats are sent while the poll method is invoked.

I would like to instantiate a worker thread to process the messages, but I
need to wait for their completion before polling again.

Is there a way to force the consumer to send a heartbeat without polling
for new messages?

Thanks,

-- 
Florian HUSSONNOIS


How to retrieve the HighWaterMark

2016-02-10 Thread Florian Hussonnois
Hi all,

I'm looking for a way to retrieve the HighWaterMark using the new API.

Is that possible?
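
One approach with the new consumer is to seek to the end of the partition
and read back the position; a sketch, assuming the Collection-based
signatures of the 0.10 client (0.9 used list/varargs variants), with a
placeholder topic:

import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class HighWatermarkLookup {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0); // placeholder
            List<TopicPartition> tps = Collections.singletonList(tp);
            consumer.assign(tps);
            consumer.seekToEnd(tps);
            // After seeking to the end, position() returns the log-end offset,
            // which is the high watermark from the consumer's point of view.
            long highWatermark = consumer.position(tp);
            System.out.println(tp + " high watermark = " + highWatermark);
        }
    }
}

(Newer clients also expose this more directly through Consumer#endOffsets.)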

Thank you in advance

-- 
Florian HUSSONNOIS


how to set the Java heap of brokers

2016-01-25 Thread Florian Hussonnois
Hi,

I'm looking for any recommendations on how to set the Java heap of brokers.

I have found details on GC configuration, but 6g looks very high to me.

-Xms6g -Xmx6g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20
-XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M
-XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80
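
(Side note: the startup scripts read the broker heap from the
KAFKA_HEAP_OPTS environment variable, so a different sizing can be tried
without editing anything, e.g. `export KAFKA_HEAP_OPTS="-Xms4g -Xmx4g"`;
the 4g here is just an illustrative value.)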

I would like to understand what brokers actually maintain in memory.

Does anybody have links that explain that point?

Thank you in advance.

-- 
Florian HUSSONNOIS