RE: Kafka Connect: Increase Consumer Consumption

2018-07-18 Thread adrien ruffie
Strange enough ...

I don't really understand why.


Do you have multiple consumers?

I don't know whether there is a metric that gives the outbound consumption of a topic.

But if even a simple metric exists, you can check whether it is your consumer that
never receives more than 1150 records,

while your topic is serving out more than 5000 records.


Adrien


De : Vishnu Manivannan 
Envoyé : jeudi 19 juillet 2018 00:05:14
À : users@kafka.apache.org
Objet : Re: Kafka Connect: Increase Consumer Consumption

Hi Adrien,

I set fetch.max.wait.ms to 1500 ms and ran it again. It still isn't crossing 
1150 records per fetch. On the producer side (using kafka-producer-perf-test),
it's producing about 30,000 records/sec and a million records in total.

I tried different configurations amongst these three parameters 
(max.poll.records, fetch.min.bytes, fetch.max.wait.ms). All the other 
parameters are unchanged and have their default values. I have not changed any 
of the broker configs either. I read the Kafka documentation but could not find 
any other parameters that could impact the fetch size.

Is there some other consumer or broker parameter that I might be missing?

Thanks,
Vishnu

On 7/18/18, 1:48 PM, "adrien ruffie"  wrote:

Hi Vishnu,

have you checked your fetch.max.wait.ms value?

it may not give the broker enough time to accumulate your 5000 records
...

maybe just enough time to accumulate only 1150 records.


fetch.max.wait.ms

By setting fetch.min.bytes, you tell Kafka to wait until it has enough data 
to send before responding to the consumer. fetch.max.wait.ms lets you control 
how long to wait. By default, Kafka will wait up to 500 ms. This results in up 
to 500 ms of extra latency in case there is not enough data flowing to the 
Kafka topic to satisfy the minimum amount of data to return. If you want to 
limit the potential latency (usually due to SLAs controlling the maximum 
latency of the application), you can set fetch.max.wait.ms to a lower value. If 
you set fetch.max.wait.ms to 100 ms and fetch.min.bytes to 1 MB, Kafka will 
receive a fetch request from the consumer and will respond with data either 
when it has 1 MB of data to return or after 100 ms, whichever happens first.

Best regards,


Adrien


De : Vishnu Manivannan 
Envoyé : mercredi 18 juillet 2018 21:00:50
À : users@kafka.apache.org
Objet : Kafka Connect: Increase Consumer Consumption

Hi,

I am currently working with a single Kafka broker and a single Kafka 
consumer. I am trying to get the consumer to fetch more records, so I can 
increase the batch size when I write the data to a DB.

Each record is about 1 KB and I am trying to fetch at least 5000 records 
each time. So, I changed the configurations for the following consumer 
parameters:

  *   max.poll.records = 5000
  *   fetch.min.bytes = 512

For some reason, the maximum number of records fetched each time does not 
go above 1150. Are there any other parameters that I should look into or any 
changes I should make to the current configurations?

Thanks,
Vishnu
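
To make the fetch settings discussed in this thread concrete, here is a minimal,
illustrative consumer sketch (the topic name, group id, and the exact byte values
are assumptions for the example, not taken from the thread):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class FetchTuningExample {
  public static void main(String[] args) {
    Properties config = new Properties();
    config.put("bootstrap.servers", "localhost:9092");  // assumption: local broker
    config.put("group.id", "fetch-tuning-demo");        // assumption: any group id
    config.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    config.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");

    // Settings discussed in this thread.
    config.put("max.poll.records", "5000");    // cap on records returned by a single poll()
    config.put("fetch.min.bytes", "5242880");  // ~5 MB, i.e. roughly 5000 x 1 KB records
    config.put("fetch.max.wait.ms", "1500");   // how long the broker may wait for fetch.min.bytes

    // Other settings that bound how much data one fetch can return
    // (the values here are illustrative; the comments note the defaults).
    config.put("max.partition.fetch.bytes", "10485760");  // default is 1 MB per partition
    config.put("fetch.max.bytes", "52428800");             // default is 50 MB per fetch request

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config)) {
      consumer.subscribe(Collections.singletonList("test-topic"));  // assumption: topic name
      for (int i = 0; i < 10; i++) {
        ConsumerRecords<String, String> records = consumer.poll(2000);
        System.out.println("Fetched " + records.count() + " records");
      }
    }
  }
}

With ~1 KB records, the default 1 MB max.partition.fetch.bytes caps a single
partition's contribution to a fetch at roughly 1000 records, which is in the same
range as the 1150 observed above, so it may be worth checking alongside
fetch.max.bytes.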






Re: Kafka Connect: Increase Consumer Consumption

2018-07-18 Thread Vishnu Manivannan
Hi Adrien,

I set fetch.max.wait.ms to 1500 ms and ran it again. It still isn't crossing 
1150 records per fetch. On the producer side (using kafka-producer-perf-test),
it's producing about 30,000 records/sec and a million records in total.

I tried different configurations amongst these three parameters 
(max.poll.records, fetch.min.bytes, fetch.max.wait.ms). All the other 
parameters are unchanged and have their default values. I have not changed any 
of the broker configs either. I read the Kafka documentation but could not find 
any other parameters that could impact the fetch size. 

Is there some other consumer or broker parameter that I might be missing?

Thanks,
Vishnu

On 7/18/18, 1:48 PM, "adrien ruffie"  wrote:

Hi Vishnu,

have you checked your fetch.max.wait.ms value?

it may not give the broker enough time to accumulate your 5000 records
...

maybe just enough time to accumulate only 1150 records.


fetch.max.wait.ms

By setting fetch.min.bytes, you tell Kafka to wait until it has enough data 
to send before responding to the consumer. fetch.max.wait.ms lets you control 
how long to wait. By default, Kafka will wait up to 500 ms. This results in up 
to 500 ms of extra latency in case there is not enough data flowing to the 
Kafka topic to satisfy the minimum amount of data to return. If you want to 
limit the potential latency (usually due to SLAs controlling the maximum 
latency of the application), you can set fetch.max.wait.ms to a lower value. If 
you set fetch.max.wait.ms to 100 ms and fetch.min.bytes to 1 MB, Kafka will 
receive a fetch request from the consumer and will respond with data either 
when it has 1 MB of data to return or after 100 ms, whichever happens first.

Best regards,


Adrien


De : Vishnu Manivannan 
Envoyé : mercredi 18 juillet 2018 21:00:50
À : users@kafka.apache.org
Objet : Kafka Connect: Increase Consumer Consumption

Hi,

I am currently working with a single Kafka broker and a single Kafka 
consumer. I am trying to get the consumer to fetch more records, so I can 
increase the batch size when I write the data to a DB.

Each record is about 1 KB and I am trying to fetch at least 5000 records 
each time. So, I changed the configurations for the following consumer 
parameters:

  *   max.poll.records = 5000
  *   fetch.min.bytes = 512

For some reason, the maximum number of records fetched each time does not 
go above 1150. Are there any other parameters that I should look into or any 
changes I should make to the current configurations?

Thanks,
Vishnu






RE: Kafka Connect: Increase Consumer Consumption

2018-07-18 Thread adrien ruffie
Hi Vishnu,

have you checked your fetch.max.wait.ms value?

it may not give the broker enough time to accumulate your 5000 records ...

maybe just enough time to accumulate only 1150 records.


fetch.max.wait.ms

By setting fetch.min.bytes, you tell Kafka to wait until it has enough data to 
send before responding to the consumer. fetch.max.wait.ms lets you control how 
long to wait. By default, Kafka will wait up to 500 ms. This results in up to 
500 ms of extra latency in case there is not enough data flowing to the Kafka 
topic to satisfy the minimum amount of data to return. If you want to limit the 
potential latency (usually due to SLAs controlling the maximum latency of the 
application), you can set fetch.max.wait.ms to a lower value. If you set 
fetch.max.wait.ms to 100 ms and fetch.min.bytes to 1 MB, Kafka will receive a 
fetch request from the consumer and will respond with data either when it has 1 
MB of data to return or after 100 ms, whichever happens first.

Best regards,


Adrien


De : Vishnu Manivannan 
Envoyé : mercredi 18 juillet 2018 21:00:50
À : users@kafka.apache.org
Objet : Kafka Connect: Increase Consumer Consumption

Hi,

I am currently working with a single Kafka broker and a single Kafka consumer. 
I am trying to get the consumer to fetch more records, so I can increase the 
batch size when I write the data to a DB.

Each record is about 1 KB and I am trying to fetch at least 5000 records each 
time. So, I changed the configurations for the following consumer parameters:

  *   max.poll.records = 5000
  *   fetch.min.bytes = 512

For some reason, the maximum number of records fetched each time does not go 
above 1150. Are there any other parameters that I should look into or any 
changes I should make to the current configurations?

Thanks,
Vishnu




Re: Kafka-streams calling subtractor with null aggregator value in KGroupedTable.reduce() and other weirdness

2018-07-18 Thread Vasily Sulatskov
Thank you everyone for your explanations, that's been most enlightening.
On Wed, Jul 18, 2018 at 2:28 AM Matthias J. Sax  wrote:
>
> I see -- sorry for misunderstanding initially.
>
> I agree that it would be possible to detect. Feel free to file a Jira
> for this improvement and maybe pick it up by yourself :)
>
>
> -Matthias
>
> On 7/17/18 3:01 PM, Vasily Sulatskov wrote:
> > Hi,
> >
> > I do understand that in a general case it's not possible to guarantee
> > that the newValue and oldValue parts of a Change message arrive at the
> > same partition, and I guess that's not really in the plans, but if I
> > correctly understand how it works, it should be possible to detect if
> > both newValue and oldValue go to the same partition and keep them
> > together, thus improving kafka-streams consistency guarantees. Right?
> >
> > For example, right now I have a use case where, when I perform
> > groupBy on a table, my new keys are computed purely from the old keys and
> > not from the value. Handling of such cases (not the general case)
> > can be improved.
> > On Tue, Jul 17, 2018 at 1:48 AM Matthias J. Sax  
> > wrote:
> >>
> >> It is not possible to use a single message, because both messages may go
> >> to different partitions and may be processed by different application
> >> instances.
> >>
> >> Note that the overall KTable state is sharded. Updating a single
> >> upstream shard might require updating two different downstream shards.
> >>
> >>
> >> -Matthias
> >>
> >> On 7/16/18 2:50 PM, Vasily Sulatskov wrote:
> >>> Hi,
> >>>
> >>> It seems that it wouldn't be that difficult to address: just don't
> >>> break Change(newVal, oldVal) into Change(newVal, null) /
> >>> Change(oldVal, null) and update aggregator value in one .process()
> >>> call.
> >>>
> >>> Would this change make sense?
> >>> On Mon, Jul 16, 2018 at 10:34 PM Matthias J. Sax  
> >>> wrote:
> 
>  Vasily,
> 
>  yes, it can happen. As you noticed, both messages might be processed on
>  different machines. Thus, Kafka Streams provides 'eventual consistency'
>  guarantees.
> 
> 
>  -Matthias
> 
>  On 7/16/18 6:51 AM, Vasily Sulatskov wrote:
> > Hi John,
> >
> > Thanks a lot for your explanation. It does make much more sense now.
> >
> > The Jira issue I think is pretty well explained (with a reference to
> > this thread). And I've left my 2 cents in the pull request.
> >
> > You are right, I didn't notice that the repartition topic contains the same
> > message effectively twice and that the 0/1 bytes are non-visible, so when I
> > used kafka-console-consumer I didn't spot that. A quick suggestion here:
> > wouldn't it make sense to change the 0 and 1 bytes to something with
> > visible corresponding ASCII characters, say + and -, since these messages
> > are effectively commands telling the reducer to execute either an addition
> > or a subtraction?
> >
> > On a more serious side, can you please explain the temporal aspects of
> > how change messages are handled? More specifically, is it guaranteed
> > that both Change(newValue, null) and Change(null, oldValue) are
> > handled before a new aggregated value is committed to an output topic?
> > Change(newValue, null) and Change(null, oldValue) are delivered as two
> > separate messages via a kafka topic, and when they are read from a
> > topic (possibly on a different machine where a commit interval is
> > asynchronous to a machine that's put these changes into a topic) can
> > it happen that a Change(newValue, null) is processed by a
> > KTableReduceProcessor, the value of the aggregator is updated, and
> > committed to the changelog topic, and a Change(null, oldValue) is
> > processed only in the next commit interval? If I understand this
> > correctly that would mean that in an aggregated table an incorrect
> > aggregated value will be observed briefly, before being eventually
> > corrected.
> >
> > Can that happen? Or am I missing something that would make it impossible?
> > On Fri, Jul 13, 2018 at 8:05 PM John Roesler  wrote:
> >>
> >> Hi Vasily,
> >>
> >> I'm glad you're making me look at this; it's good homework for me!
> >>
> >> This is very non-obvious, but here's what happens:
> >>
> >> KStreamsReduce is a Processor of (K, V) => (K, Change) . I.e., it 
> >> emits
> >> new/old Change pairs as the value.
> >>
> >> Next is the Select (aka GroupBy). In the DSL code, this is the
> >> KTableRepartitionMap (we call it a repartition when you select a new 
> >> key,
> >> since the new keys may belong to different partitions).
> >> KTableRepartitionMap is a processor that does two things:
> >> 1. it maps K => K1 (new keys) and V => V1 (new values)
> >> 2. it "explodes" Change(new, old) into [ Change(null, old), Change(new,
> >> null)]
> >> In other words, it tur
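
To make the adder/subtractor mechanics discussed above concrete, here is a rough,
editorial sketch of a KTable groupBy()/reduce() pipeline (the topic names, key
mapping, and Long values are assumptions, not taken from the thread):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Serialized;

public class GroupByReduceExample {
  public static Topology build() {
    StreamsBuilder builder = new StreamsBuilder();

    // Assumed input: topic "balances" keyed by account id with Long amounts.
    KTable<String, Long> balances =
        builder.table("balances", Consumed.with(Serdes.String(), Serdes.Long()));

    // Re-key each record by a new key derived from the old one. Each upstream update
    // becomes a Change(new, old) pair, which the repartition map "explodes" into an
    // addition and a subtraction for the downstream reducer.
    KTable<String, Long> totalsByPrefix = balances
        .groupBy(
            (accountId, amount) -> KeyValue.pair(accountId.substring(0, 1), amount),
            Serialized.with(Serdes.String(), Serdes.Long()))
        .reduce(
            (aggregate, newValue) -> aggregate + newValue,   // adder: applies Change.newValue
            (aggregate, oldValue) -> aggregate - oldValue);  // subtractor: removes Change.oldValue

    totalsByPrefix.toStream().to("totals-by-prefix");        // assumption: output topic
    return builder.build();
  }
}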

Kafka Connect: Increase Consumer Consumption

2018-07-18 Thread Vishnu Manivannan
Hi,

I am currently working with a single Kafka broker and a single Kafka consumer. 
I am trying to get the consumer to fetch more records, so I can increase the 
batch size when I write the data to a DB.

Each record is about 1 KB and I am trying to fetch at least 5000 records each 
time. So, I changed the configurations for the following consumer parameters:

  *   max.poll.records = 5000
  *   fetch.min.bytes = 512

For some reason, the maximum number of records fetched each time does not go 
above 1150. Are there any other parameters that I should look into or any 
changes I should make to the current configurations?

Thanks,
Vishnu




Re: Kafka Streams: Share state store across processors

2018-07-18 Thread Matthias J. Sax
If you connect both stores to both processors, there will be only one
thread for both processors. Thus, concurrent access can never happen.


-Matthias
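
As an illustrative sketch of what connecting both stores to both processors can
look like in the Processor API (store names, topic name, and the processor logic
are assumptions, not taken from the thread):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class SharedStoreTopology {

  // Minimal processor that reads one store and writes the other; purely illustrative.
  static class CopyProcessor extends AbstractProcessor<String, String> {
    private final String readStore;
    private final String writeStore;
    private KeyValueStore<String, String> in;
    private KeyValueStore<String, String> out;

    CopyProcessor(String readStore, String writeStore) {
      this.readStore = readStore;
      this.writeStore = writeStore;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
      super.init(context);
      in = (KeyValueStore<String, String>) context.getStateStore(readStore);
      out = (KeyValueStore<String, String>) context.getStateStore(writeStore);
    }

    @Override
    public void process(String key, String value) {
      String previous = in.get(key);
      out.put(key, previous == null ? value : previous + "|" + value);
    }
  }

  public static Topology build() {
    StoreBuilder<KeyValueStore<String, String>> storeA = Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("store-a"),             // assumption: store names
        Serdes.String(), Serdes.String());
    StoreBuilder<KeyValueStore<String, String>> storeB = Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("store-b"),
        Serdes.String(), Serdes.String());

    Topology topology = new Topology();
    topology.addSource("source", "input-topic");               // assumption: topic name
    topology.addProcessor("processor-a",
        () -> new CopyProcessor("store-b", "store-a"), "source");
    topology.addProcessor("processor-b",
        () -> new CopyProcessor("store-a", "store-b"), "source");

    // Connecting each store to *both* processors means all access happens on the
    // single stream thread that owns the task, so the processors never touch a
    // store concurrently.
    topology.addStateStore(storeA, "processor-a", "processor-b");
    topology.addStateStore(storeB, "processor-a", "processor-b");
    return topology;
  }
}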

On 7/18/18 10:51 AM, Druhin Sagar Goel wrote:
> Hi Matthias,
> 
> 
> I was under the impression that state stores are not thread safe and so two 
> processors writing to the same store at the same time would not work. I 
> understood this from your reply in this post: 
> https://groups.google.com/forum/#!topic/confluent-platform/JTKyDE231y8. Is 
> that not the case?
> 
> 
> Thanks,
> 
> Druhin
> 
> 
> Kafka streams with global state store for 
> NUM_STREAM_THREADS_CONFIG=3
> groups.google.com
> Posted 11/13/17 2:50 AM, 8 messages
> 
> 
> 
> From: Matthias J. Sax 
> Sent: Wednesday, July 18, 2018 10:20:02 AM
> To: users@kafka.apache.org
> Subject: Re: Kafka Streams: Share state store across processors
> 
> You can connect both stores to both processors for this.
> 
> -Matthias
> 
> On 7/17/18 11:12 PM, Druhin Sagar Goel wrote:
>> Hi,
>>
>> I am new to the Kafka Streams framework. I have the following streams use 
>> case:
>>
>> State store A
>> State store B
>>
>> Processor A
>> Processor B
>>
>> State store A is only written to by Processor A but also needs to be read by 
>> Processor B. State store B needs to be written to by both Processor A and 
>> Processor B. Processor A and Processor B run concurrently.
>>
>> What is the best way for me to be able to share state stores across 
>> processors such that they can be written to by multiple processors?
>>
>>
>> Thanks,
>> Druhin
>>
> 
> 





Fwd: Recovering partition leadership outside ISR

2018-07-18 Thread Jack Foy
Hi all,

We had an outage recently that I think we could have prevented, and I'd
like to get some feedback on the idea.

tl;dr:

When a partition leader writes an updated ISR, it should also record
its current log-end-offset. On leader election, if there are no live
replicas in the ISR, then a replica with this same log-end-offset
should be preferred before considering unclean leader election.

Details and use case:

We had a 5-node Kafka 1.0.0 cluster (since upgraded to 1.1.0) with unclean
leader election disabled. Well-configured topics have replication factor 3 and
min.insync.replicas 2, with producers setting acks=all.

At root, our cloud provider suffered hardware failure, causing a partial
outage on network connectivity to disk storage. Broker 5's storage was on the
orphaned side of the network partition.

At the very start of the incident, broker 5 dropped all followers on brokers 1
and 4 out of the ISR for partitions it was leading. Its connections to brokers
2 and 3 and to Zookeeper stayed up, including to the controller on broker 3.
Broker 5 went offline entirely a few moments later, and stayed down with disk
state inaccessible for several hours.

We had configured multiple partitions with broker 5 as their leader and
followers on brokers 1 and 4. Before the incident those partitions had ISR
{5,1,4}, which shrank to {5} before broker 5 disappeared - leaving us with no
eligible replicas to become leader.

The only ways to bring these partitions back up were to either recover broker
5's up-to-date disk state, or to enable unclean leader election. Had we lost
one follower, then the other, and then the leader, enabling unclean leader
election would have carried 50% risk of message loss.

In the end, we decided that the lowest-risk option was to enable unclean leader
election on the affected topics, force a controller election, watch the
partitions come back up, and disable unclean election. We added this
procedure to our runbooks.

I think there's a safer recovery path that Kafka could support:

The leader should also record its current log-end-offset when it writes an
updated ISR. If the controller determines that it can't promote a replica from
the ISR, it should next look for a replica that has that same log-end-offset.
Only if that step also fails should it then consider unclean leader election.

For our failure case, at least, I think this would have allowed a clean and
automatic recovery. Has this idea been considered before? Does it have
fatal flaws?

Thanks,

--
Jack Foy 


Re: Kafka Streams: Share state store across processors

2018-07-18 Thread Druhin Sagar Goel
Hi Matthias,


I was under the impression that state stores are not thread safe and so two 
processors writing to the same store at the same time would not work. I 
understood this from your reply in this post: 
https://groups.google.com/forum/#!topic/confluent-platform/JTKyDE231y8. Is that 
not the case?


Thanks,

Druhin


Kafka streams with global state store for 
NUM_STREAM_THREADS_CONFIG=3
groups.google.com
Posted 11/13/17 2:50 AM, 8 messages



From: Matthias J. Sax 
Sent: Wednesday, July 18, 2018 10:20:02 AM
To: users@kafka.apache.org
Subject: Re: Kafka Streams: Share state store across processors

You can connect both stores to both processors for this.

-Matthias

On 7/17/18 11:12 PM, Druhin Sagar Goel wrote:
> Hi,
>
> I am new to the Kafka Streams framework. I have the following streams use 
> case:
>
> State store A
> State store B
>
> Processor A
> Processor B
>
> State store A is only written to by Processor A but also needs to be read by 
> Processor B. State store B needs to be written to by both Processor A and 
> Processor B. Processor A and Processor B run concurrently.
>
> What is the best way for me to be able to share state stores across 
> processors such that they can be written to by multiple processors?
>
>
> Thanks,
> Druhin
>



Re: [kafka-clients] Re: [VOTE] 1.1.1 RC3

2018-07-18 Thread Matthias J. Sax
Thanks Dong.

I am a little late, but +1, too.

 - verified signatures
 - build from sources
 - run unit test suite
 - run streams quickstart


Thanks for running the release!

-Matthias

On 7/18/18 10:24 AM, Dong Lin wrote:
> Thank you all for taking time to certify and vote for the release!
> 
> This vote has passed with 10 +1 votes (3 bindings) and no 0 or -1 votes.
> 
> 
> +1 vote from PMC Members:
> - Jason Gustafson
> - Rajini Sivaram
> - Ismael Juma
> 
> 
> +1 vote from Committers:
> - Sriharsha Chintalapani
> - Dong Lin
> 
> +1 vote from Community:
> - Ted Yu
> - Satish Duggana
> - Jakub Scholz
> - Brett Rann
> - ManiKumar Reddy
> 
> 0 vote: none
> 
> -1 vote: none
> 
> Vote thread: the thread is not found on http://markmail.org/message yet.
> Will post it when it is available.
> 
> I'll continue with the release process and the release announcement will
> follow.
> 
> 
> Cheers,
> Dong
> 
> 
> On Wed, Jul 18, 2018 at 7:46 AM, Ismael Juma  > wrote:
> 
> +1 (binding)
> 
> Verified signature of source artifact, ran tests and verified
> quickstart on source artifact with Java 8, verified quickstart on
> binary artifact (Scala 2.12) with Java 8, sanity checked release
> notes and Maven staging repository.
> 
> Thanks for managing the release Dong!
> 
> Ismael
> 
> On Sun, Jul 8, 2018 at 3:36 PM Dong Lin  > wrote:
> 
> Hello Kafka users, developers and client-developers,
> 
> This is the fourth candidate for release of Apache Kafka 1.1.1.
> 
> Apache Kafka 1.1.1 is a bug-fix release for the 1.1 branch that
> was first
> released with 1.1.0 about 3 months ago. We have fixed about 25
> issues since
> that release. A few of the more significant fixes include:
> 
> KAFKA-6925  > - Fix memory
> leak in StreamsMetricsThreadImpl
> KAFKA-6937  > - In-sync
> replica delayed during fetch if replica throttle is exceeded
> KAFKA-6917  > - Process txn
> completion asynchronously to avoid deadlock
> KAFKA-6893  > - Create
> processors before starting acceptor to avoid ArithmeticException
> KAFKA-6870  > -
> Fix ConcurrentModificationException in SampledStat
> KAFKA-6878  > - Fix
> NullPointerException when querying global state store
> KAFKA-6879  > - Invoke
> session init callbacks outside lock to avoid Controller deadlock
> KAFKA-6857  > - Prevent
> follower from truncating to the wrong offset if undefined leader
> epoch is
> requested
> KAFKA-6854  > - Log cleaner
> fails with transaction markers that are deleted during clean
> KAFKA-6747  > - Check
> whether there is in-flight transaction before aborting transaction
> KAFKA-6748  > - Double
> check before scheduling a new task after the punctuate call
> KAFKA-6739  > -
> Fix IllegalArgumentException when down-converting from V2 to V0/V1
> KAFKA-6728  > -
> Fix NullPointerException when instantiating the HeaderConverter
> 
> Kafka 1.1.1 release plan:
> https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+1.1.1
> 
> 
> Release notes for the 1.1.1 release:
> http://home.apache.org/~lindong/kafka-1.1.1-rc3/RELEASE_NOTES.html
> 

Re: [kafka-clients] Re: [VOTE] 1.1.1 RC3

2018-07-18 Thread Dong Lin
Thank you all for taking time to certify and vote for the release!

This vote has passed with 10 +1 votes (3 bindings) and no 0 or -1 votes.


+1 vote from PMC Members:
- Jason Gustafson
- Rajini Sivaram
- Ismael Juma


+1 vote from Committers:
- Sriharsha Chintalapani
- Dong Lin

+1 vote from Community:
- Ted Yu
- Satish Duggana
- Jakub Scholz
- Brett Rann
- ManiKumar Reddy

0 vote: none

-1 vote: none

Vote thread: the thread is not found on http://markmail.org/message yet.
Will post it when it is available.

I'll continue with the release process and the release announcement will
follow.


Cheers,
Dong


On Wed, Jul 18, 2018 at 7:46 AM, Ismael Juma  wrote:

> +1 (binding)
>
> Verified signature of source artifact, ran tests and verified quickstart
> on source artifact with Java 8, verified quickstart on binary artifact (Scala
> 2.12) with Java 8, sanity checked release notes and Maven staging
> repository.
>
> Thanks for managing the release Dong!
>
> Ismael
>
> On Sun, Jul 8, 2018 at 3:36 PM Dong Lin  wrote:
>
>> Hello Kafka users, developers and client-developers,
>>
>> This is the fourth candidate for release of Apache Kafka 1.1.1.
>>
>> Apache Kafka 1.1.1 is a bug-fix release for the 1.1 branch that was first
>> released with 1.1.0 about 3 months ago. We have fixed about 25 issues
>> since
>> that release. A few of the more significant fixes include:
>>
>> KAFKA-6925  - Fix
>> memory
>> leak in StreamsMetricsThreadImpl
>> KAFKA-6937  - In-sync
>> replica delayed during fetch if replica throttle is exceeded
>> KAFKA-6917  - Process
>> txn
>> completion asynchronously to avoid deadlock
>> KAFKA-6893  - Create
>> processors before starting acceptor to avoid ArithmeticException
>> KAFKA-6870  -
>> Fix ConcurrentModificationException in SampledStat
>> KAFKA-6878  - Fix
>> NullPointerException when querying global state store
>> KAFKA-6879  - Invoke
>> session init callbacks outside lock to avoid Controller deadlock
>> KAFKA-6857  - Prevent
>> follower from truncating to the wrong offset if undefined leader epoch is
>> requested
>> KAFKA-6854  - Log
>> cleaner
>> fails with transaction markers that are deleted during clean
>> KAFKA-6747  - Check
>> whether there is in-flight transaction before aborting transaction
>> KAFKA-6748  - Double
>> check before scheduling a new task after the punctuate call
>> KAFKA-6739  -
>> Fix IllegalArgumentException when down-converting from V2 to V0/V1
>> KAFKA-6728  -
>> Fix NullPointerException when instantiating the HeaderConverter
>>
>> Kafka 1.1.1 release plan:
>> https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+1.1.1
>>
>> Release notes for the 1.1.1 release:
>> http://home.apache.org/~lindong/kafka-1.1.1-rc3/RELEASE_NOTES.html
>>
>> *** Please download, test and vote by Thursday, July 12, 12pm PT ***
>>
>> Kafka's KEYS file containing PGP keys we use to sign the release:
>> http://kafka.apache.org/KEYS
>>
>> * Release artifacts to be voted upon (source and binary):
>> http://home.apache.org/~lindong/kafka-1.1.1-rc3/
>>
>> * Maven artifacts to be voted upon:
>> https://repository.apache.org/content/groups/staging/
>>
>> * Javadoc:
>> http://home.apache.org/~lindong/kafka-1.1.1-rc3/javadoc/
>>
>> * Tag to be voted upon (off 1.1 branch) is the 1.1.1-rc3 tag:
>> https://github.com/apache/kafka/tree/1.1.1-rc3
>>
>> * Documentation:
>> http://kafka.apache.org/11/documentation.html
>>
>> * Protocol:
>> http://kafka.apache.org/11/protocol.html
>>
>> * Successful Jenkins builds for the 1.1 branch:
>> Unit/integration tests: *https://builds.apache.org/job/kafka-1.1-jdk7/162
>> *
>> System tests:
>> https://jenkins.confluent.io/job/system-test-kafka/job/1.1/156/
>>
>> Please test and verify the release artifacts and submit a vote for this
>> RC,
>> or report any issues so we can fix them and get a new RC out ASAP.
>> Although
>> this release vote requires PMC votes to pass, testing, votes, and bug
>> reports are valuable and appreciated from everyone.
>>
>>
>> Regards,
>> Dong
>>

Re: Kafka Streams: Share state store across processors

2018-07-18 Thread Matthias J. Sax
You can connect both stores to both processors for this.

-Matthias

On 7/17/18 11:12 PM, Druhin Sagar Goel wrote:
> Hi,
> 
> I am new to the Kafka Streams framework. I have the following streams use 
> case:
> 
> State store A
> State store B
> 
> Processor A
> Processor B
> 
> State store A is only written to by Processor A but also needs to be read by 
> Processor B. State store B needs to be written to by both Processor A and 
> Processor B. Processor A and Processor B run concurrently.
> 
> What is the best way for me to be able to share state stores across 
> processors such that they can be written to by multiple processors?
> 
> 
> Thanks,
> Druhin
> 





Re: The asynchronous sending of a message returns no error if the Kafka server is not started

2018-07-18 Thread Hans Jespersen
That is expected behavior. Typically there are multiple Kafka brokers, so if
one is down the client retries the send to a newly elected leader.

A send should not be considered successful until an ACK is received in the
client from the Kafka cluster.

By default the ACK is async for performance, but send() returns a Future, so
you can easily make it appear to be a synchronous publish. Examples are in the
javadoc.

-hans 
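
A small sketch of making the send effectively synchronous by blocking on the
returned Future (the topic name and timeout are illustrative assumptions; it is
the same pattern the quoted code below arrives at with .get()):

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SyncSendExample {
  public static void main(String[] args) throws Exception {
    Properties config = new Properties();
    config.put("bootstrap.servers", "localhost:9092");
    config.put("acks", "all");
    config.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    config.put("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");

    try (Producer<String, String> producer = new KafkaProducer<>(config)) {
      ProducerRecord<String, String> record =
          new ProducerRecord<>("test-topic", null, "a-little-message");
      // Blocking on the returned Future turns the async send into a synchronous one:
      // any broker-side failure (or an unreachable cluster) surfaces here as an exception.
      RecordMetadata metadata = producer.send(record).get(30, TimeUnit.SECONDS);
      System.out.println("Acked at offset " + metadata.offset()
          + " in partition " + metadata.partition());
    }
  }
}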

> On Jul 18, 2018, at 7:45 AM, jingguo yao  wrote:
> 
> The asynchronous sending of a message returns no error even if the
> Kafka server is not started.
> 
> For all the following tests, the local Kafka server is stopped. First,
> consider this piece of code:
> 
> public static void main(String[] args) throws Exception {
>  Properties config = new Properties();
>  config.put("client.id", InetAddress.getLocalHost().getHostName());
>  config.put("bootstrap.servers", "localhost:9092");
>  config.put("acks", "all");
>  config.put("key.serializer",
> "org.apache.kafka.common.serialization.StringSerializer");
>  config.put("value.serializer",
> "org.apache.kafka.common.serialization.StringSerializer");
> 
>   try (Producer<String, String> producer = new KafkaProducer<>(config)) {
>     ProducerRecord<String, String> record =
>         new ProducerRecord<>("test-topic", null, "a-little-message");
>     producer.send(record, new Callback() {
>       @Override
>       public void onCompletion(RecordMetadata metadata, Exception exception) {
>         if (exception != null) {
>           System.out.println("Exception occurred!");
>           exception.printStackTrace(System.out);
>         }
>       }
>     });
>   }
> }
> 
> Running it will produce the following error:
> 
> Exception occurred!
> org.apache.kafka.common.errors.TimeoutException: Failed to update
> metadata after 60000 ms.
> 
> Second, consider this piece of code:
> 
> public static void main(String[] args) throws Exception {
>  Properties config = new Properties();
>  config.put("client.id", InetAddress.getLocalHost().getHostName());
>  config.put("bootstrap.servers", "localhost:9092");
>  config.put("acks", "all");
>  config.put("key.serializer",
> "org.apache.kafka.common.serialization.StringSerializer");
>  config.put("value.serializer",
> "org.apache.kafka.common.serialization.StringSerializer");
> 
>   try (Producer<String, String> producer = new KafkaProducer<>(config)) {
>     ProducerRecord<String, String> record =
>         new ProducerRecord<>("test-topic", null, "a-little-message");
>     System.out.println("Sending a message...");
>     producer.send(record).get();
>     System.out.println("Message sent");
>   }
> }
> 
> Running it will produce the following error:
> 
> Sending a message...
> Exception in thread "main" java.util.concurrent.ExecutionException:
> org.apache.kafka.common.errors.TimeoutException: Failed to update
> metadata after 60000 ms.
> at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1168)
> at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:859)
> at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:797)
> at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:684)
> at com.xdf.foreign.KafkaTest.main(KafkaTest.java:46)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to
> update metadata after 60000 ms.
> 
> Third, consider this piece of code:
> 
> public static void main(String[] args) throws Exception {
>  Properties config = new Properties();
>  config.put("client.id", InetAddress.getLocalHost().getHostName());
>  config.put("bootstrap.servers", "localhost:9092");
>  config.put("acks", "all");
>  config.put("key.serializer",
> "org.apache.kafka.common.serialization.StringSerializer");
>  config.put("value.serializer",
> "org.apache.kafka.common.serialization.StringSerializer");
> 
>   try (Producer<String, String> producer = new KafkaProducer<>(config)) {
>     ProducerRecord<String, String> record =
>         new ProducerRecord<>("test-topic", null, "a-little-message");
>     System.out.println("Sending a message...");
>     producer.send(record);
>     System.out.println("Message sent");
>   }
> }
> 
> Running it will produce no error. The following output will be
> produced:
> 
> Sending a message...
> Message sent
> 
> I know that the nature of asynchronous sending requires the send
> method to ignore connection errors to the Kafka server. But I think
> it would be better to document this kind of behaviour somewhere.
> 
> -- 
> Jingguo


Re: If timeout is short, the first poll does not return records

2018-07-18 Thread Thakrar, Jayesh
While this does not answer your question, I believe a lot of things happen
during the first call - e.g. getting admin and metadata info about the cluster.
That takes "some time", so a poll timeout that is acceptable/normal for regular
processing may not be sufficient for initialization, and I believe the same
applies during periodic metadata updates/refreshes.
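
A hedged sketch of absorbing that start-up cost by simply polling more than once
(the topic name, group id, and loop bound are assumptions):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class FirstPollExample {
  public static void main(String[] args) {
    Properties config = new Properties();
    config.put("bootstrap.servers", "localhost:9092");
    config.put("group.id", "first-poll-demo");
    config.put("auto.offset.reset", "earliest");
    config.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    config.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config)) {
      consumer.subscribe(Collections.singletonList("test-topic"));

      // The first poll also fetches metadata, joins the group and waits for a
      // partition assignment, so with a short timeout it often returns zero records.
      // Polling in a loop (or using a generous first timeout) hides that start-up cost.
      for (int i = 0; i < 10; i++) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        System.out.printf("poll %d returned %d records%n", i, records.count());
        if (!records.isEmpty()) {
          break;
        }
      }
    }
  }
}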

On 7/18/18, 3:41 AM, "jingguo yao"  wrote:

If the timeout is short, say 100 ms, the first poll does not return
records in my case. Jay Kreps gave an explanation in [1]. I think
this poll behaviour is counterintuitive; it would make Kafka
users' lives much easier if it were documented in [2].

[1] 
http://grokbase.com/p/kafka/users/155mqpwf3n/consumer-poll-returns-no-records-unless-called-more-than-once-why
[2] 
http://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll-long-





Re: [VOTE] 2.0.0 RC2

2018-07-18 Thread Guozhang Wang
+1. Verified the following:

- javadocs
- web docs
- maven staging repository

Besides what Ismael mentioned about the upgrade guide, some of the latest doc
fixes in 2.0 seem not to be reflected in
http://kafka.apache.org/20/documentation.html yet (this does not need a new
RC; we can just re-copy-and-paste to kafka-site again).


Thanks Rajini!


Guozhang



On Wed, Jul 18, 2018 at 7:48 AM, Ismael Juma  wrote:

> Thanks Rajini! A documentation issue that we must fix before the release
> (but that does not require another RC): 1.2 (which became 2.0) is mentioned in
> the upgrade notes:
>
> http://kafka.apache.org/20/documentation.html#upgrade
>
> Ismael
>
> On Sun, Jul 15, 2018 at 9:25 AM Rajini Sivaram 
> wrote:
>
> > Hi Ismael,
> >
> > Thank you for pointing that out. I have re-uploaded the RC2 artifacts to
> > maven including streams-scala_2.12. Also submitted a PR to update build &
> > release scripts to include this.
> >
> > Thank you,
> >
> > Rajini
> >
> >
> >
> > On Fri, Jul 13, 2018 at 7:19 AM, Ismael Juma  wrote:
> >
> > > Hi Rajini,
> > >
> > > Thanks for generating the RC. It seems like the kafka-streams-scala
> 2.12
> > > artifact is missing from the Maven repository:
> > >
> > > https://repository.apache.org/content/groups/staging/org/apache/kafka/
> > >
> > > Since this is the first time we are publishing this artifact, it is
> > > possible that this never worked properly.
> > >
> > > Ismael
> > >
> > > On Tue, Jul 10, 2018 at 10:17 AM Rajini Sivaram <
> rajinisiva...@gmail.com
> > >
> > > wrote:
> > >
> > > > Hello Kafka users, developers and client-developers,
> > > >
> > > >
> > > > This is the third candidate for release of Apache Kafka 2.0.0.
> > > >
> > > >
> > > > This is a major version release of Apache Kafka. It includes 40 new
> > KIPs
> > > > and
> > > >
> > > > several critical bug fixes. Please see the 2.0.0 release plan for
> more
> > > > details:
> > > >
> > > > https://cwiki.apache.org/confluence/pages/viewpage.
> > > action?pageId=80448820
> > > >
> > > >
> > > > A few notable highlights:
> > > >
> > > >- Prefixed wildcard ACLs (KIP-290), Fine grained ACLs for
> > CreateTopics
> > > >(KIP-277)
> > > >- SASL/OAUTHBEARER implementation (KIP-255)
> > > >- Improved quota communication and customization of quotas
> (KIP-219,
> > > >KIP-257)
> > > >- Efficient memory usage for down conversion (KIP-283)
> > > >- Fix log divergence between leader and follower during fast
> leader
> > > >failover (KIP-279)
> > > >- Drop support for Java 7 and remove deprecated code including old
> > > scala
> > > >clients
> > > >- Connect REST extension plugin, support for externalizing secrets
> > and
> > > >improved error handling (KIP-285, KIP-297, KIP-298 etc.)
> > > >- Scala API for Kafka Streams and other Streams API improvements
> > > >(KIP-270, KIP-150, KIP-245, KIP-251 etc.)
> > > >
> > > >
> > > > Release notes for the 2.0.0 release:
> > > >
> > > > http://home.apache.org/~rsivaram/kafka-2.0.0-rc2/RELEASE_NOTES.html
> > > >
> > > >
> > > > *** Please download, test and vote by Friday, July 13, 4pm PT
> > > >
> > > >
> > > > Kafka's KEYS file containing PGP keys we use to sign the release:
> > > >
> > > > http://kafka.apache.org/KEYS
> > > >
> > > >
> > > > * Release artifacts to be voted upon (source and binary):
> > > >
> > > > http://home.apache.org/~rsivaram/kafka-2.0.0-rc2/
> > > >
> > > >
> > > > * Maven artifacts to be voted upon:
> > > >
> > > > https://repository.apache.org/content/groups/staging/
> > > >
> > > >
> > > > * Javadoc:
> > > >
> > > > http://home.apache.org/~rsivaram/kafka-2.0.0-rc2/javadoc/
> > > >
> > > >
> > > > * Tag to be voted upon (off 2.0 branch) is the 2.0.0 tag:
> > > >
> > > > https://github.com/apache/kafka/tree/2.0.0-rc2
> > > >
> > > >
> > > >
> > > > * Documentation:
> > > >
> > > > http://kafka.apache.org/20/documentation.html
> > > >
> > > >
> > > > * Protocol:
> > > >
> > > > http://kafka.apache.org/20/protocol.html
> > > >
> > > >
> > > > * Successful Jenkins builds for the 2.0 branch:
> > > >
> > > > Unit/integration tests:
> > https://builds.apache.org/job/kafka-2.0-jdk8/72/
> > > >
> > > > System tests:
> > > > https://jenkins.confluent.io/job/system-test-kafka/job/2.0/27/
> > > >
> > > >
> > > > /**
> > > >
> > > >
> > > > Thanks,
> > > >
> > > >
> > > > Rajini
> > > >
> > >
> >
>



-- 
-- Guozhang


Re: [VOTE] 2.0.0 RC2

2018-07-18 Thread Ismael Juma
Thanks Rajini! A documentation issue that we must fix before the release
(but that does not require another RC): 1.2 (which became 2.0) is mentioned in
the upgrade notes:

http://kafka.apache.org/20/documentation.html#upgrade

Ismael

On Sun, Jul 15, 2018 at 9:25 AM Rajini Sivaram 
wrote:

> Hi Ismael,
>
> Thank you for pointing that out. I have re-uploaded the RC2 artifacts to
> maven including streams-scala_2.12. Also submitted a PR to update build &
> release scripts to include this.
>
> Thank you,
>
> Rajini
>
>
>
> On Fri, Jul 13, 2018 at 7:19 AM, Ismael Juma  wrote:
>
> > Hi Rajini,
> >
> > Thanks for generating the RC. It seems like the kafka-streams-scala 2.12
> > artifact is missing from the Maven repository:
> >
> > https://repository.apache.org/content/groups/staging/org/apache/kafka/
> >
> > Since this is the first time we are publishing this artifact, it is
> > possible that this never worked properly.
> >
> > Ismael
> >
> > On Tue, Jul 10, 2018 at 10:17 AM Rajini Sivaram  >
> > wrote:
> >
> > > Hello Kafka users, developers and client-developers,
> > >
> > >
> > > This is the third candidate for release of Apache Kafka 2.0.0.
> > >
> > >
> > > This is a major version release of Apache Kafka. It includes 40 new
> KIPs
> > > and
> > >
> > > several critical bug fixes. Please see the 2.0.0 release plan for more
> > > details:
> > >
> > > https://cwiki.apache.org/confluence/pages/viewpage.
> > action?pageId=80448820
> > >
> > >
> > > A few notable highlights:
> > >
> > >- Prefixed wildcard ACLs (KIP-290), Fine grained ACLs for
> CreateTopics
> > >(KIP-277)
> > >- SASL/OAUTHBEARER implementation (KIP-255)
> > >- Improved quota communication and customization of quotas (KIP-219,
> > >KIP-257)
> > >- Efficient memory usage for down conversion (KIP-283)
> > >- Fix log divergence between leader and follower during fast leader
> > >failover (KIP-279)
> > >- Drop support for Java 7 and remove deprecated code including old
> > scala
> > >clients
> > >- Connect REST extension plugin, support for externalizing secrets
> and
> > >improved error handling (KIP-285, KIP-297, KIP-298 etc.)
> > >- Scala API for Kafka Streams and other Streams API improvements
> > >(KIP-270, KIP-150, KIP-245, KIP-251 etc.)
> > >
> > >
> > > Release notes for the 2.0.0 release:
> > >
> > > http://home.apache.org/~rsivaram/kafka-2.0.0-rc2/RELEASE_NOTES.html
> > >
> > >
> > > *** Please download, test and vote by Friday, July 13, 4pm PT
> > >
> > >
> > > Kafka's KEYS file containing PGP keys we use to sign the release:
> > >
> > > http://kafka.apache.org/KEYS
> > >
> > >
> > > * Release artifacts to be voted upon (source and binary):
> > >
> > > http://home.apache.org/~rsivaram/kafka-2.0.0-rc2/
> > >
> > >
> > > * Maven artifacts to be voted upon:
> > >
> > > https://repository.apache.org/content/groups/staging/
> > >
> > >
> > > * Javadoc:
> > >
> > > http://home.apache.org/~rsivaram/kafka-2.0.0-rc2/javadoc/
> > >
> > >
> > > * Tag to be voted upon (off 2.0 branch) is the 2.0.0 tag:
> > >
> > > https://github.com/apache/kafka/tree/2.0.0-rc2
> > >
> > >
> > >
> > > * Documentation:
> > >
> > > http://kafka.apache.org/20/documentation.html
> > >
> > >
> > > * Protocol:
> > >
> > > http://kafka.apache.org/20/protocol.html
> > >
> > >
> > > * Successful Jenkins builds for the 2.0 branch:
> > >
> > > Unit/integration tests:
> https://builds.apache.org/job/kafka-2.0-jdk8/72/
> > >
> > > System tests:
> > > https://jenkins.confluent.io/job/system-test-kafka/job/2.0/27/
> > >
> > >
> > > /**
> > >
> > >
> > > Thanks,
> > >
> > >
> > > Rajini
> > >
> >
>


Re: [VOTE] 1.1.1 RC3

2018-07-18 Thread Ismael Juma
+1 (binding)

Verified signature of source artifact, ran tests and verified quickstart on
source artifact with Java 8, verified quickstart on binary artifact (Scala
2.12) with Java 8, sanity checked release notes and Maven staging
repository.

Thanks for managing the release Dong!

Ismael

On Sun, Jul 8, 2018 at 3:36 PM Dong Lin  wrote:

> Hello Kafka users, developers and client-developers,
>
> This is the fourth candidate for release of Apache Kafka 1.1.1.
>
> Apache Kafka 1.1.1 is a bug-fix release for the 1.1 branch that was first
> released with 1.1.0 about 3 months ago. We have fixed about 25 issues since
> that release. A few of the more significant fixes include:
>
> KAFKA-6925  - Fix memory
> leak in StreamsMetricsThreadImpl
> KAFKA-6937  - In-sync
> replica delayed during fetch if replica throttle is exceeded
> KAFKA-6917  - Process
> txn
> completion asynchronously to avoid deadlock
> KAFKA-6893  - Create
> processors before starting acceptor to avoid ArithmeticException
> KAFKA-6870  -
> Fix ConcurrentModificationException in SampledStat
> KAFKA-6878  - Fix
> NullPointerException when querying global state store
> KAFKA-6879  - Invoke
> session init callbacks outside lock to avoid Controller deadlock
> KAFKA-6857  - Prevent
> follower from truncating to the wrong offset if undefined leader epoch is
> requested
> KAFKA-6854  - Log
> cleaner
> fails with transaction markers that are deleted during clean
> KAFKA-6747  - Check
> whether there is in-flight transaction before aborting transaction
> KAFKA-6748  - Double
> check before scheduling a new task after the punctuate call
> KAFKA-6739  -
> Fix IllegalArgumentException when down-converting from V2 to V0/V1
> KAFKA-6728  -
> Fix NullPointerException when instantiating the HeaderConverter
>
> Kafka 1.1.1 release plan:
> https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+1.1.1
>
> Release notes for the 1.1.1 release:
> http://home.apache.org/~lindong/kafka-1.1.1-rc3/RELEASE_NOTES.html
>
> *** Please download, test and vote by Thursday, July 12, 12pm PT ***
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> http://kafka.apache.org/KEYS
>
> * Release artifacts to be voted upon (source and binary):
> http://home.apache.org/~lindong/kafka-1.1.1-rc3/
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/
>
> * Javadoc:
> http://home.apache.org/~lindong/kafka-1.1.1-rc3/javadoc/
>
> * Tag to be voted upon (off 1.1 branch) is the 1.1.1-rc3 tag:
> https://github.com/apache/kafka/tree/1.1.1-rc3
>
> * Documentation:
> http://kafka.apache.org/11/documentation.html
>
> * Protocol:
> http://kafka.apache.org/11/protocol.html
>
> * Successful Jenkins builds for the 1.1 branch:
> Unit/integration tests: *https://builds.apache.org/job/kafka-1.1-jdk7/162
> *
> System tests:
> https://jenkins.confluent.io/job/system-test-kafka/job/1.1/156/
>
> Please test and verify the release artifacts and submit a vote for this RC,
> or report any issues so we can fix them and get a new RC out ASAP. Although
> this release vote requires PMC votes to pass, testing, votes, and bug
> reports are valuable and appreciated from everyone.
>
>
> Regards,
> Dong
>


The asynchronous sending of a message returns no error if the Kafka server is not started

2018-07-18 Thread jingguo yao
The asynchronous sending of a message returns no error even if the
Kafka server is not started.

For all the following tests, the local Kafka server is stopped. First,
consider this piece of code:

public static void main(String[] args) throws Exception {
  Properties config = new Properties();
  config.put("client.id", InetAddress.getLocalHost().getHostName());
  config.put("bootstrap.servers", "localhost:9092");
  config.put("acks", "all");
  config.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
  config.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");

  try (Producer<String, String> producer = new KafkaProducer<>(config)) {
    ProducerRecord<String, String> record =
        new ProducerRecord<>("test-topic", null, "a-little-message");
    producer.send(record, new Callback() {
      @Override
      public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
          System.out.println("Exception occurred!");
          exception.printStackTrace(System.out);
        }
      }
    });
  }
}

Running it will produce the following error:

Exception occurred!
org.apache.kafka.common.errors.TimeoutException: Failed to update
metadata after 60000 ms.

Second, consider this piece of code:

public static void main(String[] args) throws Exception {
  Properties config = new Properties();
  config.put("client.id", InetAddress.getLocalHost().getHostName());
  config.put("bootstrap.servers", "localhost:9092");
  config.put("acks", "all");
  config.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
  config.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");

  try (Producer<String, String> producer = new KafkaProducer<>(config)) {
    ProducerRecord<String, String> record =
        new ProducerRecord<>("test-topic", null, "a-little-message");
    System.out.println("Sending a message...");
    producer.send(record).get();
    System.out.println("Message sent");
  }
}

Running it will produce the following error:

Sending a message...
Exception in thread "main" java.util.concurrent.ExecutionException:
org.apache.kafka.common.errors.TimeoutException: Failed to update
metadata after 60000 ms.
at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1168)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:859)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:797)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:684)
at com.xdf.foreign.KafkaTest.main(KafkaTest.java:46)
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to
update metadata after 60000 ms.

Third, consider this piece of code:

public static void main(String[] args) throws Exception {
  Properties config = new Properties();
  config.put("client.id", InetAddress.getLocalHost().getHostName());
  config.put("bootstrap.servers", "localhost:9092");
  config.put("acks", "all");
  config.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
  config.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");

  try (Producer<String, String> producer = new KafkaProducer<>(config)) {
    ProducerRecord<String, String> record =
        new ProducerRecord<>("test-topic", null, "a-little-message");
    System.out.println("Sending a message...");
    producer.send(record);
    System.out.println("Message sent");
  }
}

Running it will produce no error. The following output will be
produced:

Sending a message...
Message sent

I know that the nature of asynchronous sending requires the send
method to ignore connection errors to the Kafka server. But I think
it would be better to document this kind of behaviour somewhere.

-- 
Jingguo


Restrict access on kafka with multiple listener

2018-07-18 Thread Matt L
Hi,

I have an existing Kafka Cluster that is configured as PLAINTEXT. We want
to enable SASL (GSSAPI) as an additional listener.

Is there a way to force specific topics to only accept traffic (pub/con)
from a certain listener?

e.g. if I create a topic and set ACLs, how do I stop a client from using
the PLAINTEXT listener to publish to and consume from that topic?

Thanks,


What is meaning of buffer in KafkaConsumer's Javadoc?

2018-07-18 Thread jingguo yao
KafkaConsumer's Javadoc [1] says:

> There is no client-side buffering in read_committed mode.

And it also says:

> timeout - The time, in milliseconds, spent waiting in poll if data
> is not available in the buffer. If 0, returns immediately with any
> records that are available currently in the buffer, else returns
> empty. Must not be negative.

[2] mentions receive.buffer.bytes. Does the buffer mentioned in
KafkaConsumer's Javadoc mean the same thing as receive.buffer.bytes?

[1] 
http://kafka.apache.org/11/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
[2] http://kafka.apache.org/documentation/#consumerconfigs
-- 
Jingguo


The relation between min.insync.replicas and offsets.topic.replication.factor

2018-07-18 Thread jingguo yao
I have a 3 node Kafka cluster. And the following properties are set in
server.properties:

offsets.topic.replication.factor=3
default.replication.factor=3
min.insync.replicas=2

In one terminal, I ran the following commands:

bin/kafka-topics --zookeeper localhost:2181 --create
--replication-factor 3 --partitions 1 --topic test-topic
bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic
test-topic --offset earliest --partition 0

In another terminal, I ran this command:

bin/kafka-console-producer --broker-list localhost:9092 --topic test-topic

After a while, the following error will keep popping out in the second
terminal:

[2018-07-18 17:09:06,255] ERROR [Consumer clientId=consumer-1,
groupId=console-consumer-61589] Offset commit failed on partition
test-topic-0 at offset 2: The coordinator is not available.
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

After I added the following property to server.properties, the error
disappeared:

offsets.topic.replication.factor=3

I think it would be better to document the relationship between such properties.

-- 
Jingguo
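
As an illustrative aside, the replication factor that offsets.topic.replication.factor
controls applies to the internal __consumer_offsets topic, and it can be checked
with the AdminClient once that topic exists (the broker address is an assumption):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;

public class OffsetsTopicCheck {
  public static void main(String[] args) throws Exception {
    Properties config = new Properties();
    config.put("bootstrap.servers", "localhost:9092");  // assumption: local broker

    try (AdminClient admin = AdminClient.create(config)) {
      // __consumer_offsets is created lazily by the broker; its replication factor
      // is taken from offsets.topic.replication.factor at creation time.
      TopicDescription description = admin
          .describeTopics(Collections.singleton("__consumer_offsets"))
          .values()
          .get("__consumer_offsets")
          .get();
      int replicationFactor = description.partitions().get(0).replicas().size();
      System.out.println("__consumer_offsets replication factor: " + replicationFactor);
    }
  }
}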


If timeout is short, the first poll does not return records

2018-07-18 Thread jingguo yao
If the timeout is short, say 100 ms, the first poll does not return
records in my case. Jay Kreps gave an explanation in [1]. I think
this poll behaviour is counterintuitive; it would make Kafka
users' lives much easier if it were documented in [2].

[1] 
http://grokbase.com/p/kafka/users/155mqpwf3n/consumer-poll-returns-no-records-unless-called-more-than-once-why
[2] 
http://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll-long-