[VOTE] KIP-223 - Add per-topic min lead and per-partition lead metrics to KafkaConsumer

2017-11-29 Thread Hu Xi
Hi all,

As I didn't see any further discussion around this KIP, I'd like to start
voting.

KIP documentation:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-223+-+Add+per-topic+min+lead+and+per-partition+lead+metrics+to+KafkaConsumer



Cheers,

huxihx


Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability

2017-11-29 Thread Dong Lin
Hey Colin,

Thanks much for the update. I have a few questions below:

1. I am not sure we need the Fetch Session Epoch. It seems that the Fetch
Session Epoch is only needed to help the leader distinguish between "a full
fetch request" and "a full fetch request that also requests a new incremental
fetch session". Alternatively, the follower could indicate "a full fetch
request that also requests a new incremental fetch session" by setting the
Fetch Session ID to -1, without using the Fetch Session Epoch. Does this make sense?

2. It is said that an incremental FetchRequest will include partitions whose
fetch offset or maximum number of fetch bytes has changed. If a partition's
logStartOffset has changed on the follower, should that partition
also be included in the next FetchRequest to the leader? Otherwise, it may
affect the handling of DeleteRecordsRequest, because the leader may not know that
the corresponding data has been deleted on the follower.

3. In the section "Per-Partition Data", a partition is not considered dirty
if its log start offset has changed. Later in the section "FetchRequest
Changes", it is said that incremental fetch responses will include a
partition if its logStartOffset has changed. It seems inconsistent. Can you
update the KIP to clarify it?

4. In "Fetch Session Caching" section, it is said that each broker has a
limited number of slots. How is this number determined? Does this require a
new broker config for this number? What is the error code if broker does
not have new log for the incoming FetchRequest?

5. Can you clarify what happens if the follower adds a partition to the
ReplicaFetcherThread after receiving a LeaderAndIsrRequest? Does the leader need
to generate a new session for this ReplicaFetcherThread, or does it re-use
the existing session? If it uses a new session, is the old session actively
deleted from its slot?


BTW, I think it may be useful if the KIP could include an example workflow
of how this feature will be used in the case of a partition change and so on.

Thanks,
Dong


On Wed, Nov 29, 2017 at 12:13 PM, Colin McCabe  wrote:

> I updated the KIP with the ideas we've been discussing.
>
> best,
> Colin
>
> On Tue, Nov 28, 2017, at 08:38, Colin McCabe wrote:
> > On Mon, Nov 27, 2017, at 22:30, Jan Filipiak wrote:
> > > Hi Colin, thank you  for this KIP, it can become a really useful thing.
> > >
> > > I just scanned through the discussion so far and wanted to start a
> > > thread to make a decision about keeping the
> > > cache with the Connection / Session or having some sort of UUID-indexed
> > > global Map.
> > >
> > > Sorry if that has been settled already and I missed it. In this case
> > > could anyone point me to the discussion?
> >
> > Hi Jan,
> >
> > I don't think anyone has discussed the idea of tying the cache to an
> > individual TCP session yet.  I agree that since the cache is intended to
> > be used only by a single follower or client, it's an interesting thing
> > to think about.
> >
> > I guess the obvious disadvantage is that whenever your TCP session
> > drops, you have to make a full fetch request rather than an incremental
> > one.  It's not clear to me how often this happens in practice -- it
> > probably depends a lot on the quality of the network.  From a code
> > perspective, it might also be a bit difficult to access data associated
> > with the Session from classes like KafkaApis (although we could refactor
> > it to make this easier).
> >
> > It's also clear that even if we tie the cache to the session, we still
> > have to have limits on the number of caches we're willing to create.
> > And probably we should reserve some cache slots for each follower, so
> > that clients don't take all of them.
> >
> > >
> > > I'd rather see a protocol in which the client hints to the broker that
> > > it is going to use the feature, instead of the client
> > > realizing that the broker just offered the feature (regardless of
> > > protocol version, which should only indicate that the feature
> > > would be usable).
> >
> > Hmm.  I'm not sure what you mean by "hinting."  I do think that the
> > server should have the option of not accepting incremental requests from
> > specific clients, in order to save memory space.
> >
> > > This seems to work better with per-connection/session attached Metadata
> > > than with a Map, and could allow for easier client implementations.
> > > It would also make client-side code easier, as there wouldn't be any
> > > cache-miss error messages to handle.
> >
> > It is nice not to have to handle cache-miss responses, I agree.
> > However, TCP sessions aren't exposed to most of our client-side code.
> > For example, when the Producer creates a message and hands it off to the
> > NetworkClient, the NC will transparently re-connect and re-send a
> > message if the first send failed.  The higher-level code will not be
> > informed about whether the TCP session was re-established, whether an
> > existing TCP session was used, and so on.  

Jenkins build is back to normal : kafka-trunk-jdk9 #228

2017-11-29 Thread Apache Jenkins Server
See 




Jenkins build is back to normal : kafka-trunk-jdk7 #3009

2017-11-29 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-6286) Consider reusing materialized store for multi-same-stream join

2017-11-29 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-6286:


 Summary: Consider reusing materialized store for multi-same-stream 
join
 Key: KAFKA-6286
 URL: https://issues.apache.org/jira/browse/KAFKA-6286
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Guozhang Wang


Imagine the following streams application:

{code}
stream1.join(stream2, ...).join(stream2, ...)
{code}

Each join will result in {{stream2}} being materialized into a separate store. 
Arguably such multi-joins that involve the same stream multiple times are rare, 
but it is worth considering whether we can optimize such cases.

One thing to note, though, is that in our DSL parser today we do "put into 
store first, and then query the other store second", which means that if we share 
the same store, it would result in duplicates, as the matching in the second join 
would already see the newly put records.
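
For illustration, a minimal DSL sketch of this shape; the topic names, String serdes, and 5-minute join windows are placeholders, not anything prescribed by this ticket:

{code:java}
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;

public class MultiSameStreamJoin {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream1 = builder.stream("topic1");
        KStream<String, String> stream2 = builder.stream("topic2");

        // stream2 participates in both joins, so today it is materialized into
        // two separate window stores, one per join.
        KStream<String, String> once = stream1.join(
                stream2,
                (v1, v2) -> v1 + "," + v2,
                JoinWindows.of(TimeUnit.MINUTES.toMillis(5)),
                Joined.with(Serdes.String(), Serdes.String(), Serdes.String()));

        KStream<String, String> twice = once.join(
                stream2,
                (v12, v2) -> v12 + "," + v2,
                JoinWindows.of(TimeUnit.MINUTES.toMillis(5)),
                Joined.with(Serdes.String(), Serdes.String(), Serdes.String()));

        twice.to("output");
    }
}
{code}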



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] kafka pull request #3912: KAFKA-5936: KafkaProducer.close should throw Inter...

2017-11-29 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3912


---


Jenkins build is back to normal : kafka-trunk-jdk8 #2247

2017-11-29 Thread Apache Jenkins Server
See 




[jira] [Reopened] (KAFKA-4669) KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception

2017-11-29 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson reopened KAFKA-4669:


> KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws 
> exception
> -
>
> Key: KAFKA-4669
> URL: https://issues.apache.org/jira/browse/KAFKA-4669
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.1
>Reporter: Cheng Ju
>Assignee: Rajini Sivaram
>Priority: Critical
>  Labels: reliability
> Fix For: 0.11.0.1, 1.0.0
>
>
> There is no try/catch in NetworkClient.handleCompletedReceives.  If an 
> exception is thrown after inFlightRequests.completeNext(source), then the 
> corresponding RecordBatch's done() will never get called, and 
> KafkaProducer.flush will hang on this RecordBatch.
> I've checked the 0.10 code and think this bug does exist in the 0.10 versions.
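
A minimal, self-contained sketch (not the real producer classes) of the failure mode described above: flush() waits on a latch that only the batch's done() counts down, so an exception thrown while handling the response means the latch is never released:

{code:java}
import java.util.concurrent.CountDownLatch;

public class FlushHangSketch {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch batchDone = new CountDownLatch(1);

        Thread ioThread = new Thread(() -> {
            try {
                handleCompletedReceives();  // throws before the batch is completed
                batchDone.countDown();      // the "done()" step is never reached
            } catch (IllegalStateException e) {
                // the I/O loop logs "Uncaught error ..." and nothing ever
                // completes the pending batch
            }
        });
        ioThread.start();

        batchDone.await();  // models KafkaProducer.flush(): blocks forever
    }

    private static void handleCompletedReceives() {
        throw new IllegalStateException(
                "Correlation id for response (703766) does not match request (703764)");
    }
}
{code}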
> A real case.  First, a correlation-id mismatch exception happens:
> 13 Jan 2017 17:08:24,059 ERROR [kafka-producer-network-thread | producer-21] 
> (org.apache.kafka.clients.producer.internals.Sender.run:130)  - Uncaught 
> error in kafka producer I/O thread: 
> java.lang.IllegalStateException: Correlation id for response (703766) does 
> not match request (703764)
>   at 
> org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
>   at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>   at java.lang.Thread.run(Thread.java:745)
> Then jstack shows the thread is hanging on:
>   at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>   at 
> org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:425)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:544)
>   at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:224)
>   at 
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
>   at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
>   at java.lang.Thread.run(Thread.java:745)
> client code 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: [VOTE] KIP-203: Add toLowerCase support to sasl.kerberos.principal.to.local rule

2017-11-29 Thread Jason Gustafson
+1 Thanks for the KIP!

On Fri, Nov 3, 2017 at 10:08 AM, Manikumar 
wrote:

> Bump up. Waiting for a few more binding votes.
>
> On Wed, Oct 18, 2017 at 6:57 PM, Rajini Sivaram 
> wrote:
>
> > +1 (binding)
> >
> > On Mon, Oct 9, 2017 at 5:32 PM, Manikumar 
> > wrote:
> >
> > > I'm bumping this up to get some attention :)
> > >
> > > On Wed, Sep 27, 2017 at 8:46 PM, Tom Bentley 
> > > wrote:
> > >
> > > > +1 (nonbinding)
> > > >
> > > > On 27 September 2017 at 16:10, Manikumar 
> > > > wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > > I'd like to start the vote on KIP-203. Details are here:
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > 203%3A+Add+toLowerCase+support+to+sasl.kerberos.
> > > principal.to.local+rule
> > > > >
> > > > > Thanks,
> > > > >
> > > >
> > >
> >
>


Re: [DISCUSS] KIP-231: Improve the Required ACL of ListGroups API

2017-11-29 Thread Vahid S Hashemian
Completing the subject line :)



From:   "Vahid S Hashemian" 
To: dev 
Date:   11/29/2017 03:17 PM
Subject:[DISCUSS] KIP-231:



Hi everyone,

I started KIP-231 to propose a small change to the required ACL of 
ListGroups API (in response to KAFKA-5638): 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-231%3A+Improve+the+Required+ACL+of+ListGroups+API

Your feedback and suggestions are welcome!

Thanks.
--Vahid








[DISCUSS] KIP-231:

2017-11-29 Thread Vahid S Hashemian
Hi everyone,

I started KIP-231 to propose a small change to the required ACL of 
ListGroups API (in response to KAFKA-5638): 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-231%3A+Improve+the+Required+ACL+of+ListGroups+API
Your feedback and suggestions are welcome!

Thanks.
--Vahid




Re: java.lang.IllegalStateException: Correlation id for response () does not match request ()

2017-11-29 Thread Aarti Gupta
https://issues.apache.org/jira/browse/KAFKA-4669?focusedCommentId=16271727=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16271727
Can we reopen this issue?

We saw this on the consumer in production today. We are on 0.11.0.1.

ERROR c.v.v.h.m.k.KafkaEventConsumerDelegate- Error fetching next record
java.lang.Exception: Error fetching next new record from kafka queue
at
com.vmware.vchs.hybridity.messaging.kafka.KafkaEventConsumerDelegate.getNextEventWithTimeout(KafkaEventConsumerDelegate.java:121)
at
com.vmware.vchs.hybridity.messaging.kafka.KafkaEventConsumerDelegate.getNextEvent(KafkaEventConsumerDelegate.java:64)
at
com.vmware.vchs.hybridity.messaging.adapter.EventListenerAdapter.getNextEvent(EventListenerAdapter.java:76)
at
com.vmware.vchs.hybridity.messaging.adapter.EventListenerAdapter.getNextEventAndAck(EventListenerAdapter.java:94)
at
com.vmware.vchs.hybridity.messaging.adapter.EventListenerAdapter$1.run(EventListenerAdapter.java:125)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Correlation id for response
(386681) does not match request (386680), request header:
{api_key=9,api_version=3,correlation_id=386680,client_id=consumer-36}

at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:752)
at
org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:561)
at
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:657)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:442)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:168)
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.fetchCommittedOffsets(ConsumerCoordinator.java:477)
at
org.apache.kafka.clients.consumer.KafkaConsumer.committed(KafkaConsumer.java:1346)
at
com.vmware.vchs.hybridity.messaging.kafka.KafkaEventConsumerDelegate.getNextEventWithTimeout(KafkaEventConsumerDelegate.java:86)
... 5 common frames omitted

On Mon, Mar 6, 2017 at 9:44 AM, Ismael Juma  wrote:

> Hi Mickael,
>
> This looks to be the same as KAFKA-4669. In theory, this should never
> happen and it's unclear when/how it can happen. Not sure if someone has
> investigated it in more detail.
>
> Ismael
>
> On Mon, Mar 6, 2017 at 5:15 PM, Mickael Maison 
> wrote:
>
> > Hi,
> >
> > In one of our clusters, some of our clients occasionally see this
> > exception:
> > java.lang.IllegalStateException: Correlation id for response (4564)
> > does not match request (4562)
> > at org.apache.kafka.clients.NetworkClient.correlate(
> > NetworkClient.java:486)
> > at org.apache.kafka.clients.NetworkClient.parseResponse(
> > NetworkClient.java:381)
> > at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(
> > NetworkClient.java:449)
> > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
> > at org.apache.kafka.clients.producer.internals.Sender.run(
> Sender.java:229)
> > at org.apache.kafka.clients.producer.internals.Sender.run(
> Sender.java:134)
> > at java.lang.Thread.run(Unknown Source)
> >
> > We've also seen it from consumer poll() and commit()
> >
> > Usually the response's correlation id is off by just 1 or 2 (like
> > above) but we've also seen it off by a few hundreds:
> > java.lang.IllegalStateException: Correlation id for response (742)
> > does not match request (174)
> > at org.apache.kafka.clients.NetworkClient.correlate(
> > NetworkClient.java:486)
> > at org.apache.kafka.clients.NetworkClient.parseResponse(
> > NetworkClient.java:381)
> > at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(
> > NetworkClient.java:449)
> > at org.apache.kafka.clients.NetworkClient.poll(
> NetworkClient.java:269)
> > at org.apache.kafka.clients.consumer.internals.
> ConsumerNetworkClient.
> > clientPoll(ConsumerNetworkClient.java:360)
> > at org.apache.kafka.clients.consumer.internals.
> > ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> > at org.apache.kafka.clients.consumer.internals.
> > ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
> > at org.apache.kafka.clients.consumer.internals.
> > ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
> > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
> > commitOffsetsSync(ConsumerCoordinator.java:426)
> > at org.apache.kafka.clients.consumer.KafkaConsumer.
> > commitSync(KafkaConsumer.java:1059)
> > at org.apache.kafka.clients.consumer.KafkaConsumer.
> > commitSync(KafkaConsumer.java:1027)
> >
> > When this happens, all subsequent responses are also shifted:
> > java.lang.IllegalStateException: Correlation id for response (743)
> 

Jenkins build is back to normal : kafka-trunk-jdk7 #3007

2017-11-29 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability

2017-11-29 Thread Colin McCabe
I updated the KIP with the ideas we've been discussing.

best,
Colin

On Tue, Nov 28, 2017, at 08:38, Colin McCabe wrote:
> On Mon, Nov 27, 2017, at 22:30, Jan Filipiak wrote:
> > Hi Colin, thank you  for this KIP, it can become a really useful thing.
> > 
> > I just scanned through the discussion so far and wanted to start a 
> > thread to make a decision about keeping the
> > cache with the Connection / Session or having some sort of UUID-indexed
> > global Map.
> > 
> > Sorry if that has been settled already and I missed it. In this case 
> > could anyone point me to the discussion?
> 
> Hi Jan,
> 
> I don't think anyone has discussed the idea of tying the cache to an
> individual TCP session yet.  I agree that since the cache is intended to
> be used only by a single follower or client, it's an interesting thing
> to think about.
> 
> I guess the obvious disadvantage is that whenever your TCP session
> drops, you have to make a full fetch request rather than an incremental
> one.  It's not clear to me how often this happens in practice -- it
> probably depends a lot on the quality of the network.  From a code
> perspective, it might also be a bit difficult to access data associated
> with the Session from classes like KafkaApis (although we could refactor
> it to make this easier).
> 
> It's also clear that even if we tie the cache to the session, we still
> have to have limits on the number of caches we're willing to create. 
> And probably we should reserve some cache slots for each follower, so
> that clients don't take all of them.
> 
> > 
> > I'd rather see a protocol in which the client hints to the broker that
> > it is going to use the feature, instead of the client
> > realizing that the broker just offered the feature (regardless of
> > protocol version, which should only indicate that the feature
> > would be usable).
> 
> Hmm.  I'm not sure what you mean by "hinting."  I do think that the
> server should have the option of not accepting incremental requests from
> specific clients, in order to save memory space.
> 
> > This seems to work better with per-connection/session attached Metadata
> > than with a Map, and could allow for easier client implementations.
> > It would also make client-side code easier, as there wouldn't be any
> > cache-miss error messages to handle.
> 
> It is nice not to have to handle cache-miss responses, I agree. 
> However, TCP sessions aren't exposed to most of our client-side code. 
> For example, when the Producer creates a message and hands it off to the
> NetworkClient, the NC will transparently re-connect and re-send a
> message if the first send failed.  The higher-level code will not be
> informed about whether the TCP session was re-established, whether an
> existing TCP session was used, and so on.  So overall I would still lean
> towards not coupling this to the TCP session...
> 
> best,
> Colin
> 
> > 
> >   Thank you again for the KIP. And again, if this was clarified already 
> > please drop me a hint where I could read about it.
> > 
> > Best Jan
> > 
> > 
> > 
> > 
> > 
> > On 21.11.2017 22:02, Colin McCabe wrote:
> > > Hi all,
> > >
> > > I created a KIP to improve the scalability and latency of FetchRequest:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
> > >
> > > Please take a look.
> > >
> > > cheers,
> > > Colin
> >


Jenkins build is back to normal : kafka-0.10.2-jdk7 #199

2017-11-29 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-6285) OffsetCommitRequest should have read-after-write logic

2017-11-29 Thread Dong Lin (JIRA)
Dong Lin created KAFKA-6285:
---

 Summary: OffsetCommitRequest should have read-after-write logic
 Key: KAFKA-6285
 URL: https://issues.apache.org/jira/browse/KAFKA-6285
 Project: Kafka
  Issue Type: Bug
Reporter: Dong Lin
Assignee: Dong Lin


Currently OffsetCommitRequest does not have read-after-write logic, and a 
consumer can possibly read an older offset value after successfully committing 
the offset. This is because the broker may respond to the OffsetCommitRequest 
before writing the offset to disk and memory.

This is probably not a problem for most users, who do not immediately read the 
offset after committing it. But it can be a problem if the broker fails before 
writing the offset to disk. It would be nice if we could have read-after-write 
logic for OffsetCommitRequest.
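
A hedged sketch of the read-after-write gap from the consumer's point of view; the bootstrap address, group, topic and offset below are placeholders:

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitThenRead {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "example-group");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("example-topic", 0);
            consumer.assign(Collections.singletonList(tp));

            // Commit offset 100 for the partition...
            consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)));

            // ...and immediately read it back. Without read-after-write semantics
            // on the broker, this value could in principle be older than the one
            // just committed in the failure scenarios described above.
            OffsetAndMetadata committed = consumer.committed(tp);
            System.out.println("committed = " + committed);
        }
    }
}
{code}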



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6284) System Test failed: ConnectRestApiTest

2017-11-29 Thread Mikkin Patel (JIRA)
Mikkin Patel created KAFKA-6284:
---

 Summary: System Test failed: ConnectRestApiTest 
 Key: KAFKA-6284
 URL: https://issues.apache.org/jira/browse/KAFKA-6284
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 1.0.1
Reporter: Mikkin Patel
 Fix For: 1.0.1


KAFKA-3073 introduced topic regex support for Connect sinks. The 
ConnectRestApiTest failed to verify the ConfigDef against the expected response.

{noformat}

Traceback (most recent call last):
  File 
"/home/jenkins/workspace/system-test-kafka-ci_trunk-STZ4BWS25LWQAJN6JJZHKK6GP4YF7HCVYRFRFXB2KMAQ4PC5H4ZA/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.7.1-py2.7.egg/ducktape/tests/runner_client.py",
 line 132, in run
data = self.run_test()
  File 
"/home/jenkins/workspace/system-test-kafka-ci_trunk-STZ4BWS25LWQAJN6JJZHKK6GP4YF7HCVYRFRFXB2KMAQ4PC5H4ZA/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.7.1-py2.7.egg/ducktape/tests/runner_client.py",
 line 185, in run_test
return self.test_context.function(self.test)
  File 
"/home/jenkins/workspace/system-test-kafka-ci_trunk-STZ4BWS25LWQAJN6JJZHKK6GP4YF7HCVYRFRFXB2KMAQ4PC5H4ZA/kafka/tests/kafkatest/tests/connect/connect_rest_test.py",
 line 92, in test_rest_api
self.verify_config(self.FILE_SINK_CONNECTOR, self.FILE_SINK_CONFIGS, 
configs)
  File 
"/home/jenkins/workspace/system-test-kafka-ci_trunk-STZ4BWS25LWQAJN6JJZHKK6GP4YF7HCVYRFRFXB2KMAQ4PC5H4ZA/kafka/tests/kafkatest/tests/connect/connect_rest_test.py",
 line 200, in verify_config
assert config_def == set(config_names)
AssertionError

{noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] kafka pull request #4277: STRUCT schema equality check failing for complex o...

2017-11-29 Thread ntrp
GitHub user ntrp opened a pull request:

https://github.com/apache/kafka/pull/4277

STRUCT schema equality check failing for complex objects

*The schema equality check needs to be done with the equals method to avoid 
unwanted side effects. When applied to complex objects with the same schema, it 
was failing.*

*To reproduce the faulty behaviour, try calling the method on a complex object 
(at least 3 levels of nesting and array fields with many objects).*
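
For context, a minimal sketch of the distinction at issue, using the Connect data API with illustrative field names: two independently built schemas are different instances (reference comparison fails) but structurally equal (equals succeeds):

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class SchemaEqualityExample {
    public static void main(String[] args) {
        // Two structurally identical STRUCT schemas built independently.
        Schema a = SchemaBuilder.struct()
                .field("id", Schema.INT64_SCHEMA)
                .field("name", Schema.STRING_SCHEMA)
                .build();
        Schema b = SchemaBuilder.struct()
                .field("id", Schema.INT64_SCHEMA)
                .field("name", Schema.STRING_SCHEMA)
                .build();

        System.out.println(a == b);       // false: different object instances
        System.out.println(a.equals(b));  // true: structural equality
    }
}
```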

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation 
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ntrp/kafka trunk

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4277.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4277


commit 950f0e426d1927c000f99d0c1c8d40e6b10cabc1
Author: Ivan Greguric Ortolan 
Date:   2017-11-29T19:23:44Z

fixed the convertToJson method, struct equality check failing for complex 
JSON




---


Build failed in Jenkins: kafka-0.10.0-jdk7 #214

2017-11-29 Thread Apache Jenkins Server
See 


Changes:

[me] KAFKA-4827: Correctly encode special chars while creating URI objects

--
[...truncated 81.02 KB...]

kafka.server.DelayedOperationTest > testRequestPurge PASSED

kafka.server.DelayedOperationTest > testRequestExpiry PASSED

kafka.server.DelayedOperationTest > testRequestSatisfaction PASSED

kafka.server.ApiVersionsRequestTest > 
testApiVersionsRequestWithUnsupportedVersion PASSED

kafka.server.ApiVersionsRequestTest > testApiVersionsRequest PASSED

kafka.server.MetadataCacheTest > 
getTopicMetadataWithNonSupportedSecurityProtocol PASSED

kafka.server.MetadataCacheTest > getTopicMetadataIsrNotAvailable PASSED

kafka.server.MetadataCacheTest > getTopicMetadata PASSED

kafka.server.MetadataCacheTest > getTopicMetadataReplicaNotAvailable PASSED

kafka.server.MetadataCacheTest > getTopicMetadataPartitionLeaderNotAvailable 
PASSED

kafka.server.MetadataCacheTest > getAliveBrokersShouldNotBeMutatedByUpdateCache 
PASSED

kafka.server.MetadataCacheTest > getTopicMetadataNonExistingTopics PASSED

kafka.server.SessionExpireListenerTest > testSessionExpireListenerMetrics PASSED

kafka.server.SaslSslReplicaFetchTest > testReplicaFetcherThread PASSED

kafka.server.KafkaConfigTest > testAdvertiseConfigured PASSED

kafka.server.KafkaConfigTest > testLogRetentionTimeHoursProvided PASSED

kafka.server.KafkaConfigTest > testLogRollTimeBothMsAndHoursProvided PASSED

kafka.server.KafkaConfigTest > testLogRetentionValid PASSED

kafka.server.KafkaConfigTest > testSpecificProperties PASSED

kafka.server.KafkaConfigTest > testDefaultCompressionType PASSED

kafka.server.KafkaConfigTest > testDuplicateListeners PASSED

kafka.server.KafkaConfigTest > testLogRetentionUnlimited PASSED

kafka.server.KafkaConfigTest > testLogRetentionTimeMsProvided PASSED

kafka.server.KafkaConfigTest > testLogRollTimeNoConfigProvided PASSED

kafka.server.KafkaConfigTest > testInvalidInterBrokerSecurityProtocol PASSED

kafka.server.KafkaConfigTest > testAdvertiseDefaults PASSED

kafka.server.KafkaConfigTest > testBadListenerProtocol PASSED

kafka.server.KafkaConfigTest > testListenerDefaults PASSED

kafka.server.KafkaConfigTest > testLogRetentionTimeBothMinutesAndHoursProvided 
PASSED

kafka.server.KafkaConfigTest > testUncleanElectionDisabled PASSED

kafka.server.KafkaConfigTest > testLogRetentionTimeNoConfigProvided PASSED

kafka.server.KafkaConfigTest > testCaseInsensitiveListenerProtocol PASSED

kafka.server.KafkaConfigTest > testFromPropsInvalid PASSED

kafka.server.KafkaConfigTest > testInvalidCompressionType PASSED

kafka.server.KafkaConfigTest > testAdvertiseHostNameDefault PASSED

kafka.server.KafkaConfigTest > testLogRetentionTimeMinutesProvided PASSED

kafka.server.KafkaConfigTest > testValidCompressionType PASSED

kafka.server.KafkaConfigTest > testUncleanElectionInvalid PASSED

kafka.server.KafkaConfigTest > testLogRetentionTimeBothMinutesAndMsProvided 
PASSED

kafka.server.KafkaConfigTest > testLogRollTimeMsProvided PASSED

kafka.server.KafkaConfigTest > testUncleanLeaderElectionDefault PASSED

kafka.server.KafkaConfigTest > testInvalidAdvertisedListenersProtocol PASSED

kafka.server.KafkaConfigTest > testUncleanElectionEnabled PASSED

kafka.server.KafkaConfigTest > testAdvertisePortDefault PASSED

kafka.server.KafkaConfigTest > testVersionConfiguration PASSED

kafka.server.KafkaConfigTest > testEqualAdvertisedListenersProtocol PASSED

kafka.server.LogOffsetTest > testFetchOffsetsBeforeWithChangingSegmentSize 
PASSED

kafka.server.LogOffsetTest > testGetOffsetsBeforeEarliestTime PASSED

kafka.server.LogOffsetTest > testGetOffsetsForUnknownTopic PASSED

kafka.server.LogOffsetTest > testEmptyLogsGetOffsets PASSED

kafka.server.LogOffsetTest > testGetOffsetsBeforeLatestTime PASSED

kafka.server.LogOffsetTest > testGetOffsetsBeforeNow PASSED

kafka.server.PlaintextReplicaFetchTest > testReplicaFetcherThread PASSED

kafka.server.MetadataRequestTest > testReplicaDownResponse PASSED

kafka.server.MetadataRequestTest > testRack PASSED

kafka.server.MetadataRequestTest > testIsInternal PASSED

kafka.server.MetadataRequestTest > testControllerId PASSED

kafka.server.MetadataRequestTest > testAllTopicsRequest PASSED

kafka.server.MetadataRequestTest > testNoTopicsRequest PASSED

kafka.server.AdvertiseBrokerTest > testBrokerAdvertiseToZK PASSED

kafka.server.IsrExpirationTest > testIsrExpirationForSlowFollowers PASSED

kafka.server.IsrExpirationTest > testIsrExpirationForStuckFollowers PASSED

kafka.server.IsrExpirationTest > testIsrExpirationIfNoFetchRequestMade PASSED

kafka.server.ProduceRequestTest > testSimpleProduceRequest PASSED

kafka.server.ProduceRequestTest > testCorruptLz4ProduceRequest PASSED

kafka.server.SaslPlaintextReplicaFetchTest > testReplicaFetcherThread PASSED

kafka.server.ControlledShutdownLeaderSelectorTest > testSelectLeader PASSED

kafka.server.LeaderElectionTest > 

[GitHub] kafka pull request #4273: KAFKA-4827: Porting fix for KAFKA-4827 to v0.10 an...

2017-11-29 Thread wicknicks
Github user wicknicks closed the pull request at:

https://github.com/apache/kafka/pull/4273


---


[GitHub] kafka pull request #4255: KAFKA-6259: Make KafkaStreams.cleanup() clean glob...

2017-11-29 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/4255


---


[GitHub] kafka pull request #4272: KAFKA-4827: Porting fix for KAFKA-4827 to v0.10 an...

2017-11-29 Thread wicknicks
Github user wicknicks closed the pull request at:

https://github.com/apache/kafka/pull/4272


---


[GitHub] kafka pull request #4276: KAFKA-6260: Ensure selection keys are removed from...

2017-11-29 Thread hachikuji
GitHub user hachikuji opened a pull request:

https://github.com/apache/kafka/pull/4276

KAFKA-6260: Ensure selection keys are removed from all collections on 
socket close

When a socket is closed, we must remove the corresponding selection keys from 
internal collections. This fixes an NPE caused by attempting to access 
the selection key's attached channel after it has been cleared on 
disconnect.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation 
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hachikuji/kafka KAFKA-6260

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4276.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4276


commit e715e673b7bca14e2a26a998348528d27ac8a9c8
Author: Jason Gustafson 
Date:   2017-11-29T19:10:39Z

KAFKA-6260: Ensure selection keys are removed from all collections on 
socket close




---


Re: [DISCUSS] KIP-229: DeleteGroups API

2017-11-29 Thread Vahid S Hashemian
Hi Dong,

Thanks a lot for your feedback.
I updated the KIP and included those fields and also made a note of the 
corresponding AdminClient API that will be created to support deleting 
consumer groups.
I hope the updates address your suggestions.

Cheers!

--Vahid




From:   Dong Lin 
To: dev@kafka.apache.org
Date:   11/28/2017 11:16 PM
Subject:Re: [DISCUSS] KIP-229: DeleteGroups API



Hey Vahid,

Thanks for the KIP! This is certainly a useful one and users have been
asking about the ability to delete group from the Kafka offset topic in my
past experience.

It seems that the protocol of the new request/response should probably
include more fields. For example, it may be useful to include a
throttle_time_ms field and a request-level error code in
DeleteGroupsResponse. The request-level error code can possibly show errors
such as NOT_COORDINATOR_FOR_GROUP.

Also, users may want to use this feature programmatically. Do you think we
should add a corresponding API in AdminClient to delete groups? If so, can
you specify the new AdminClient API in the KIP?
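
For illustration only, a hedged sketch of what such an AdminClient call might look like; the method and result shape here are assumptions for discussion (not the KIP's final API), and the bootstrap address and group names are placeholders:

```java
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;

public class DeleteGroupsSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Hypothetical call shape: delete two consumer groups and block until
            // the per-group results come back (with request-level and per-group
            // error codes surfaced as exceptions).
            admin.deleteConsumerGroups(Arrays.asList("group-a", "group-b"))
                 .all()
                 .get();
        }
    }
}
```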

Thanks,
Dong


On Tue, Nov 28, 2017 at 4:03 PM, Vahid S Hashemian <
vahidhashem...@us.ibm.com> wrote:

> Hi everyone,
>
> I started KIP-229 and proposed a consumer group deletion API for
> Kafka-based group management to address KAFKA-6275:
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-229%3A+DeleteGroups+API
> Your feedback and suggestions are welcome!
>
> Thanks.
> --Vahid
>
>
>






[jira] [Resolved] (KAFKA-6038) Repartition topics could be much more transient

2017-11-29 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-6038.
--
Resolution: Duplicate

> Repartition topics could be much more transient
> ---
>
> Key: KAFKA-6038
> URL: https://issues.apache.org/jira/browse/KAFKA-6038
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: optimization
>
> Unlike changelog topics, repartition topics could be short-lived rather than 
> eating up storage space on the Kafka brokers. Today users have different ways 
> to configure them with short retention, such as enforcing a retention of 30 
> minutes with small log segment sizes, or using AppendTime for repartition 
> topics. All of these are cumbersome, and Streams should just do this 
> automatically.
> One way to do it is to use the "purgeData" admin API (KIP-107), such that after 
> the offsets of the input topics are committed, if the input topics are 
> actually repartition topics, we would purge the data immediately. One tricky 
> thing to consider, though: upon (re-)starting the application, if the 
> repartition topics are used for restoring state, we need to re-fill 
> these topics in the right way for restoration purposes, and there 
> might be some devils in the implementation details.
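
A hedged sketch of the purge step, assuming a deleteRecords-style AdminClient API for the KIP-107 functionality mentioned above; the topic name, partition, and committed offset are placeholders:

{code:java}
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;

public class PurgeRepartitionTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // After the consumed offsets have been committed, drop everything below
            // the committed offset on the repartition topic's partition.
            TopicPartition repartition =
                    new TopicPartition("app-KSTREAM-REPARTITION-0000000003", 0);
            Map<TopicPartition, RecordsToDelete> purge =
                    Collections.singletonMap(repartition, RecordsToDelete.beforeOffset(1234L));
            admin.deleteRecords(purge).all().get();
        }
    }
}
{code}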



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] kafka pull request #4258: [KAFKA-4499] Add all() and fetchAll() API for quer...

2017-11-29 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/4258


---


[jira] [Resolved] (KAFKA-4499) Add "getAllKeys" API for querying windowed KTable stores

2017-11-29 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-4499.
--
   Resolution: Fixed
Fix Version/s: 1.1.0

Issue resolved by pull request 4258
[https://github.com/apache/kafka/pull/4258]

> Add "getAllKeys" API for querying windowed KTable stores
> 
>
> Key: KAFKA-4499
> URL: https://issues.apache.org/jira/browse/KAFKA-4499
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>  Labels: needs-kip
> Fix For: 1.1.0
>
> Attachments: 4499-All-test-v1.patch, 4499-CachingWindowStore-v1.patch
>
>
> Currently, both {{KTable}} and windowed-{{KTable}} stores can be queried via 
> the IQ feature. While {{ReadOnlyKeyValueStore}} (for {{KTable}} stores) provides 
> the method {{all()}} to scan the whole store (i.e., it returns an iterator over all 
> stored key-value pairs), there is no similar API for {{ReadOnlyWindowStore}} 
> (for windowed-{{KTable}} stores).
> This limits the usage of a windowed store, because the user needs to know 
> what keys are stored in order to query it. It would be useful to provide 
> possible APIs like this (only a rough sketch):
>  - {{keys()}} returns all keys available in the store (maybe together with 
> available time ranges)
>  - {{all(long timeFrom, long timeTo)}} that returns all windows for a specific 
> time range
>  - {{allLatest()}} that returns the latest window for each key
> Because this feature would require scanning multiple segments (i.e., RocksDB 
> instances), it would be quite inefficient with the current store design. Thus, 
> this feature would also require redesigning the underlying window store itself.
> Because this is a major change, a KIP 
> (https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals)
>  is required. The KIP should cover the actual API design as well as the store 
> refactoring.
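
A hedged sketch of how the scan-style API added by pull request 4258 ({{all()}} / {{fetchAll()}} on {{ReadOnlyWindowStore}}) might be used via interactive queries; the store name and key/value types are placeholders:

{code:java}
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;

public class WindowedStoreScan {
    // 'streams' is an already-started KafkaStreams instance and "counts-store" an
    // illustrative windowed store name registered by the topology.
    static void dumpAllWindows(KafkaStreams streams) {
        ReadOnlyWindowStore<String, Long> store =
                streams.store("counts-store", QueryableStoreTypes.windowStore());

        // Scan every key and every window currently retained by the store.
        try (KeyValueIterator<Windowed<String>, Long> iter = store.all()) {
            while (iter.hasNext()) {
                KeyValue<Windowed<String>, Long> entry = iter.next();
                System.out.println(entry.key.key() + " @ "
                        + entry.key.window().start() + " -> " + entry.value);
            }
        }
    }
}
{code}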



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] kafka pull request #4275: Close socketChannel in finally block

2017-11-29 Thread cmccabe
Github user cmccabe closed the pull request at:

https://github.com/apache/kafka/pull/4275


---


[GitHub] kafka pull request #4275: Close socketChannel in finally block

2017-11-29 Thread cmccabe
GitHub user cmccabe opened a pull request:

https://github.com/apache/kafka/pull/4275

Close socketChannel in finally block



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cmccabe/kafka KAFKA-6260

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4275.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4275


commit 78181efb5fe6083459c39c11c5255d2cf82028b3
Author: Colin P. Mccabe 
Date:   2017-11-29T17:36:16Z

Close socketChannel in finally block




---


[GitHub] kafka pull request #4274: KAFKA-6283: Configuration of custom SCRAM SaslServ...

2017-11-29 Thread tombentley
GitHub user tombentley opened a pull request:

https://github.com/apache/kafka/pull/4274

KAFKA-6283: Configuration of custom SCRAM SaslServer implementations

Pass the jaasContext to the ScramServerCallbackHandler, so that custom 
implementations of a SCRAM SaslServer have access to the JAAS configuration.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation 
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tombentley/kafka KAFKA-6283-sasl-server-jaas

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4274.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4274


commit a028d2cc8bc63e0313a0d7c52b70edbbcce0ab8e
Author: Tom Bentley 
Date:   2017-11-29T15:55:15Z

KAFKA-6283: Configuration of custom SCRAM SaslServer implementations




---


Re: 答复: REPLY: [DISCUSS]KIP-223 - Add per-topic min lead and per-partition lead metrics to KafkaConsumer

2017-11-29 Thread Jun Rao
Hi, Hu,

Yes. Could you start a separate voting thread?

Thanks,

Jun

On Tue, Nov 28, 2017 at 5:02 PM, Hu Xi  wrote:

> Hi Rao Jun,
>
>
> Already updated the patch per your suggestion and it seems there are no
> further feedbacks on this KIP.  Could we vote now?
>
>
> 
> 发件人: Jun Rao 
> 发送时间: 2017年11月23日 6:21
> 收件人: dev@kafka.apache.org
> 主题: Re: REPLY: [DISCUSS]KIP-223 - Add per-topic min lead and per-partition
> lead metrics to KafkaConsumer
>
> Hi, Hu,
>
> There are two types of names. One is the sensor name, which has to be
> unique. It's fine if the sensor name includes the topic/partition as the
> prefix since the sensor name is only a string and is not exposed to jmx.
> The second name is the metric name, which will be used in jmx. Currently,
> the existing lag metric name uses topic/partition as the prefix. KIP-225
> tries to change the metric name to use topic/partition as the tag. We can
> just do the same thing for lead by using tags in the metric name.
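
A hedged sketch of the distinction, using the client-side Metrics API; the sensor name, metric group, and tag values are illustrative rather than the KIP's final names:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Min;

public class LeadMetricSketch {
    public static void main(String[] args) {
        Metrics metrics = new Metrics();

        // The sensor name is an internal string that only has to be unique, so it
        // can safely carry the topic/partition as a prefix.
        Sensor leadSensor = metrics.sensor("mytopic-0.records-lead");

        // The metric names are what JMX sees; the topic/partition go into tags
        // rather than into the metric name itself.
        Map<String, String> tags = new HashMap<>();
        tags.put("topic", "mytopic");
        tags.put("partition", "0");

        leadSensor.add(metrics.metricName("records-lead-min",
                "consumer-fetch-manager-metrics", "Min lead of the partition", tags),
                new Min());
        leadSensor.add(metrics.metricName("records-lead-avg",
                "consumer-fetch-manager-metrics", "Avg lead of the partition", tags),
                new Avg());

        leadSensor.record(42.0);  // record one lead sample
    }
}
```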
>
> Thanks,
>
> Jun
>
> On Mon, Nov 20, 2017 at 10:14 PM, Hu Xi  wrote:
>
> > Hi Jun,
> >
> >
> > Seems the prefix that is used to be the unique Sensor name cannot be
> > removed, so should we keep the prefix?
> >
> >
> > 
> > 发件人: Jun Rao 
> > 发送时间: 2017年11月21日 3:55
> > 收件人: dev@kafka.apache.org
> > 主题: Re: 答复: 答复: 答复: [DISCUSS]KIP-223 - Add per-topic min lead and
> > per-partition lead metrics to KafkaConsumer
> >
> > Hi, Hu,
> >
> > For the new partition level metrics that you are adding, it seems it's
> > better to just add the topic/partition tag instead of using them in the
> > prefix. For the existing lag metrics, we can fix them in KIP-225.
> >
> > Thanks,
> >
> > Jun
> >
> > On Sun, Nov 19, 2017 at 10:31 PM, Hu Xi  wrote:
> >
> > > Jun,
> > >
> > >
> > > Thanks for the comments. Do you think it'd better to add
> topic/partition
> > > tags for those metrics as well as keep the prefix? If those prefixes
> > should
> > > really be removed, does this KIP need to do the same thing for `lag`
> > ones?
> > >
> > > 
> > > 发件人: Jun Rao 
> > > 发送时间: 2017年11月18日 8:55
> > > 收件人: dev@kafka.apache.org
> > > 主题: Re: 答复: 答复: [DISCUSS]KIP-223 - Add per-topic min lead and
> > > per-partition lead metrics to KafkaConsumer
> > >
> > > Hi, Charly,
> > >
> > > Thanks for the input. It makes sense.
> > >
> > > Hi, Hu,
> > >
> > > Perhaps we can keep the per partition records-lead-min and
> > records-lead-avg
> > > as you had before, but just add the topic and the partition as the tags
> > > instead of prefix of the metric name.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > >
> > > On Wed, Nov 15, 2017 at 4:58 AM, charly molter <
> charly.mol...@gmail.com>
> > > wrote:
> > >
> > > > Hi Jun, Jiangle,
> > > >
> > > > I'd just like to clarify that KIP-225 seems to be using per partition
> > > > metric the same way as KIP-223 seems to be doing.
> > > >
> > > > I believe avg and max are still necessary because the MetricsReporter
> > > > doesn't work in a "push" manner and the "Value" measurableStat will
> > only
> > > > keep the last recorded entry.
> > > > Therefore a MetricsReporter usually polls to grab a current view with
> > > Value
> > > > this view is incomplete so it becomes not possible to compute the
> > > > Max/Min/Avg.
> > > > Max/Min/Avg uses SampledStats which work with a rolling window of
> > samples
> > > > and therefore periodic polling would work.
> > > >
> > > > This is why I believe it's necessary to keep Avg, Min and Max for
> these
> > > > metrics as otherwise we wouldn't be able to recompute it in an
> external
> > > > monitoring system.
> > > >
> > > > Am I wrong thinking this?
> > > >
> > > > Thanks,
> > > > Charly
> > > >
> > > >
> > > > On Wed, Nov 15, 2017 at 2:02 AM, Jun Rao  wrote:
> > > >
> > > > > Hi, Charly,
> > > > >
> > > > > Thanks for KIP-225. Your proposal looks reasonable.
> > > > >
> > > > > Hi, Jiangjie,
> > > > >
> > > > > Do you think the approach that KIP-225 proposes is better for
> > exposing
> > > > the
> > > > > per partition metric? Also, do we really need the per partition
> > > > > record-lag-avg
> > > > > and record-lag-max? It seems that an external monitoring system can
> > > > always
> > > > > derive that from the per partition record-lag.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Tue, Nov 14, 2017 at 6:57 AM, charly molter <
> > > charly.mol...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Jun, Hu,
> > > > > >
> > > > > > I have KIP-225 open for adding tags to records-lag:
> > > > > > https://cwiki.apache.org/confluence/pages/viewpage.
> > > > > action?pageId=74686649
> > > > > >
> > > > > > I have a patch more or less ready so I could probably get the fix
> > > > checked
> > > > > > in (after the vote) and you could build on top of 

Re: Plans to extend streams?

2017-11-29 Thread Wim Van Leuven
What you are actually asking is whether Kafka Streams should be reimplemented
as Apache Storm?
-wim

On Wed, 29 Nov 2017 at 15:10 Adrienne Kole  wrote:

> Hi,
>
> The purpose of this email is to get overall intuition for the future  plans
> of streams library.
>
> The main question is that, will it be a single threaded application in the
> long run and serve microservices use-cases, or are there any plans to
> extend it to multi-node execution framework with less kafka dependency.
>
> Currently, each streams node 'talks' with kafka cluster and they can
> indirectly talk with each other again through kafka. However, especially if
> kafka is not in the same network with streams nodes (actually this can
> happen if they are in the same network as well) this will cause high
> network overhead and inefficiency.
>
> One solution for this (bypassing network overhead) is to deploy streams
> node on kafka cluster to ensure the data locality. However, this is not
> recommended as the library and kafka can affect each other's performance
> and  streams does not necessarily have to know the internal data
> partitioning of kafka.
>
> Another solution would be extending streams library to have a common
> runtime. IMO, preserving the current selling points of streams (like
> dynamic scale in/out) with this kind of extensions can be very good
> improvement.
>
> So my question is that, will streams in the long/short run, will extend its
> use-cases to massive and efficient stream processing (and compete with
> spark) or stay and strengthen its current position?
>
> Cheers,
> Adrienne
>


[jira] [Created] (KAFKA-6283) Configuration of custom SCRAM SaslServer implementations

2017-11-29 Thread Tom Bentley (JIRA)
Tom Bentley created KAFKA-6283:
--

 Summary: Configuration of custom SCRAM SaslServer implementations
 Key: KAFKA-6283
 URL: https://issues.apache.org/jira/browse/KAFKA-6283
 Project: Kafka
  Issue Type: Bug
Reporter: Tom Bentley
Assignee: Tom Bentley
Priority: Minor


It is difficult to supply configuration information to a custom {{SaslServer}} 
implementation when a SCRAM mechanism is used. 

{{SaslServerAuthenticator.createSaslServer()}} creates a {{SaslServer}} for a 
given mechanism. The call to {{Sasl.createSaslServer()}} passes the broker 
config and a callback handler. In the case of a SCRAM mechanism the callback 
handler is a {{ScramServerCallbackHandler}} which doesn't have access to the 
{{jaasContext}}. This makes it hard to configure such a {{SaslServer}}, 
because I can't supply custom keys to the broker config (any unknown ones get 
removed) and I don't have access to the JAAS config.

In the case of a non-SCRAM {{SaslServer}}, I at least have access to the JAAS 
config via the {{SaslServerCallbackHandler}}.

A simple way to solve this would be to pass the {{jaasContext}} to the 
{{ScramServerCallbackHandler}} from where a custom {{SaslServerFactory}} could 
retrieve it.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: Flaky healthcheck when trying to check Kafka Stream processing app status

2017-11-29 Thread Bill Bejeck
Patrice,

Thanks for reporting this.  I'll have a look at what you've posted on
Github.

Thanks,
Bill

On Wed, Nov 29, 2017 at 7:04 AM, Patrice Chalcol  wrote:

> Hello,
>
> I have implemented a basic application which uses kafka streams stores and
> interactive queries, available there :
> https://github.com/pchalcol/kstreams-healthcheck
>
> The healthcheck implementation is based on kafka streams metadata and the
> stream state, as illustrated below :
> ```
> String healthcheck() {
> Collection<StreamsMetadata> stores = streams.allMetadata();
> long storescount = stores.stream()
> .filter(meta -> meta.host().contains("localhost") && meta.port() == 4567)
> .count();
>
> State state = streams.state();
>
> System.out.println(String.format("Application State: (%d, %s)",
> storescount, state.toString()));
>
> // KO if current node is down or if is in 'not running' state
> if (storescount == 0 || !state.isRunning()) return "KO";
> return "OK";
> }
> ```
>
> I have created the topics with 4 partitions :
> `kafka-topics --create --topic events --zookeeper localhost:2181
> --partitions 4 --replication-factor 1`
> `kafka-topics --create --topic library --zookeeper localhost:2181
> --partitions 4 --replication-factor 1`
>
> What I had expected was the healthcheck returning an error whenever the
> broker is shut down, which is not the case.
>
> When I check the application status using the following
> curl -XGET http://localhost:4567/healthcheck
> The server always returns a SUCCESS response, even if the kafka cluster is
> down.
>
> You will find below the different tests cases I've done.
>
> 1/ The Stream state is not changed after shutting down the kafka cluster
> - start kafka
> `cd docker && docker-compose up -d`
>
> - start producer
> `sbt runMain com.example.streams.Producer`
>
> - start streams and http server
> `sbt runMain com.example.streams.Producer`
>
> - healthcheck
> `curl -XGET http://localhost:4567/healthcheck`
> => response = {"status": "SUCCESS"}
> - shutdown kafka : docker-compose stop
>
> - healthcheck
> `curl -XGET http://localhost:4567/healthcheck`
> => response = {"status": "SUCCESS"} while the expected one should be
> {"status": "ERROR"}
>
>
> 2/ Sometimes, I also encounter this behaviour, no data seems to be
> available when querying the stores
> - Start kafka
> - Start Producer
> - Start Streams and http Server
>
> - Request data : curl -XGET http://localhost:4567/titles
>   This http request calls a service which in turn queries the keyvalue
> store
> => received response
> ```
> {
> "data": [
> {
> "key": 1,
> "value": "Fresh Fruit For Rotting Vegetables"
> },
>
> ...
>
> {
> "key": 10,
> "value": "Fear Of A Black Planet"
> }
> ],
> "status": "SUCCESS"
> }
> ```
>
> - Request data : curl -XGET http://localhost:4567/titles/counts
> => received response
> ```
> {
> "data": [
> {
> "key": "fear of a black planet",
> "value": 414
> },
> ...
> {
> "key": "curtain call - the hits",
> "value": 431
> }
> ],
> "status": "SUCCESS"
> }
> ```
>
> - shutdown kafka
>
> - Request data : curl -XGET http://localhost:4567/titles
> => received response, same as before, which seems to be ok as we are
> querying the local store
> ```
> {
> "data": [
> {
> "key": 1,
> "value": "Fresh Fruit For Rotting Vegetables"
> },
>
> ...
>
> {
> "key": 10,
> "value": "Fear Of A Black Planet"
> }
> ],
> "status": "SUCCESS"
> }
> ```
> - Request data : curl -XGET http://localhost:4567/titles/counts
> => received response, still understandable
> ```
> {
> "data": [
> {
> "key": "fear of a black planet",
> "value": 414
> },
> ...
> {
> "key": "curtain call - the hits",
> "value": 431
> }
> ],
> "status": "SUCCESS"
> }
> ```
>
> - restart kafka
>
> - Request data : curl -XGET http://localhost:4567/titles
> => received response
> ```
> {
> "data": [],
> "status": "SUCCESS"
> }
> ```
>
> - Request data : curl -XGET http://localhost:4567/titles/counts
> => same here, received response
> ```
> {
> "data": [],
> "status": "SUCCESS"
> }
> ```
>
> I also see this entry in the Streams application logs
> ```[error]
> (kafka-streams-test-bbc3ca50-57b7-434b-a55b-48ca855a7758-StreamThread-1)
> org.apache.kafka.streams.errors.StreamsException: task [1_0] exception
> caught when producing
> org.apache.kafka.streams.errors.StreamsException: task [1_0] exception
> caught when producing
> at
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> checkForException(RecordCollectorImpl.java:136)
> at
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(
> RecordCollectorImpl.java:144)
> at
> org.apache.kafka.streams.processor.internals.StreamTask.flushState(
> StreamTask.java:283)
> at
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.
> java:264)
> at
> 

Re: Plans to extend streams?

2017-11-29 Thread Adrienne Kole
Not necessarily.

I would avoid the term "reimplemented". Btw, Apache Storm is also not the
best (streaming) system at utilizing the network, and it does not
support runtime scale in/out (at least by design).
So, can streams preserve its current selling points (e.g. dynamicity) while
introducing a new communication protocol that includes both the current
one (streams-kafka-streams) and a new one (streams-streams)? Are there
plans for this?



On Wed, Nov 29, 2017 at 3:56 PM, Wim Van Leuven <
wim.vanleu...@highestpoint.biz> wrote:

> What you are actually asking is if Kafka Streams should be reimplemented as
> Apache Storm?
> -wim
>
> On Wed, 29 Nov 2017 at 15:10 Adrienne Kole 
> wrote:
>
> > Hi,
> >
> > The purpose of this email is to get overall intuition for the future
> plans
> > of streams library.
> >
> > The main question is that, will it be a single threaded application in
> the
> > long run and serve microservices use-cases, or are there any plans to
> > extend it to multi-node execution framework with less kafka dependency.
> >
> > Currently, each streams node 'talks' with kafka cluster and they can
> > indirectly talk with each other again through kafka. However, especially
> if
> > kafka is not in the same network with streams nodes (actually this can
> > happen if they are in the same network as well) this will cause high
> > network overhead and inefficiency.
> >
> > One solution for this (bypassing network overhead) is to deploy streams
> > node on kafka cluster to ensure the data locality. However, this is not
> > recommended as the library and kafka can affect each other's performance
> > and  streams does not necessarily have to know the internal data
> > partitioning of kafka.
> >
> > Another solution would be extending streams library to have a common
> > runtime. IMO, preserving the current selling points of streams (like
> > dynamic scale in/out) with this kind of extensions can be very good
> > improvement.
> >
> > So my question is that, will streams in the long/short run, will extend
> its
> > use-cases to massive and efficient stream processing (and compete with
> > spark) or stay and strengthen its current position?
> >
> > Cheers,
> > Adrienne
> >
>


[jira] [Created] (KAFKA-6282) exactly_once semantics breaks demo application

2017-11-29 Thread Romans Markuns (JIRA)
Romans Markuns created KAFKA-6282:
-

 Summary: exactly_once semantics breaks demo application
 Key: KAFKA-6282
 URL: https://issues.apache.org/jira/browse/KAFKA-6282
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.0.0, 0.11.0.0
 Environment: Tested on i7+24GB Ubuntu16.04 and i7+8GB Windows 7, with 
cluster 1.0.0 and 0.11.0.0 and streams 1.0.0 and 0.11.0.0
Reporter: Romans Markuns
 Attachments: WordCountDemo.java, server.properties

+What I try to achieve+
Do a successful run of a Kafka Streams app with "processing.guarantee" set 
to "exactly_once".

+How+
Use Kafka quickstart example 
(https://kafka.apache.org/10/documentation/streams/quickstart) and modify only 
configuration parameters. 
Things I've changed:
1) Add one line to WordCountDemo: 
{code:java}
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
StreamsConfig.EXACTLY_ONCE);
{code}
2) Modify server.properties to be the same as we use in QA: set broker id to 1, 
allow deleting topics via admin client and set initial rebalance delay to 3 s.

+What I expect+
The modified demo app works exactly like the original, as presented in the link above.

+What I get+
1) The original app works fine. The output topic is updated after each line is 
submitted via the console producer.
2) The modified app does not process the record after it is submitted via the 
console producer. The stream is in state RUNNING; no errors or warnings are printed.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Plans to extend streams?

2017-11-29 Thread Adrienne Kole
Hi,

The purpose of this email is to get overall intuition for the future  plans
of streams library.

The main question is that, will it be a single threaded application in the
long run and serve microservices use-cases, or are there any plans to
extend it to multi-node execution framework with less kafka dependency.

Currently, each streams node 'talks' with kafka cluster and they can
indirectly talk with each other again through kafka. However, especially if
kafka is not in the same network with streams nodes (actually this can
happen if they are in the same network as well) this will cause high
network overhead and inefficiency.

One solution for this (bypassing the network overhead) is to deploy Streams
nodes on the Kafka cluster to ensure data locality. However, this is not
recommended, as the library and Kafka can affect each other's performance,
and Streams does not necessarily have to know the internal data
partitioning of Kafka.

Another solution would be extending the Streams library to have a common
runtime. IMO, preserving the current selling points of Streams (like
dynamic scale in/out) with this kind of extension would be a very good
improvement.

So my question is: will Streams, in the long/short run, extend its use cases
to massive and efficient stream processing (and compete with Spark), or stay
and strengthen its current position?

Cheers,
Adrienne


Re: [DISCUSS]: KIP-230: Name Windowing Joins

2017-11-29 Thread Matt Farmer
Hi Matthias,

I certainly have found the auto-generated names unwieldy while doing
cluster administration.

I will point out that your KIP doesn't outline what would happen if you
picked a name that resulted in a non-unique topic name. What would be the
error handling behavior there?

On Wed, Nov 29, 2017 at 9:03 AM Matthias Margush 
wrote:

> Hi everyone,
>
> I created this KIP to allow windowing joins to be named. If named, then the
> associated internal topic names would be derived from that, instead of
> being randomly generated.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP+230%3A+Name+Windowing+Joins
>
> Thanks,
>
> Matthias
>


[DISCUSS]: KIP-230: Name Windowing Joins

2017-11-29 Thread Matthias Margush
Hi everyone,

I created this KIP to allow windowing joins to be named. If named, then the
associated internal topic names would be derived from that, instead of
being randomly generated.

https://cwiki.apache.org/confluence/display/KAFKA/KIP+230%3A+Name+Windowing+Joins
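
For context, a rough sketch (topic names and serdes are hypothetical, imports
omitted) of a windowed join in the current DSL; today the internal join state
stores and their changelog/repartition topics get auto-generated names (something
like KSTREAM-JOINTHIS-0000000005-store), which this KIP would instead derive from
a user-supplied name:

```
KStream<String, String> left = builder.stream("left-topic");
KStream<String, String> right = builder.stream("right-topic");

// the internal stores/topics backing this join are currently named automatically
KStream<String, String> joined = left.join(
    right,
    (l, r) -> l + "," + r,
    JoinWindows.of(TimeUnit.MINUTES.toMillis(5)));
```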

Thanks,

Matthias


Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams

2017-11-29 Thread Jan Filipiak

Hi,

thank you for the summary and thanks for acknowledging that I do have a 
point here.


I don't like the second idea at all. Hence I started off this discussion.

I am just disappointed; back when we had the discussion about how to
refactor the store overloads and IQ handling, I knew the path we were taking
was wrong. Having problems implementing these kinds of features (which are
really simple) is just a symptom of a messed-up IQ implementation. I really
wish I could have convinced you guys back then. To be honest, with IQ we can
continue here, as we materialize but would not send the oldValue; but with
join you're out of luck with the current setup.


I of course recommend not introducing any optimizations here. I'd recommend
going towards what I already recommended back then. So I wouldn't say we need
to optimize anything later; we need to build the topology better in the first
place.




On 28.11.2017 21:00, Guozhang Wang wrote:

Jan,

Thanks for your input. I can understand now that the oldValue is also
exposed in the user-customized `filter` function, and hence what record
context we should expose is a problem. And I think it does bring a good point
to consider for KIP-159. The discussion may be a bit confusing to readers
though, and hence I'd like to summarize the status quo along with a proposal:

In today's Streams DSL, when a KTable is created either from a source
topic or from a stateful operator, we will materialize the KTable with a
backing state store; on the other hand, KTables created from a non-stateful
operator like filter will not be backed by a state store by default unless
users indicate so (e.g. using the overloaded function with the queryable
name or store supplier).

For example:

KTable table1 = builder.table("topic");            // a state store created for table1
KTable table2 = table1.filter(..);                 // no state store created for table2
KTable table3 = table1.filter(.., "storeName");    // a state store created for table3
KTable table4 = table1.groupBy(..).aggregate(..);  // a state store created for table4

Because of that, the filter() operator above on table1 will always be
exposed with oldValue and newValue; Damian's point is that we may optimize
the first case such that table1 will only be materialized if users ask for it
(e.g. using the overloaded function with a store supplier), in which
case we do not need to pass newValue / oldValue pairs (I think this is
what Jan suggests as well, i.e. do the filtering before materializing, so that
we can have a smaller backing state store as well). But this optimization
does not eliminate the possibility that we may still need to do the filter if
users do specify "yes, I do want the source KTable itself to be
materialized, please". So the concern about how to expose the record
context in such cases still persists.


With that, regarding to KIP-159 itself, here are my thoughts:

1) If we restrict the scope of exposing the record context only to source
KTables / KStreams, I feel the KIP itself does not bring much value given its
required API change, because only the SourceKStream can safely maintain its
record context; and for a SourceKTable, if it is materialized, then even
non-stateful operators like Join may still have a concern about exposing the
record context.

2) An alternative idea is that we provide semantics on how the record context
would be inherited across the operators for KTable / KStream and expose it in
all operators (similarly, in the PAPI we would expose a much simpler
contract), and make it a public contract that the Streams library will
guarantee moving forward even as we optimize our topology builder; it may not
align perfectly with the linear algebraic semantics, but it is practically
applicable for most cases; if users' semantics do not fit the provided
contract, then they may need to handle this themselves (embed such
information in the value payload, for example).
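
For illustration, a rough sketch of that last workaround, where `stream` is an
existing KStream<String, String> and EnrichedValue is a hypothetical wrapper
class (imports omitted): the user captures the context via transformValues()
and carries it in the value payload themselves:

```
KStream<String, EnrichedValue> enriched = stream.transformValues(
    () -> new ValueTransformer<String, EnrichedValue>() {
        private ProcessorContext context;

        @Override
        public void init(final ProcessorContext context) {
            this.context = context;
        }

        @Override
        public EnrichedValue transform(final String value) {
            // copy topic / partition / offset into the value itself
            return new EnrichedValue(value, context.topic(),
                context.partition(), context.offset());
        }

        @Override
        public EnrichedValue punctuate(final long timestamp) {
            return null; // not used
        }

        @Override
        public void close() { }
    });
```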

If people do not like the second idea, I'd suggest we hold off on pursuing
the first direction, since to me its beneficial scope is too limited compared
to its cost.


Guozhang



On Fri, Nov 24, 2017 at 1:39 AM, Jan Filipiak 
wrote:


Clearly we show the oldValue to the user. We have to, because we filter
after the store.
https://github.com/axbaretto/kafka/blob/master/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java#L96

I cannot help you following this. It is really obvious and I am running
out of tools for explaining.

Thanks for understanding my point about putting the filter before. Not only
would it make the store smaller, it would make this feature reasonably
possible and the framework easier. Interestingly, it would also help move IQ
in more reasonable directions. And it might help in understanding that we do
not need any intermediate representation of the topology,

KIP-182 I have no clue what everyone has with their "bytestores" so
broken. But putting another store after doesn't help when the 

Flaky healthcheck when trying to check Kafka Stream processing app status

2017-11-29 Thread Patrice Chalcol
Hello,

I have implemented a basic application which uses Kafka Streams stores and
interactive queries, available here:
https://github.com/pchalcol/kstreams-healthcheck

The healthcheck implementation is based on the Kafka Streams metadata and the
stream state, as illustrated below:
```
String healthcheck() {
  Collection<StreamsMetadata> stores = streams.allMetadata();
  long storescount = stores.stream()
      .filter(meta -> meta.host().contains("localhost") && meta.port() == 4567)
      .count();

  State state = streams.state();

  System.out.println(String.format("Application State: (%d, %s)",
      storescount, state.toString()));

  // KO if the current node is down or if it is in a 'not running' state
  if (storescount == 0 || !state.isRunning()) return "KO";
  return "OK";
}
```
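
As a side note, state transitions can also be observed without polling
streams.state(), by registering a state listener (a minimal sketch using the
standard KafkaStreams API):
```
// log every transition, e.g. RUNNING -> REBALANCING, as it happens
streams.setStateListener((newState, oldState) ->
    System.out.println(String.format("Streams state: %s -> %s", oldState, newState)));
```
Note, though, that it only reports transitions the instance actually makes.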

I have created the topics with 4 partitions :
`kafka-topics --create --topic events --zookeeper localhost:2181
--partitions 4 --replication-factor 1`
`kafka-topics --create --topic library --zookeeper localhost:2181
--partitions 4 --replication-factor 1`

What I had expected was the healthcheck returning an error whenever the
broker is shut down, which is not the case.

When I check the application status using the following
curl -XGET http://localhost:4567/healthcheck
The server always returns a SUCCESS response, even if the kafka cluster is
down.

You will find below the different tests cases I've done.

1/ The Stream state is not changed after shutting down the kafka cluster
- start kafka
`cd docker && docker-compose up -d`

- start producer
`sbt runMain com.example.streams.Producer`

- start streams and http server
`sbt runMain com.example.streams.Producer`

- healthcheck
`curl -XGET http://localhost:4567/healthcheck`
=> response = {"status": "SUCCESS"}
- shutdown kafka : docker-compose stop

- healthcheck
`curl -XGET http://localhost:4567/healthcheck`
=> response = {"status": "SUCCESS"} while the expected one should be
{"status": "ERROR"}


2/ Sometimes I also encounter this behaviour: no data seems to be
available when querying the stores
- Start kafka
- Start Producer
- Start Streams and http Server

- Request data : curl -XGET http://localhost:4567/titles
  This http request calls a service which in turn queries the keyvalue store
=> received response
```
{
"data": [
{
"key": 1,
"value": "Fresh Fruit For Rotting Vegetables"
},

...

{
"key": 10,
"value": "Fear Of A Black Planet"
}
],
"status": "SUCCESS"
}
```

- Request data : curl -XGET http://localhost:4567/titles/counts
=> received response
```
{
"data": [
{
"key": "fear of a black planet",
"value": 414
},
...
{
"key": "curtain call - the hits",
"value": 431
}
],
"status": "SUCCESS"
}
```

- shutdown kafka

- Request data : curl -XGET http://localhost:4567/titles
=> received response, same as before, which seems to be ok as we are
querying the local store
```
{
"data": [
{
"key": 1,
"value": "Fresh Fruit For Rotting Vegetables"
},

...

{
"key": 10,
"value": "Fear Of A Black Planet"
}
],
"status": "SUCCESS"
}
```
- Request data : curl -XGET http://localhost:4567/titles/counts
=> received response, still understandable
```
{
"data": [
{
"key": "fear of a black planet",
"value": 414
},
...
{
"key": "curtain call - the hits",
"value": 431
}
],
"status": "SUCCESS"
}
```

- restart kafka

- Request data : curl -XGET http://localhost:4567/titles
=> received response
```
{
"data": [],
"status": "SUCCESS"
}
```

- Request data : curl -XGET http://localhost:4567/titles/counts
=> same here, received response
```
{
"data": [],
"status": "SUCCESS"
}
```

I also see this entry in the Streams application logs
```[error]
(kafka-streams-test-bbc3ca50-57b7-434b-a55b-48ca855a7758-StreamThread-1)
org.apache.kafka.streams.errors.StreamsException: task [1_0] exception
caught when producing
org.apache.kafka.streams.errors.StreamsException: task [1_0] exception
caught when producing
at
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:136)
at
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:144)
at
org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:283)
at
org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:264)
at
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
at
org.apache.kafka.streams.processor.internals.StreamTask.commitImpl(StreamTask.java:259)
at
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:253)
at
org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:815)
at
org.apache.kafka.streams.processor.internals.StreamThread.access$2800(StreamThread.java:73)
at

Re: [DISCUSS] KIP 226 - Dynamic Broker Configuration

2017-11-29 Thread Rajini Sivaram
Hi Jason,

Thanks for reviewing the KIP.

I hadn't included *inter.broker.protocol.version*, but you have provided a
good reason to do that in order to avoid an additional rolling restart
during upgrade. I had included *log.message.format.version* along with
other default topic configs, but it probably makes sense to do these two
together.
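
For illustration, a dynamic broker config update through the AdminClient might
look roughly like the sketch below (method name and broker id are illustrative,
imports omitted; the exact set of dynamically reconfigurable keys is defined by
the KIP):

```
static void bumpMessageFormat(String bootstrap) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
    try (AdminClient admin = AdminClient.create(props)) {
        // per-broker resource; broker id "1" is just an example
        ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "1");
        Config update = new Config(Collections.singleton(
            new ConfigEntry("log.message.format.version", "1.0")));
        // validation errors (unknown key, invalid value, not reconfigurable) surface here
        admin.alterConfigs(Collections.singletonMap(broker, update)).all().get();
    }
}
```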


On Wed, Nov 29, 2017 at 12:00 AM, Jason Gustafson 
wrote:

> Hi Rajini,
>
> One quick question I was wondering about is whether this could be used to
> update the inter-broker protocol version or the message format version?
> Potentially then we'd only need one rolling restart to upgrade the cluster.
> I glanced quickly through the uses of this config in the code and it seems
> like it might be possible. Not sure if there are any complications you or
> others can think of.
>
> Thanks,
> Jason
>
> On Tue, Nov 28, 2017 at 2:48 PM, Rajini Sivaram 
> wrote:
>
> > Hi Colin,
> >
> > Thank you for reviewing the KIP.
> >
> > *kafka-configs.sh* will be converted to use *AdminClient* under
> > KAFKA-5722.
> > This is targeted for the next release (1.1.0). Under this KIP, we will
> > implement *AdminClient#alterConfigs* for the dynamic configs listed in
> the
> > KIP. This will include validation of the configs and will return
> > appropriate errors if configs are invalid. Integration tests will also be
> > added using AdmnClient. Only the actual conversion of ConfigCommand to
> use
> > AdminClient will be left to be done under KAFKA-5722.
> >
> > Once KAFKA-5722 is implemented, *kafka-configs.sh* can be used to obtain
> > the
> > current configuration, which can be redirected to a text file to create a
> > static *server.properties* file. This should help when downgrading - but
> it
> > does require brokers to be running. We can also document how to obtain
> the
> > properties using *zookeeper-shell.sh* while downgrading if brokers are
> > down.
> >
> > If we rename properties, we should add the new property to ZK based on
> the
> > value of the old property when the upgraded broker starts up. But we
> would
> > probably leave the old property as is. The old property will be returned
> > and used as a synonym only as long as the broker is on a version where it
> > is still valid. But it can remain in ZK and be updated if downgrading -
> it
> > will be up to the user to update the old property if downgrading or
> delete
> > it if not needed. Renaming properties is likely to be confusing in any
> case
> > even without dynamic configs, so hopefully it will be very rare.
> >
> >
> > Rajini
> >
> > On Tue, Nov 28, 2017 at 7:47 PM, Colin McCabe 
> wrote:
> >
> > > Hi Rajini,
> > >
> > > This seems like a nice improvement!
> > >
> > > One thing that is a bit concerning is that, if bin/kafka-configs.sh is
> > > used, there is no  way for the broker to give feedback or error
> > > messages.  The broker can't say "sorry, I can't reconfigure that in
> that
> > > way."  Or even "that configuration property is not reconfigurable in
> > > this version of the software."  It seems like in the examples give,
> > > users will simply set a configuration using bin/kafka-configs.sh, but
> > > then they have to check the broker log files to see if it could
> actually
> > > be applied.  And even if it couldn't be applied, then it still lingers
> > > in ZooKeeper.
> > >
> > > This seems like it would lead to a lot of user confusion, since they
> get
> > > no feedback when reconfiguring something.  For example, there will be a
> > > lot of scenarios where someone finds a reconfiguration command on
> > > Google, runs it, but then it doesn't do anything because the software
> > > version is different.  And there's no error message or feedback about
> > > this.  It just doesn't work.
> > >
> > > To prevent this, I think we should convert bin/kafka-configs.sh to use
> > > AdminClient's AlterConfigsRequest.  This allows us to detect scenarios
> > > where, because of a typo, different software version, or a value of the
> > > wrong type (eg. string vs. int), the given configuration cannot be
> > > applied.  We really should convert kafka-configs.sh to use AdminClient
> > > anyway, for all the usual reasons-- people want to lock down ZooKeeper,
> > > ACLs should be enforced, internal representations should be hidden, we
> > > should support environments where ZK is not exposed, etc. etc.
> > >
> > > Another issue that I see here is, how does this interact with
> downgrade?
> > >  Presumably, if the user downgrades to a version that doesn't support
> > > KIP-226, all the dynamic configurations stored in ZK revert to whatever
> > > value they have in the local config files.  Do we need to have a
> utility
> > > that can reify the actual applied configuration into a local text file,
> > > to make downgrades less painful?
> > >
> > > With regard to upgrades, what happens if we change the name of a
> > > configuration key in the future?  For example, if we 

Re: [VOTE] KIP-225 - Use tags for consumer “records.lag” metrics

2017-11-29 Thread charly molter
Hi,

Just a reminder that this vote is open. Let me know if you think this is
not a valuable change.

Thanks!

On Wed, Nov 22, 2017 at 10:48 AM, charly molter 
wrote:

>
> Hi,
>
> I would like to start the voting thread for KIP-225.
> This KIP proposes to correct some lag metrics emitted by the consumer.
>
> The KIP wiki is here:
> https://cwiki.apache.org/confluence/x/uaBzB
>
> The discussion thread is here:
> http://search-hadoop.com/m/Kafka/uyzND1F33uL19AYx/threaded
>
> Also could someone assign me to this Jira: KAFKA-5890
> 
>
> Thanks,
> --
> Charly Molter
>



-- 
Charly Molter


[jira] [Created] (KAFKA-6281) Kafka JavaAPI Producer failed with NotLeaderForPartitionException

2017-11-29 Thread Anil (JIRA)
Anil created KAFKA-6281:
---

 Summary: Kafka JavaAPI Producer failed with 
NotLeaderForPartitionException
 Key: KAFKA-6281
 URL: https://issues.apache.org/jira/browse/KAFKA-6281
 Project: Kafka
  Issue Type: Bug
Reporter: Anil
 Attachments: server1-controller.log, server2-controller.log

We are running Kafka (version kafka_2.11-0.10.1.0) in a 2-node cluster. We have 
2 producers (Java API) acting on different topics. Each topic has a single 
partition. The topic where we had this issue has one consumer running. This 
setup had been running fine for 3 months before we saw this issue. None of the 
suggested causes/solutions for this issue in other forums seem to apply to my 
scenario.

Exception at producer;
{code}
-2017-11-25T17:40:33,035 [kafka-producer-network-thread | producer-1] ERROR 
client.producer.BingLogProducerCallback - Encountered exception in sending 
message ; > org.apache.kafka.common.errors.NotLeaderForPartitionException: This 
server is not the leader for that topic-partition.
{code}


We haven't enabled retries for the messages, because this is transactional data 
and we want to maintain the order.

Producer config:
{code}
bootstrap.servers : server1ip:9092
acks :all
retries : 0
linger.ms :0
buffer.memory :1024
max.request.size :1024000
key.serializer : org.apache.kafka.common.serialization.StringSerializer
value.serializer : org.apache.kafka.common.serialization.StringSerializer
{code}
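
As a side note, retries can be enabled without risking reordering by also limiting 
in-flight requests, e.g. (producer properties):
{code}
retries=3
max.in.flight.requests.per.connection=1
{code}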

We are connecting to server1 from both the producer and the consumer. The 
controller log at server2 indicates that some shutdown happened around the same 
time, but I don't understand why this happened.


{color:red}[2017-11-25 17:31:44,776] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController)
[2017-11-25 17:31:44,776] TRACE [Controller 2]: leader imbalance ratio for broker 2 is 0.00 (kafka.controller.KafkaController)
[2017-11-25 17:31:44,776] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController)
[2017-11-25 17:31:44,776] TRACE [Controller 2]: leader imbalance ratio for broker 1 is 0.00 (kafka.controller.KafkaController)
[2017-11-25 17:34:18,314] INFO [SessionExpirationListener on 2], ZK expired; shut down all controller components and try to re-elect (kafka.controller.KafkaController$SessionExpirationListener)
[2017-11-25 17:34:18,317] DEBUG [Controller 2]: Controller resigning, broker id 2 (kafka.controller.KafkaController)
[2017-11-25 17:34:18,317] DEBUG [Controller 2]: De-registering IsrChangeNotificationListener (kafka.controller.KafkaController)
[2017-11-25 17:34:18,317] INFO [delete-topics-thread-2], Shutting down (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-11-25 17:34:18,317] INFO [delete-topics-thread-2], Stopped (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-11-25 17:34:18,318] INFO [delete-topics-thread-2], Shutdown completed (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-11-25 17:34:18,318] INFO [Partition state machine on Controller 2]: Stopped partition state machine (kafka.controller.PartitionStateMachine)
[2017-11-25 17:34:18,318] INFO [Replica state machine on controller 2]: Stopped replica state machine (kafka.controller.ReplicaStateMachine)
[2017-11-25 17:34:18,318] INFO [Controller-2-to-broker-2-send-thread], Shutting down (kafka.controller.RequestSendThread)
[2017-11-25 17:34:18,318] INFO [Controller-2-to-broker-2-send-thread], Stopped (kafka.controller.RequestSendThread)
[2017-11-25 17:34:18,319] INFO [Controller-2-to-broker-2-send-thread], Shutdown completed (kafka.controller.RequestSendThread)
[2017-11-25 17:34:18,319] INFO [Controller-2-to-broker-1-send-thread], Shutting down (kafka.controller.RequestSendThread)
[2017-11-25 17:34:18,319] INFO [Controller-2-to-broker-1-send-thread], Stopped (kafka.controller.RequestSendThread)
[2017-11-25 17:34:18,319] INFO [Controller-2-to-broker-1-send-thread], Shutdown completed (kafka.controller.RequestSendThread)
[2017-11-25 17:34:18,319] INFO [Controller 2]: Broker 2 resigned as the controller (kafka.controller.KafkaController)
[2017-11-25 17:34:18,353] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener)
[2017-11-25 17:34:18,353] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener)
[2017-11-25 17:34:18,354] INFO [BrokerChangeListener on Controller 2]: Broker change listener fired for path /brokers/ids with children 1,2 (kafka.controller.ReplicaStateMachine$BrokerChangeListener)
[2017-11-25 17:34:18,355] DEBUG [DeleteTopicsListener on 2]: Delete topics listener fired for topics to be deleted (kafka.controller.PartitionStateMachine$DeleteTopicsListener)
[2017-11-25 17:34:18,362] INFO [AddPartitionsListener on 2]: Partition modification triggered