Re: How auto.offset.reset = latest works

2019-10-06 Thread Matthias J. Sax
You can get the assigned partitions via `Consumer#assignment()`.

And yes, if `auto.offset.reset` triggers and it is set to "latest", the
consumer will get the end offset for each assigned partition and start
reading from there.
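
A minimal sketch (topic and group names are made up, and the usual
kafka-clients imports are assumed) that prints the assignment and the
end offsets a "latest" reset would start from:

  Properties props = new Properties();
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
      StringDeserializer.class);
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
      StringDeserializer.class);

  try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("my-topic"));
      consumer.poll(Duration.ofSeconds(1)); // first poll() triggers assignment
      Set<TopicPartition> assigned = consumer.assignment();
      System.out.println("Assigned: " + assigned);
      // with "latest", these end offsets are where reading starts
      consumer.endOffsets(assigned).forEach((tp, offset) ->
          System.out.println(tp + " -> " + offset));
  }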


-Matthias

On 10/3/19 12:29 PM, Hrishikesh Mishra wrote:
> Hi,
> 
> I want to understand how *auto.offset.reset = latest* works. When the
> consumer first calls the poll() method, will it assign the current offsets
> to the consumer for all partitions (when a single consumer is up in a
> consumer group)? How do I know all partitions are assigned to a consumer?
> 
> 
> Regards
> Hrishikesh
> 





Re: [VOTE] 2.3.1 RC1

2019-10-06 Thread David Arthur
Passing builds:
Unit/integration tests https://builds.apache.org/job/kafka-2.3-jdk8/122/
System tests https://jenkins.confluent.io/job/system-test-kafka/job/2.3/142/


On Fri, Oct 4, 2019 at 9:52 PM David Arthur wrote:

> Hello all, we identified a few bugs and a dependency update we wanted to
> get fixed for 2.3.1. In particular, there was a problem with rolling
> upgrades of streams applications (KAFKA-8649).
>
> Check out the release notes for a complete list.
> https://home.apache.org/~davidarthur/kafka-2.3.1-rc1/RELEASE_NOTES.html
>
> *** Please download, test and vote by Wednesday October 9th, 9pm PST
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> https://kafka.apache.org/KEYS
>
> * Release artifacts to be voted upon (source and binary):
> https://home.apache.org/~davidarthur/kafka-2.3.1-rc1/
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/org/apache/kafka/
>
> * Javadoc:
> https://home.apache.org/~davidarthur/kafka-2.3.1-rc1/javadoc/
>
> * Tag to be voted upon (off 2.3 branch) is the 2.3.1 tag:
> https://github.com/apache/kafka/releases/tag/2.3.1-rc1
>
> * Documentation:
> https://kafka.apache.org/23/documentation.html
>
> * Protocol:
> https://kafka.apache.org/23/protocol.html
>
> * Successful Jenkins builds for the 2.3 branch are TBD but will be located:
>
> Unit/integration tests: https://builds.apache.org/job/kafka-2.3-jdk8/
>
> System tests: https://jenkins.confluent.io/job/system-test-kafka/job/2.3/
>
>
> Thanks!
> David Arthur
>


-- 
David Arthur


Re: Stream branching and states

2019-10-06 Thread Sophie Blee-Goldman
Hi Navneeth,

I think your plan to implement branching with the Processor API sounds
reasonable, but I'm wondering why not just use the DSL branch operator
and connect some transformers? That will let you mix the DSL and PAPI so
you don't have to re-implement anything that is already in the DSL.
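
For example, here is a rough sketch (topic names and filter logic made
up) of mixing the two: branch in the DSL, then attach your own
Processor-API logic via transformValues():

  KStream<String, String>[] branches = builder
      .<String, String>stream("input-topic")
      .branch(
          (key, value) -> value.startsWith("A"), // records for custom logic
          (key, value) -> true);                 // everything else

  branches[0]
      .transformValues(() -> new ValueTransformer<String, String>() {
          public void init(ProcessorContext context) { }
          public String transform(String value) {
              return value.toUpperCase(); // stand-in for your PAPI logic
          }
          public void close() { }
      })
      .to("branch-a-topic");
  branches[1].to("other-topic");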

If you supplied a rocksdb store supplier, then yes, the state will be
held in rocksdb and not in memory. Note that this is also the default,
i.e. if you don't pass in any supplier it will default to using rocksdb.
The logging option is, as you describe, there to provide fault tolerance
and rebuild the existing state from the changelog in case your app goes
down. It will also be used if you add another instance to scale out --
in that case the global state will need to be rebuilt, and any
non-global state stores that are assigned to the new instance to balance
the cluster will also need to be built. You almost definitely want to
enable logging: the changelog will still contain only the current value,
versus your store potentially containing nothing at all if you scale out
or an instance restarts. Global stores technically do not enable logging
since it is unnecessary: they just read from an input topic, so they can
re-read from that same topic rather than from a changelog, which would
duplicate the same data.
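
For reference, a sketch (store name made up) of a rocksdb-backed store
with logging enabled, so it can be restored from its changelog:

  StoreBuilder<KeyValueStore<String, String>> storeBuilder =
      Stores.keyValueStoreBuilder(
          Stores.persistentKeyValueStore("my-store"), // rocksdb supplier
          Serdes.String(), Serdes.String())
      .withLoggingEnabled(Collections.emptyMap());    // changelog topic configs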

The caching option differs slightly depending on whether you're using
it in the DSL or PAPI, but you can read about it here:
https://kafka.apache.org/10/documentation/streams/developer-guide/memory-mgmt.html
Basically, it adds an in-memory cache above the state stores to reduce
disk reads/writes. In the DSL, it also reduces downstream traffic by
buffering records with the same key.
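
The cache size is controlled via StreamsConfig; a sketch with made-up
values:

  Properties streamsProps = new Properties();
  // total bytes, across all threads, for buffering records before flushing
  streamsProps.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
      10 * 1024 * 1024L);
  // how often the cache is flushed downstream
  streamsProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);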

On Fri, Oct 4, 2019 at 11:37 PM Navneeth Krishnan wrote:

> Hi All,
>
> I'm waiting for inputs to proceed with my POC and convert existing
> applications from flink to kafka streams. It would be really helpful if I
> can get some clarity on this.
>
> Thanks
>
> On Wed, Oct 2, 2019 at 7:05 PM Navneeth Krishnan wrote:
>
> > Hello All,
> >
> > Any inputs?
> >
> > Thanks
> >
> > On Tue, Oct 1, 2019 at 12:40 PM Navneeth Krishnan
> > <reachnavnee...@gmail.com> wrote:
> >
> >> Hi All,
> >>
> >> I found the below example of how branching can be achieved with
> >> kafka streams. I want to implement the same with the Processor API,
> >> and the way I understand it is to have the same input topic processed
> >> by multiple process functions that output the data based on the
> >> filters. Is this the right understanding?
> >>
> >> final StreamsBuilder builder = new StreamsBuilder();
> >> KStream<String, String> source_o365_user_activity =
> >>     builder.stream("o365_user_activity");
> >> KStream<String, String>[] branches = source_o365_user_activity.branch(
> >>     (key, value) -> (value.contains("Operation\":\"SharingSet") &&
> >>         value.contains("ItemType\":\"File")),
> >>     (key, value) -> (value.contains("Operation\":\"AddedToSecureLink") &&
> >>         value.contains("ItemType\":\"File")),
> >>     (key, value) -> true
> >> );
> >>
> >> branches[0].to("o365_sharing_set_by_date");
> >> branches[1].to("o365_added_to_secure_link_by_date");
> >> branches[2].to("o365_user_activity_by_date");
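> >>
> >> Roughly, what I have in mind with the Processor API is one processor
> >> per filter, all reading the same source; a sketch (class and topic
> >> names made up, one branch shown):
> >>
> >> class SharingSetProcessor implements Processor<String, String> {
> >>     private ProcessorContext context;
> >>     public void init(ProcessorContext context) { this.context = context; }
> >>     public void process(String key, String value) {
> >>         if (value.contains("Operation\":\"SharingSet") &&
> >>                 value.contains("ItemType\":\"File")) {
> >>             context.forward(key, value); // only matches reach the sink
> >>         }
> >>     }
> >>     public void close() { }
> >> }
> >>
> >> Topology topology = new Topology();
> >> topology.addSource("Source", "o365_user_activity");
> >> topology.addProcessor("SharingSetFilter",
> >>     SharingSetProcessor::new, "Source");
> >> topology.addSink("SharingSetSink",
> >>     "o365_sharing_set_by_date", "SharingSetFilter");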
> >>
> >>
> >> Also, is the global state kept in memory or on rocksdb? I'm
> >> providing a state store supplier with rocksdb as the option.
> >>
> >> I believe the logging option is to rebuild the state from the start
> >> when the application goes down. For my scenario I just need the
> >> current value; in that case, should logging be enabled? For the
> >> global state store it says logging cannot be enabled. Also, what is
> >> the caching option? The documentation doesn't have enough details on
> >> these.
> >>
> >> StoreBuilder<KeyValueStore<String, String>> globalState =
> >>     Stores.keyValueStoreBuilder(
> >>         Stores.persistentKeyValueStore("test"),
> >>         Serdes.String(), Serdes.String())
> >>     .withLoggingDisabled()
> >>     .withCachingEnabled();
> >>
> >>
> >> One last question: during auto scaling, if I bring up a new node I
> >> would assume some of the traffic will be reallocated to the new
> >> node. What will happen to the rocksdb state that was accumulated in
> >> the already running node?
> >>
> >> Thanks, appreciate all the help.
> >>
> >>
>