[GitHub] kafka pull request #1822: Fix javadocs of Windowed

2016-09-05 Thread miguno
GitHub user miguno opened a pull request:

https://github.com/apache/kafka/pull/1822

Fix javadocs of Windowed

The previous Javadoc referred to a 0.10.0.x method that has since been removed 
from trunk.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/miguno/kafka trunk-windowed-javadocs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1822.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1822


commit 285b834035b926f8a49d517493f7971d826fe2f3
Author: Michael G. Noll 
Date:   2016-09-05T07:45:39Z

Fix javadocs of Windowed




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #1822: Fix javadocs of Windowed

2016-09-05 Thread miguno
Github user miguno closed the pull request at:

https://github.com/apache/kafka/pull/1822




[GitHub] kafka pull request #1823: Fixes javadoc of Windows, fixes typo in parameter ...

2016-09-05 Thread miguno
GitHub user miguno opened a pull request:

https://github.com/apache/kafka/pull/1823

Fixes javadoc of Windows, fixes typo in parameter name of KGroupedTable



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/miguno/kafka trunk-windowed-javadocs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1823.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1823


commit ecea313abc5008f8daa38538d91518780e44fff2
Author: Michael G. Noll 
Date:   2016-09-05T07:45:39Z

Fix javadocs of Windowed

commit 3f108de711c18b1ef2008d509cf479a7c47cd909
Author: Michael G. Noll 
Date:   2016-09-05T07:51:52Z

Fix typo "substractor" -> "subtractor"






Re: Question regarding Producer and Duplicates

2016-09-05 Thread Florian Hussonnois
Thanks Guozhang,

So increasing batch.size can lead to more duplicates in case of failure.

Also, regarding your statement "The broker will accept a batch of records as a
whole or reject them": for example, if a producer request contains two batches,
the first for Topic A / Partition 0 and the second for Topic B / Partition 1,
does that mean batch 1 will fail if the acknowledgments for batch 2 cannot be
satisfied? Is the producer request refused as a whole, or is only one batch
within the request refused?

Thanks,


2016-08-31 20:53 GMT+02:00 Guozhang Wang :

> Hi Florian,
>
> The broker will accept a batch of records as a whole or reject them as a
> whole unless it encounters an IOException while trying to append the
> messages, which will be treated as a fatal error anyways.
>
> Duplicates usually happen when the whole batch is accepted but the ack was
> not delivered in time, and hence it was re-tried.
>
>
> Guozhang
>
>
> On Tue, Aug 30, 2016 at 2:45 AM, Florian Hussonnois  >
> wrote:
>
> > Hi all,
> >
> > I am using kafka_2.11-0.10.0.1, my understanding is that the producer API
> > batches records per partition to send efficient requests. We can configure
> > batch.size to increase the throughput.
> >
> > However, in case of failure, do all records within the batch fail? If that
> > is true, does that mean that increasing batch.size can also increase the
> > number of duplicates in case of retries?
> >
> > Thanks,
> >
> > Florian.
> >
>
>
>
> --
> -- Guozhang
>



-- 
Florian HUSSONNOIS
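
For readers following along, a minimal sketch of the producer settings under
discussion is below (broker address, topic and sizes are made-up values; only
standard Java client configs are used). The key point is that a retried batch
is re-sent as a whole, so a larger batch.size means more records that can be
duplicated when an ack is lost:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class BatchingProducerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // Records for the same partition are grouped into batches of up to batch.size bytes.
            props.put("batch.size", 65536);
            // If the ack for an accepted batch is lost, the retry re-sends the whole batch,
            // which is how duplicates appear.
            props.put("retries", 3);
            props.put("acks", "all");

            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("topic-a", "key", "value"));
            }
        }
    }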


Jenkins build is back to normal : kafka-trunk-jdk7 #1516

2016-09-05 Thread Apache Jenkins Server
See 



Re: [DISCUSS] KIP-78: Cluster Id

2016-09-05 Thread Dong Lin
Hey Sumit,

Thanks for your detailed response. Please see my comment inline.

On Sun, Sep 4, 2016 at 10:56 PM, sumit arrawatia 
wrote:

> Hi Dong,
>
> Please find my answers inline.
>
> Hopefully they address your concerns this time !
>
> Sumit
>
> On Sun, Sep 4, 2016 at 4:54 PM, Dong Lin  wrote:
>
> > Hey Ismael,
> >
> > Thanks for the explanation Ismael. Please see my comment inline.
> >
> > On Sun, Sep 4, 2016 at 8:58 AM, Ismael Juma  wrote:
> >
> > > Hi Dong,
> > >
> > > Sorry for the delay, I was offline for 24 hours. Thankfully Sumit was
> > > around to explain the reasoning for the current approach. From reading
> > the
> > > thread, it seems like it may help to reiterate a few important goals
> for
> > > the auditing use case where there is a requirement to associate a
> message
> > > with a particular cluster:
> > >
> > >1. Each cluster id should be unique.
> > >2. The cluster id should be immutable. If the cluster id can be changed
> > >as a matter of course, it's hard to provide any guarantees. Assigning a
> > >meaningful name for the id makes it more likely that you may need to
> > >change it, whereas decoupling the immutable id from the
> > >meaningful/human-readable name eliminates the issue.
> > >
> >
> > I don't think having a human-readable name is equivalent to a meaningful
> > name. It is not true that a human readable name makes it more likely you
> > want to change it. Look, every city has a human readable name and we
> don't
> > worry about changing its name. The conference room in any company has a
> > human readable name instead of a random id. For the same reason you can
> > name a cluster as Yosemite and don't have to change it in the future.
> >
> > By immutable I think you are saying that we should prevent people from
> > changing cluster.id. However, this KIP doesn't really prevent this from
> > happening -- user can delete znode and restart kafka to change
> cluster.id.
> > Therefore the requirement is not satisfied anyway.
> >
> > I am also not sure why you want to prevent people from changing
> cluster.id
> > after reading the motivation section of this KIP. Is there any motivation
> > or use-case for this requirement?
>
I don't think your comment below is relevant to Goal 2, which I am
discussing with Ismael. Ismael said in Goal 2 that people are more likely
to change cluster.id if it is meaningful. My reply explains why that is not
true. What your comment tries to explain instead is that it is hard to have
a unique cluster.id with a human readable name.


>
> As I explained before, a stable cluster id is required for monitoring and
> auditing use cases which are the main motivations of this change. If the id
> changes, you will need to either update the historical data with the new
> cluster id or throw the data away.
>

You actually mean "immutable" by saying stable, right? Since it is a hard
requirement, can you also respond to my earlier concern and explain how the
approach in this KIP guarantees that the cluster.id is immutable?


>
> BTW, You provide excellent analogies for why the id should not be human
> readable and changeable :).
>
> Cities have human readable names and it is very hard to enforce that city
> name is unique which is why the postal department needs you provide zip
> code if you want your mail to be delivered correctly. This is analogous to
> why we need cluster id to be stable in a auditing/ monitoring use case (you
> need a zip code (cluster id) to uniquely identify an address (cluster) for
> a message).
>

I don't think it is analogous here. Do you think it is hard to have a unique
city name if people know the requirement to choose a unique name when they
create it?


> Taking the analogy further, we in India recently changed names of biggest
> cities to Indian name from their British given names and nobody had to
> change all their historical data or business logic because the zip code
> remained the same.
>

OK. You are saying it is easy to change a city name if you have a zip code.
Cool. But what is the takeaway from this story w.r.t. our discussion here?


>
> Your conference room analogy is also very helpful in understanding why it
> is difficult to manage uniqueness across the organization and why
> uniqueness is essential for monitoring and auditing. Let's say your
> company has offices all across the world and you want to audit and
> monitor their usage. Having a room called "Yosemite" in 3 locations will
> make this impossible. Either you coordinate to make sure conference
> names are unique in offices across the world, which places a lot of burden
> on the team, or, to avoid this, you assign unique ids to conference
> rooms with human readable names. And this is in fact what most
> big organizations do.
>

Can you explain why it places a lot of burden on the team to select a unique
cluster.id? I think the number of clusters in an organization is less than
1000. It is easy to check

Re: [DISCUSS] KIP-78: Cluster Id

2016-09-05 Thread Ismael Juma
Dong,

Sumit responded to a number of points already, so I will try to be brief.
See inline.

Also, it may just be possible that we won't reach agreement. In that case,
a vote may be a way to figure out if people feel that this proposal adds
value in its current form or not.

On Mon, Sep 5, 2016 at 12:54 AM, Dong Lin  wrote:

> I don't think having a human-readable name is equivalent to a meaningful
> name. It is not true that a human readable name makes it more likely you
> want to change it. Look, every city has a human readable name and we don't
> worry about changing its name. The conference room in any company has a
> human readable name instead of a random id. For the same reason you can
> name a cluster as Yosemite and don't have to change it in the future.
>

As Sumit said, many cities have in fact changed their names. Incidentally,
all the conference names at Confluent were recently renamed. So, this
illustrates the point well. Yes, it is possible to give human-readable, but
not meaningful names. I still think that unique and immutable
auto-generated id + changeable human-readable name is a better overall
solution.

> By immutable I think you are saying that we should prevent people from
> changing cluster.id. However, this KIP doesn't really prevent this from
> happening -- user can delete znode and restart kafka to change cluster.id.
> Therefore the requirement is not satisfied anyway.
>

Sure, we can't prevent users from deleting state in ZooKeeper or elsewhere
if they have access to it. The idea is that users wouldn't need to with the
auto-generated id.

> I am also not sure why you want to prevent people from changing cluster.id
> after reading the motivation section of this KIP. Is there any motivation
> or use-case for this requirement?
>

I thought I explained this a few times. :) Sumit took a stab as well. The
requirement is to reliably associate a message with a cluster. Each time
the cluster id changes, you are basically "creating" a new cluster so it
would look like messages are associated with 2 different clusters instead
of a single one. This is an old database topic, of course: surrogate versus
natural keys.

> It is not clear why it would make downstream code more complex and the
> feature less useful if we provide a default cluster.id here. For users who
> are not interested in this feature, they can use the cluster.id and all
> downstream applications will not be affected. For users who need this
> feature, they can configure a unique human readable cluster.id for their
> clusters. In this case the downstream application will have the same
> complexity as with the approach in this KIP. Did I miss something?
>

Can you please clarify what you mean by "default cluster.id"? I don't
follow what you're saying in the comment above.

> Right, there is no easy way to detect this automatically with Kafka. But
> automatically detecting violations of uniqueness was not a requirement
> in the first place. SREs can manually make sure that a unique cluster.id
> is given to each cluster in the broker config.


We would like the feature to be useful across the board. Not all teams have
a super capable team of SREs like LinkedIn. Some may not even have SREs at
all. :)

> I am not sure if it is weird. We can seek the views of other SREs and
> developers to understand whether and why it is weird. I can ask our SRE to
> comment as well. It is hard to evaluate whether "weirdness" outweighs the
> benefits from the ability to identify cluster with a human readable
> cluster.id without knowing its impact on the use-case and user experience.
>

It seems that a few things are being conflated here. You can set the
cluster id manually in either proposal. The main differences are:

1. Whether the cluster id is auto-generated if not present (the KIP
proposes auto-generation and, if I understand correctly, you are suggesting
that it should not)
2. How the cluster id can be set manually (you'd have to set the relevant
znode value with the KIP proposal whereas you are suggesting that it should
be possible via a broker config)
3. The recommended workflow (the KIP suggests that you should just rely on
the auto-generated id whereas you are suggesting that setting the value
manually is a good idea).


> Hmm.. you and Sumit provided two completely different requirements
> regarding immutability and ease of change. I share a similar view with
> Sumit on this issue. Of course we prefer to avoid changing the config. But
> the one-time config change is probably not a big deal as compared to the
> long-term benefit that comes with a human readable cluster.id in the
> monitoring/auditing use-case.
>

As Sumit clarified, both of us are actually saying the same thing. I am
quite confused when you say that you share a similar view with Sumit on
this issue. :)

Ismael


SASL session expiry

2016-09-05 Thread Mickael Maison
Hi,

While Kerberos has a mechanism to refresh its tickets, SASL PLAIN has
no such feature. This means that if a client is connected, as far as I can
tell, we currently have no way of disconnecting it; revoking its
credentials won't help.

I think it would be useful to have a way to force clients to refresh
their SASL session periodically and disconnect them if their
credentials have expired.


What do you think ?
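
To make the idea concrete, a rough sketch of the kind of broker-side check this
would imply is below. Everything here is hypothetical (none of these interfaces
exist in Kafka); it only illustrates periodically re-checking credentials and
closing connections that no longer pass:

    import java.util.Map;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // Hypothetical sketch: periodically re-validate authenticated connections.
    final class SessionExpiryReaper {
        interface AuthenticatedConnection {   // stand-in for a broker-side connection
            String principal();
            long authenticatedAtMs();
            void close();
        }
        interface CredentialStore {           // stand-in for a SASL/PLAIN credential lookup
            boolean isValid(String principal);
        }

        private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

        void start(Map<String, AuthenticatedConnection> connections,
                   CredentialStore credentials,
                   long maxSessionMs,
                   long checkIntervalMs) {
            scheduler.scheduleAtFixedRate(() -> {
                long now = System.currentTimeMillis();
                for (AuthenticatedConnection c : connections.values()) {
                    boolean sessionTooOld = now - c.authenticatedAtMs() > maxSessionMs;
                    if (sessionTooOld || !credentials.isValid(c.principal())) {
                        c.close();            // force the client to reconnect and re-authenticate
                    }
                }
            }, checkIntervalMs, checkIntervalMs, TimeUnit.MILLISECONDS);
        }
    }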


Re: SASL session expiry

2016-09-05 Thread Ismael Juma
Hi Mickael,

The Kerberos ticket refresh mechanism is there for new connections, not
existing connections. Currently, the suggested approach is to rely on the
authorizer to deal with expired credentials. Would this work for you?

Ismael

On Mon, Sep 5, 2016 at 11:13 AM, Mickael Maison 
wrote:

> Hi,
>
> While Kerberos has a mechanism to refresh its tickets, SASL PLAIN has
> no such feature. This means if a client is connected, as far as I can
> tell, we have currently no way of disconnecting him, revoking his
> credentials won't help.
>
> I think it would be useful to have a way to force clients to refresh
> their SASL session periodically and disconnect them if their
> credentials have expired.
>
>
> What do you think ?
>


Re: WARN log message flooding broker logs for a pretty typical SSL setup

2016-09-05 Thread Ismael Juma
Hi Jaikiran,

I agree that this is a valid configuration and the log level seems too high
given that. The original motivation is explained in the PR:

https://github.com/apache/kafka/pull/155/files#diff-fce430ae21a0c98d82da6d4aa551f824L603

That is, to help people figure out if client authentication was not set up
correctly, but it seems like a better way to do that is to set
`ssl.client.auth=required`. So I'd, personally, be fine with reducing the
log level to info or debug.

Ismael

On Sun, Sep 4, 2016 at 3:01 PM, Jaikiran Pai 
wrote:

> We just started enabling SSL for our Kafka brokers and (Java) clients and
> among some of the issues we are running into, one of them is the flooding
> of the server/broker Kafka logs where we are seeing these messages:
>
> [2016-09-02 08:07:13,773] WARN SSL peer is not authenticated, returning
> ANONYMOUS instead (org.apache.kafka.common.network.SslTransportLayer)
> [2016-09-02 08:07:15,710] WARN SSL peer is not authenticated, returning
> ANONYMOUS instead (org.apache.kafka.common.network.SslTransportLayer)
> [2016-09-02 08:07:15,711] WARN SSL peer is not authenticated, returning
> ANONYMOUS instead (org.apache.kafka.common.network.SslTransportLayer)
> [2016-09-02 08:07:15,711] WARN SSL peer is not authenticated, returning
> ANONYMOUS instead (org.apache.kafka.common.network.SslTransportLayer)
> [2016-09-02 08:07:15,712] WARN SSL peer is not authenticated, returning
> ANONYMOUS instead (org.apache.kafka.common.network.SslTransportLayer)
> 
>
> They just keep going on and on. In our SSL setup, we have the broker
> configured with the keystore and the Java clients have been configured with
> a proper truststore and all works fine except for these messages flooding
> the logs. We don't have any ACLs setup nor have we enabled client auth
> check.
>
> Looking at the code which generates this WARN message
> https://github.com/apache/kafka/blob/trunk/clients/src/main/
> java/org/apache/kafka/common/network/SslTransportLayer.java#L638 and the
> fact that the setup we have (where we just enable server/broker cert
> validation) is, IMO, a valid scenario and not some exceptional/incorrect
> setup issue, I think this log message is something that can be removed from
> the code (or at least logged at a very lower level given the frequency at
> which this gets logged)
>
> Any thoughts on this?
>
> It's a pretty straightforward change and if this change is something that
> sounds right, I can go ahead and submit a PR.
>
> P.S: This is both on 0.9.0.1 and latest 0.10.0.1.
>
> -Jaikiran
>
>
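
In the meantime, one way to keep these out of the broker logs (assuming the
stock log4j.properties shipped with the broker, and given that the logger name
is the class shown in the messages) is to raise the threshold for that one
logger:

    # config/log4j.properties -- only emit ERROR and above from this logger
    log4j.logger.org.apache.kafka.common.network.SslTransportLayer=ERROR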


Re: Queryable state client read guarantees

2016-09-05 Thread Damian Guy
Hi Mikael,


> > Failed to read key hello, org.mkhq.kafka.Topology$StoreUnavailable
> > Failed to read key hello, org.mkhq.kafka.Topology$KeyNotFound
> > hello -> 10
>
>
The case where you get KeyNotFound looks like a bug to me. This shouldn't
happen. I can see why it might happen and we will create a JIRA and fix it
right away.

I'm not sure how you end up with (hello -> 10). It could indicate that the
offsets for the topic you are consuming from weren't committed so the data
gets processed again on the restart.

Thanks,
Damian
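
For reference, the read path under discussion looks roughly like the sketch
below with the interactive-queries API in trunk for 0.10.1 (the store name and
key/value types are assumptions). A null result can mean either a genuinely
missing key or a store that has not finished initializing, which is what the
JIRA mentioned above will cover:

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    public class StoreReadExample {
        // "counts-store" is an assumed store name; the streams instance must already be running.
        static Long readKey(KafkaStreams streams, String key) {
            ReadOnlyKeyValueStore<String, Long> store =
                streams.store("counts-store", QueryableStoreTypes.<String, Long>keyValueStore());
            return store.get(key);   // null if the key is absent (or the store is not ready yet)
        }
    }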


[jira] [Created] (KAFKA-4123) RocksDBStore can be marked as open before it has been initialized

2016-09-05 Thread Damian Guy (JIRA)
Damian Guy created KAFKA-4123:
-

 Summary: RocksDBStore can be marked as open before it has been 
initialized
 Key: KAFKA-4123
 URL: https://issues.apache.org/jira/browse/KAFKA-4123
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.1.0
Reporter: Damian Guy
Assignee: Damian Guy
 Fix For: 0.10.1.0


A RocksDBStore instance is currently marked as open before the store has been 
initialized from its changelog. This can result in reading old/invalid data 
when querying.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4124) Handle disk failures gracefully

2016-09-05 Thread Gokul (JIRA)
Gokul created KAFKA-4124:


 Summary: Handle disk failures gracefully
 Key: KAFKA-4124
 URL: https://issues.apache.org/jira/browse/KAFKA-4124
 Project: Kafka
  Issue Type: Improvement
Reporter: Gokul


Currently when a disk goes down, the broker also goes down with it. Make the 
broker resilient to disk failure.





Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-09-05 Thread Eno Thereska
Hi Matthias,

The motivation for KIP-63 was primarily aggregates and reducing the load on 
"both" state stores and downstream. I think there is agreement that for the DSL 
the motivation and design make sense.

For the Processor API: caching is a major component in any system, and it is 
difficult to continue to operate as before, without fully understanding the 
consequences. Hence, I think this is mostly a case of educating users to 
understand the boundaries of the solution. 

Introducing a cache, either for the state store only, or for downstream 
forwarding only, or for both, leads to moving from a model where we process 
each request end-to-end (today) to one where a request is temporarily buffered 
in a cache. In all the cases, this opens up the question of what to do next 
once the request then leaves the cache, and how to express that (future) 
behaviour. E.g., even when the cache is just for downstream forwarding (i.e., 
decoupled from any state store), the processor API user might be surprised that 
context.forward() does not immediately do anything.

I agree that for ultra-flexibility, a processor API user should be able to 
choose whether the dedup cache is put 1) on top of a store only, 2) on forward 
only, 3) on both store and forward, but given the motivation for KIP-63 
(aggregates), I believe a decoupled store-forward dedup cache is a reasonable 
choice that provides good default behaviour, without prodding the user to 
specify the combinations.

We need to educate users that if a cache is used in the Processor API, the 
forwarding will happen in the future. 

-Eno
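
To make the Processor API case concrete, a minimal sketch of the pattern being
discussed is below (store name and types are assumptions, not from this
thread). Today every process() call can forward immediately; with a dedup
cache in the picture, the forward may instead happen later, on flush or
eviction:

    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class CountingProcessor implements Processor<String, String> {
        private ProcessorContext context;
        private KeyValueStore<String, Long> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            this.context = context;
            // "counts" is assumed to be registered on the topology builder.
            this.store = (KeyValueStore<String, Long>) context.getStateStore("counts");
        }

        @Override
        public void process(String key, String value) {
            Long count = store.get(key);
            store.put(key, count == null ? 1L : count + 1);
            // With a dedup cache (KIP-63) this forward may be deferred until
            // the cached entry is flushed or evicted, rather than happening here.
            context.forward(key, value);
        }

        @Override
        public void punctuate(long timestamp) {}

        @Override
        public void close() {}
    }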



> On 4 Sep 2016, at 19:11, Matthias J. Sax  wrote:
> 
>> Processor code should always work; independently if caching is enabled
> or not.
> 
> If we want to get this, I guess we need a quite different design (see (1)).
> 
> The point is, that we want to dedup the output, and not state updates.
> 
> It just happens that our starting point was KTable, for which state
> updates and downstream changelog output is the same thing. Thus, we can
> just use the internal KTable state to do the deduplication for the
> downstream changelog.
> 
> However, from a general point of view (Processor API view), if we dedup
> the output, we want dedup/caching for the processor (and not for a state
> store). Of course, we need a state to do the dedup. For KTable, both
> things merge into a single abstraction, and we use only a single state
> instead of two. From a general point of view, we would need two states
> though (one for the actual state, and one for dedup -- think Processor
> API -- not DSL).
> 
> 
> Alternative proposal 1:
> (see also (2) -- which might be better than this one)
> 
> Thus, it might be a cleaner design to decouple user-states and
> dedup-state from each other. If a user enables dedup/caching (for a
> processor) we add an additional state to do the dedup and this
> dedup-state is independent from all user states and context.forward()
> works as always. The dedup state could be hidden from the user and could
> be a pure in-memory state (no need for any recovery -- only flush on
> commit). Internally, a context.forward() would call dedupState.put() and
> trigger actual output if dedup state needs to evict records.
> 
> The disadvantage would be, that we end up with two states for KTable.
> The advantage is, that deduplication can be switched off/on without any
> Processor code change.
> 
> 
> Alternative proposal 2:
> 
> We basically keep the current KIP design, including not to disable
> context.forward() if a cached state is used. Additionally, for cached
> state, we rename put() into putAndForward() which is only available for
> cached states. Thus, in processor code, a state must be explicitly cast
> into a cached state. We also make the user aware, that an update/put to
> a state result in downstream output and that context.forward() would be
> a "direct/non-cached" output.
> 
> The disadvantage of this is, that processor code is not independent from
> caching and thus, caching cannot just be switched on/off (ie, we do not
> follow the initial statement of this mail). The advantage is, we can
> keep a single state for KTable and this design is just small changes to
> the current KIP.
> 
> 
> 
> -Matthias
> 
> 
> On 09/04/2016 07:10 PM, Matthias J. Sax wrote:
>> Sure, you can use a non-cached state. However, if you write code like
>> below for a non-cached state, and learn about caching later on, and
>> think, caching is a cool feature, I want to use it, you would simply
>> want to enable caching (without breaking your code).
>> 
>> Processor code should always work independently if caching is enabled or
>> not.
>> 
>> -Matthias
>> 
>> On 09/04/2016 06:56 PM, Eno Thereska wrote:
>>> Hi Matthias,
>>> 
>>> Thanks for the good questions. 
>>> 
>>> There is still the option of not using cached state. If one uses cached 
>>> state it will dedup for stores and forwarding further. But you can always 
>>> disable caching and do

[jira] [Commented] (KAFKA-3779) Add the LRU cache for KTable.to() operator

2016-09-05 Thread Eno Thereska (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15464816#comment-15464816
 ] 

Eno Thereska commented on KAFKA-3779:
-

This might not make sense, since a KTable will already contain deduped data.

> Add the LRU cache for KTable.to() operator
> --
>
> Key: KAFKA-3779
> URL: https://issues.apache.org/jira/browse/KAFKA-3779
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Eno Thereska
> Fix For: 0.10.1.0
>
>
> The KTable.to operator currently does not use a cache. We can add a cache to 
> this operator to deduplicate and reduce data traffic as well. This is to be 
> done after KAFKA-3777.





Re: SASL session expiry

2016-09-05 Thread Rajini Sivaram
Mickael,

I imagine it is fairly easy in MessageHub to deal with expired SASL/PLAIN
credentials since checks can be added to the interceptor in the broker.

Ismael,

Is it really feasible in general to deal with expired credentials in
Authorizers? It sort of expects tight coupling between authenticator and
authorizer. I'm not sure how an authorizer would deal with certificate expiry
or certificate revocation when using SSL client auth, for instance.


On Mon, Sep 5, 2016 at 11:20 AM, Ismael Juma  wrote:

> Hi Mickael,
>
> The Kerberos ticket refresh mechanism is there for new connections, not
> existing connections. Currently, the suggested approach is to rely on the
> authorizer to deal with expired credentials. Would this work for you?
>
> Ismael
>
> On Mon, Sep 5, 2016 at 11:13 AM, Mickael Maison 
> wrote:
>
> > Hi,
> >
> > While Kerberos has a mechanism to refresh its tickets, SASL PLAIN has
> > no such feature. This means if a client is connected, as far as I can
> > tell, we have currently no way of disconnecting him, revoking his
> > credentials won't help.
> >
> > I think it would be useful to have a way to force clients to refresh
> > their SASL session periodically and disconnect them if their
> > credentials have expired.
> >
> >
> > What do you think ?
> >
>



-- 
Regards,

Rajini


Re: SASL session expiry

2016-09-05 Thread Ismael Juma
Hi Rajini,

It's a good question and it depends on a number of details. For example,
for short-lived certificates with long-lived connections, it would seem
that one would have to duplicate some logic performed by the TLS stack on
the Authorizer, which is not ideal. For the case where the Authorizer
relies on a user's database of some sort, it seems to work OK (supposedly,
the user DB would be updated if the user left). It would be good to think
through use cases and figure out how this could be improved.

Ismael

On Mon, Sep 5, 2016 at 1:01 PM, Rajini Sivaram  wrote:

> Mickael,
>
> I imagine it is fairly easy in MessageHub to deal with expired SASL/PLAIN
> credentials since checks can be added to the interceptor in the broker.
>
> Ismael,
>
> Is it really feasible in general to deal with expired credentials in
> Authorizers? It sort of expects tight coupling between authenticator and
> authorizer, Not sure how an authorizer would deal with certificate expiry
> or certificate revocation when using SSL client auth for instance.
>
>
> On Mon, Sep 5, 2016 at 11:20 AM, Ismael Juma  wrote:
>
> > Hi Mickael,
> >
> > The Kerberos ticket refresh mechanism is there for new connections, not
> > existing connections. Currently, the suggested approach is to rely on the
> > authorizer to deal with expired credentials. Would this work for you?
> >
> > Ismael
> >
> > On Mon, Sep 5, 2016 at 11:13 AM, Mickael Maison <
> mickael.mai...@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > > While Kerberos has a mechanism to refresh its tickets, SASL PLAIN has
> > > no such feature. This means if a client is connected, as far as I can
> > > tell, we have currently no way of disconnecting him, revoking his
> > > credentials won't help.
> > >
> > > I think it would be useful to have a way to force clients to refresh
> > > their SASL session periodically and disconnect them if their
> > > credentials have expired.
> > >
> > >
> > > What do you think ?
> > >
> >
>
>
>
> --
> Regards,
>
> Rajini
>


[jira] [Created] (KAFKA-4125) Provide low-level Processor API meta data in DSL layer

2016-09-05 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-4125:
--

 Summary: Provide low-level Processor API meta data in DSL layer
 Key: KAFKA-4125
 URL: https://issues.apache.org/jira/browse/KAFKA-4125
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: Matthias J. Sax
Assignee: Guozhang Wang
Priority: Minor


For the Processor API, users can get meta data like record offset, timestamp etc. via 
the provided {{Context}} object. It might be useful to allow users to access 
this information in the DSL layer, too.

The idea would be to do it "the Flink way", i.e., by providing
RichFunctions; {{mapValues()}} for example.

It takes a {{ValueMapper}} that only has the method

{noformat}
V2 apply(V1 value);
{noformat}

Thus, you cannot get any meta data within apply (it's completely "blind").

We would add two more interfaces: {{RichFunction}} with a method
{{open(Context context)}} and

{noformat}
RichValueMapper extends ValueMapper, RichFunction
{noformat}

This way, the user can choose to implement a Rich- or Standard-function and
we do not need to change existing APIs. Both can be handed into
{{KStream.mapValues()}}, for example. Internally, we check if a Rich
function is provided, and if yes, hand in the {{Context}} object once, to
make it available to the user, who can then access it within {{apply()}} -- of
course, the user must set a member variable in {{open()}} to hold the
reference to the Context object.
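
A sketch of how this might look to a user (nothing here exists yet; the
interfaces follow the names above, and the {{offset()}} accessor on the context
is just an assumption):

{noformat}
// Proposed interfaces (sketch only):
public interface RichFunction {
    void open(Context context);
}

public interface RichValueMapper<V1, V2> extends ValueMapper<V1, V2>, RichFunction {
}

// Example user code: keep the context from open() and use it inside apply().
public class OffsetTaggingMapper implements RichValueMapper<String, String> {
    private Context context;

    public void open(Context context) {
        this.context = context;
    }

    public String apply(String value) {
        return value + "@" + context.offset();
    }
}
{noformat}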





Re: Plans to improve SSL performance in Kafka, for 0.10.x?

2016-09-05 Thread Tom Crayford
We've been using SSL quite successfully on a few thousand production
clusters, and have done heavy performance testing, and as far as we can
see, it doesn't impact the cluster's capacity. The only case I could see
that happening is if you have a *lot* of connection churn, but that isn't
very typical for most Kafka usage.

That's for both produce and consume traffic. We do see increased amounts of
garbage collection under high consumer load with SSL enabled, but nothing
too horrible (broker GC pauses stay well within the bounds needed to keep their
zookeeper sessions alive).

Thanks

Tom Crayford
Heroku Kafka

On Sun, Sep 4, 2016 at 6:28 PM, Todd Palino  wrote:

> We've been using SSL for produce traffic (mirror makers only right now, but
> that's a very large percentage of traffic for us), and we're in the process
> of turning it on for inter broker traffic as well. Our experience is that
> this does not present a significant amount of overhead to the brokers.
> Specifically with switching over the IBP, we were expecting a lot more of a
> hit, and it really only ended up being something like a 5% increase in
> system load, and no reduction in the cluster capacity, in our test cluster.
> Note that this relies on the fix in KAFKA-4050 and switching the PRNG to
> SHA1PRNG.
>
> Right now, we're specifically avoiding moving consume traffic to SSL, due
> to the zero copy send issue. Now I've been told (but I have not
> investigated) that OpenSSL can solve this. It would probably be a good use
> of time to look into that further.
>
> That said, switching the message format to the newer option (KIP-31 I
> believe?) will result in the brokers not needing to recompress message
> batches that are produced. This should result in a significant reduction in
> CPU usage, which may offset the cost of SSL. We haven't had a chance to
> fully investigate this, however, as changing that config depends on the
> clients being updated to support the new format.
>
> -Todd
>
> On Sunday, September 4, 2016, Jaikiran Pai 
> wrote:
>
> > We are using 0.10.0.1 of Kafka and (Java) client libraries. We recently
> > decided to start using SSL for Kafka communication between broker and
> > clients. Right now, we have a pretty basic setup with just 1 broker with
> > SSL keystore setup and the Java client(s) communicate using the
> > Producer/Consumer APIs against this single broker. There's no client auth
> > (intentionally) right now. We also have plain text enabled for the
> initial
> > testing.
> >
> > What we have noticed is that the consumer/producer performance when SSL
> is
> > enabled is noticeably poor when compared to plain text. I understand that
> > there are expected to be performance impacts when SSL is enabled but the
> > order of magnitude is too high and in fact it shows up in a very
> noticeable
> > fashion in our product. I do have the numbers, but I haven't been able to
> > narrow it down yet (because there's nothing that stands out, except that
> > it's slow). Our application code is exactly the same between non-SSL and
> > SSL usage.
> >
> > Furthermore, I'm aware of this specific JIRA in Kafka
> > https://issues.apache.org/jira/browse/KAFKA-2561 which acknowledges a
> > similar issue. So what I would like to know is, in context of Kafka
> 0.10.x
> > releases and Java 8 support, are there any timelines that the dev team is
> > looking for in terms of improving this performance issue (which I believe
> > requires usage of OpenSSL or other non-JDK implementations of SSLEngine)?
> > We would like to go for GA of our product in the next couple of months
> and
> > in order to do that, we do plan to have Kafka over SSL working with
> > reasonably good performance, but the current performance isn't promising.
> > Expecting this to be fixed in the next couple of months and have it
> > available in 0.10.x is probably too much to expect, but if we know the
> > plans around this, we should be able to come up with a plan of our own
> for
> > our product.
> >
> >
> > -Jaikiran
> >
>
>
> --
> *Todd Palino*
> Staff Site Reliability Engineer
> Data Infrastructure Streaming
>
>
>
> linkedin.com/in/toddpalino
>
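
For anyone who wants to try the PRNG switch Todd mentions, KAFKA-4050
introduces a setting along these lines (verify the exact name against the Kafka
version you run; it is not in the 0.10.0.x releases):

    # server.properties (or client config)
    ssl.secure.random.implementation=SHA1PRNG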


[GitHub] kafka pull request #1824: KAFKA-4123: RocksDBStore can be marked as open bef...

2016-09-05 Thread dguy
GitHub user dguy opened a pull request:

https://github.com/apache/kafka/pull/1824

KAFKA-4123: RocksDBStore can be marked as open before it has been 
initialized

Mark the store as open after the DB has been restored from the changelog.
Only add the store to the map in ProcessorStateManager post restore.
Make RocksDBWindowStore.Segment override openDB(..) as it needs to mark the 
Segment as open


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dguy/kafka kafka-4123

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1824.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1824


commit 43134b939e3a16310084dd203863c0ca73c4e5bd
Author: Damian Guy 
Date:   2016-09-05T13:06:39Z

only mark store as open after it has been initialized. Add store to map in 
state manager post initiliazation






[jira] [Commented] (KAFKA-4123) RocksDBStore can be marked as open before it has been initialized

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15464970#comment-15464970
 ] 

ASF GitHub Bot commented on KAFKA-4123:
---

GitHub user dguy opened a pull request:

https://github.com/apache/kafka/pull/1824

KAFKA-4123: RocksDBStore can be marked as open before it has been 
initialized

Mark the store as open after the DB has been restored from the changelog.
Only add the store to the map in ProcessorStateManager post restore.
Make RocksDBWindowStore.Segment override openDB(..) as it needs to mark the 
Segment as open


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dguy/kafka kafka-4123

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1824.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1824


commit 43134b939e3a16310084dd203863c0ca73c4e5bd
Author: Damian Guy 
Date:   2016-09-05T13:06:39Z

only mark store as open after it has been initialized. Add store to map in 
state manager post initiliazation




> RocksDBStore can be marked as open before it has been initialized
> -
>
> Key: KAFKA-4123
> URL: https://issues.apache.org/jira/browse/KAFKA-4123
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 0.10.1.0
>
>
> A RocksDBStore instance is currently marked as open before the store has been 
> initialized from its changelog. This can result in reading old/invalid data 
> when querying.





Re: SASL session expiry

2016-09-05 Thread Mickael Maison
Hi Rajini,

Yes in our case, I can see how we would add the functionality, but I
was wondering if people might be interested to directly have such a
feature in Kafka. At the moment, the authorization logic is only
invoked for new SASL (and apparently Kerberos) connections. I feel
like having the option to periodically recheck credentials for active
connections would be beneficial.

On Mon, Sep 5, 2016 at 1:22 PM, Ismael Juma  wrote:
> Hi Rajini,
>
> It's a good question and it depends on a number of details. For example,
> for short-lived certificates with long-lived connections, it would seem
> that one would have to duplicate some logic performed by the TLS stack on
> the Authorizer, which is not ideal. For the case where the Authorizer
> relies on a user's database of some sort, it seems to work OK (supposedly,
> the user DB would be updated if the user left). It would be good to think
> through use cases and figure out how this could be improved.
>
> Ismael
>
> On Mon, Sep 5, 2016 at 1:01 PM, Rajini Sivaram > wrote:
>
>> Mickael,
>>
>> I imagine it is fairly easy in MessageHub to deal with expired SASL/PLAIN
>> credentials since checks can be added to the interceptor in the broker.
>>
>> Ismael,
>>
>> Is it really feasible in general to deal with expired credentials in
>> Authorizers? It sort of expects tight coupling between authenticator and
>> authorizer, Not sure how an authorizer would deal with certificate expiry
>> or certificate revocation when using SSL client auth for instance.
>>
>>
>> On Mon, Sep 5, 2016 at 11:20 AM, Ismael Juma  wrote:
>>
>> > Hi Mickael,
>> >
>> > The Kerberos ticket refresh mechanism is there for new connections, not
>> > existing connections. Currently, the suggested approach is to rely on the
>> > authorizer to deal with expired credentials. Would this work for you?
>> >
>> > Ismael
>> >
>> > On Mon, Sep 5, 2016 at 11:13 AM, Mickael Maison <
>> mickael.mai...@gmail.com>
>> > wrote:
>> >
>> > > Hi,
>> > >
>> > > While Kerberos has a mechanism to refresh its tickets, SASL PLAIN has
>> > > no such feature. This means if a client is connected, as far as I can
>> > > tell, we have currently no way of disconnecting him, revoking his
>> > > credentials won't help.
>> > >
>> > > I think it would be useful to have a way to force clients to refresh
>> > > their SASL session periodically and disconnect them if their
>> > > credentials have expired.
>> > >
>> > >
>> > > What do you think ?
>> > >
>> >
>>
>>
>>
>> --
>> Regards,
>>
>> Rajini
>>


WARN log message flooding broker logs for a pretty typical SSL setup

2016-09-05 Thread Jaikiran Pai
We just started enabling SSL for our Kafka brokers and (Java) clients 
and among some of the issues we are running into, one of them is the 
flooding of the server/broker Kafka logs where we are seeing these messages:


[2016-09-02 08:07:13,773] WARN SSL peer is not authenticated, returning 
ANONYMOUS instead (org.apache.kafka.common.network.SslTransportLayer)
[2016-09-02 08:07:15,710] WARN SSL peer is not authenticated, returning 
ANONYMOUS instead (org.apache.kafka.common.network.SslTransportLayer)
[2016-09-02 08:07:15,711] WARN SSL peer is not authenticated, returning 
ANONYMOUS instead (org.apache.kafka.common.network.SslTransportLayer)
[2016-09-02 08:07:15,711] WARN SSL peer is not authenticated, returning 
ANONYMOUS instead (org.apache.kafka.common.network.SslTransportLayer)
[2016-09-02 08:07:15,712] WARN SSL peer is not authenticated, returning 
ANONYMOUS instead (org.apache.kafka.common.network.SslTransportLayer)



They just keep going on and on. In our SSL setup, we have the broker 
configured with the keystore and the Java clients have been configured 
with a proper truststore and all works fine except for these messages 
flooding the logs. We don't have any ACLs setup nor have we enabled 
client auth check.


Looking at the code which generates this WARN message 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java#L638 
and the fact that the setup we have (where we just enable server/broker 
cert validation) is, IMO, a valid scenario and not some 
exceptional/incorrect setup issue, I think this log message is something 
that can be removed from the code (or at least logged at a very lower 
level given the frequency at which this gets logged)


Any thoughts on this?

It's a pretty straightforward change and if this change is something 
that sounds right, I can go ahead and submit a PR.


P.S: This is both on 0.9.0.1 and latest 0.10.0.1.

-Jaikiran


[jira] [Created] (KAFKA-4126) No relevant log when the topic is non-existent

2016-09-05 Thread JIRA
Balázs Barnabás created KAFKA-4126:
--

 Summary: No relevant log when the topic is non-existent
 Key: KAFKA-4126
 URL: https://issues.apache.org/jira/browse/KAFKA-4126
 Project: Kafka
  Issue Type: Bug
Reporter: Balázs Barnabás
Priority: Minor


When a producer sends a ProducerRecord into a Kafka topic that doesn't exist, 
there is no relevant debug/error log that points out the error.





[jira] [Updated] (KAFKA-4123) Queryable State returning null for key before all stores in instance have been intialized

2016-09-05 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-4123:
--
Summary: Queryable State returning null for key before all stores in 
instance have been intialized  (was: RocksDBStore can be marked as open before 
it has been initialized)

> Queryable State returning null for key before all stores in instance have 
> been intialized
> -
>
> Key: KAFKA-4123
> URL: https://issues.apache.org/jira/browse/KAFKA-4123
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 0.10.1.0
>
>
> A RocksDBStore instance is currently marked as open before the store has been 
> initialized from its changelog. This can result in reading old/invalid data 
> when querying.





[jira] [Updated] (KAFKA-4123) Queryable State returning null for key before all stores in instance have been intialized

2016-09-05 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-4123:
--
Description: 
A couple of problems:
1. A RocksDBStore instance is currently marked as open before the store has 
been initialized from its changelog. This can result in reading old/invalid 
data when querying.
2. In the case of multiple partitions and the tasks are being initialized it is 
always possible that 1 or more StateStores will be initialized before the 
complete set of stores in the Streams Instance are initialized. Currently when 
this happens a query can return null because it will look in the existing 
initialized stores. However, the key may exist in one of the non-initialized 
instances. We need to wait for all Stores in the instance to be initialized 
before allowing queries to progress.

  was:A RocksDBStore instance is currently marked as open before the store has 
been initialized from its changelog. This can result in reading old/invalid 
data when querying.


> Queryable State returning null for key before all stores in instance have 
> been intialized
> -
>
> Key: KAFKA-4123
> URL: https://issues.apache.org/jira/browse/KAFKA-4123
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 0.10.1.0
>
>
> A couple of problems:
> 1. A RocksDBStore instance is currently marked as open before the store has 
> been initialized from its changelog. This can result in reading old/invalid 
> data when querying.
> 2. In the case of multiple partitions and the tasks are being initialized it 
> is always possible that 1 or more StateStores will be initialized before the 
> complete set of stores in the Streams Instance are initialized. Currently 
> when this happens a query can return null because it will look in the 
> existing initialized stores. However, the key may exist in one of the 
> non-initialized instances. We need to wait for all Stores in the instance to 
> be initialized before allowing queries to progress.





[jira] [Updated] (KAFKA-4123) Queryable State returning null for key before all stores in instance have been initialized

2016-09-05 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-4123:
--
Summary: Queryable State returning null for key before all stores in 
instance have been initialized  (was: Queryable State returning null for key 
before all stores in instance have been intialized)

> Queryable State returning null for key before all stores in instance have 
> been initialized
> --
>
> Key: KAFKA-4123
> URL: https://issues.apache.org/jira/browse/KAFKA-4123
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 0.10.1.0
>
>
> A couple of problems:
> 1. A RocksDBStore instance is currently marked as open before the store has 
> been initialized from its changelog. This can result in reading old/invalid 
> data when querying.
> 2. In the case of multiple partitions and the tasks are being initialized it 
> is always possible that 1 or more StateStores will be initialized before the 
> complete set of stores in the Streams Instance are initialized. Currently 
> when this happens a query can return null because it will look in the 
> existing initialized stores. However, the key may exist in one of the 
> non-initialized instances. We need to wait for all Stores in the instance to 
> be initialized before allowing queries to progress.





Re: SASL session expiry

2016-09-05 Thread Ismael Juma
On Mon, Sep 5, 2016 at 3:48 PM, Mickael Maison 
wrote:

> Yes in our case, I can see how we would add the functionality, but I
> was wondering if people might be interested to directly have such a
> feature in Kafka. At the moment, the authorization logic is only
> invoked for new SASL (and apparently Kerberos) connections.


I think you mean authentication logic above.

Ismael


Re: SASL session expiry

2016-09-05 Thread Mickael Maison
Indeed

On Mon, Sep 5, 2016 at 5:43 PM, Ismael Juma  wrote:
> On Mon, Sep 5, 2016 at 3:48 PM, Mickael Maison 
> wrote:
>
>> Yes in our case, I can see how we would add the functionality, but I
>> was wondering if people might be interested to directly have such a
>> feature in Kafka. At the moment, the authorization logic is only
>> invoked for new SASL (and apparently Kerberos) connections.
>
>
> I think you mean authentication logic above.
>
> Ismael


[jira] [Comment Edited] (KAFKA-4120) byte[] keys in RocksDB state stores do not work as expected

2016-09-05 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15464171#comment-15464171
 ] 

Greg Fodor edited comment on KAFKA-4120 at 9/5/16 5:34 PM:
---

We were able to work around it by just creating a proper Avro class for the 
byte data, but I think it would probably be helpful to future users if there 
were a way to prevent this from happening -- an exception doesn't seem 
unreasonable.


was (Author: gfodor):
We were able to work around it by just creating a proper Avro class for the 
byte data, but I think it would probably be helpful to future if there were a 
way to prevent this from happening -- an exception doesn't seem unreasonable.

> byte[] keys in RocksDB state stores do not work as expected
> ---
>
> Key: KAFKA-4120
> URL: https://issues.apache.org/jira/browse/KAFKA-4120
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>
> We ran into an issue using a byte[] key in a RocksDB state store (with the 
> byte array serde.) Internally, the RocksDB store keeps a LRUCache that is 
> backed by a LinkedHashMap that sits between the callers and the actual db. 
> The problem is that while the underlying rocks db will persist byte arrays 
> with equal data as equivalent keys, the LinkedHashMap uses byte[] reference 
> equality from Object.equals/hashcode. So, this can result in multiple entries 
> in the cache for two different byte arrays that have the same contents and 
> are backed by the same key in the db, resulting in unexpected behavior. 
> One such behavior that manifests from this is if you store a value in the 
> state store with a specific key, if you re-read that key with the same byte 
> array you will get the new value, but if you re-read that key with a 
> different byte array with the same bytes, you will get a stale value until 
> the db is flushed. (This made it particularly tricky to track down what was 
> happening :))
> The workaround for us is to convert the keys from raw byte arrays to a 
> deserialized avro structure that provides proper hashcode/equals semantics 
> for the intermediate cache. In general this seems like good practice, so one 
> of the proposed solutions is to simply emit a warning or exception if a key 
> type with breaking semantics like this is provided.
> A few proposed solutions:
> - When the state store is defined on array keys, ensure that the cache map 
> does proper comparisons on array values not array references. This would fix 
> this problem, but seems a bit strange to special case. However, I have a hard 
> time of thinking of other examples where this behavior would burn users.
> - Change the LRU cache to deserialize and serialize all keys to bytes and use 
> a value based comparison for the map. This would be the most correct, as it 
> would ensure that both the rocks db and the cache have identical key spaces 
> and equality/hashing semantics. However, this is probably slow, and since the 
> general case of using avro record types as keys works fine, it will largely 
> be unnecessary overhead.
> - Don't change anything about the behavior, but trigger a warning in the log 
> or fail to start if a state store is defined on array keys (or possibly any 
> key type that fails to properly override Object.equals/hashcode.)
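
A small JDK-only illustration of the underlying issue (nothing Kafka-specific
here): two byte[] with identical contents are different keys to a HashMap or
LinkedHashMap, while wrapping them in ByteBuffer restores value-based
comparison:

    import java.nio.ByteBuffer;
    import java.util.HashMap;
    import java.util.Map;

    public class ByteArrayKeyDemo {
        public static void main(String[] args) {
            byte[] k1 = {1, 2, 3};
            byte[] k2 = {1, 2, 3};               // same contents, different object

            Map<byte[], String> byReference = new HashMap<>();
            byReference.put(k1, "v");
            System.out.println(byReference.get(k2));   // null: byte[] uses identity equals/hashCode

            Map<ByteBuffer, String> byValue = new HashMap<>();
            byValue.put(ByteBuffer.wrap(k1), "v");
            System.out.println(byValue.get(ByteBuffer.wrap(k2)));   // v: ByteBuffer compares contents
        }
    }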





Re: [DISCUSS] KIP-78: Cluster Id

2016-09-05 Thread Dong Lin
Ismael,

I think you are saying that we can stop our discussion and simply take a
vote where the majority decides. I don't think this is a good way to find
the best design for a KIP, and it would make the discussion seem useless. It
doesn't seem like anyone else is interested in joining this discussion other
than you, Sumit and me. I will leave it as it is after summarizing our
discussion.

There are 27 emails in this email thread and I think most people wouldn't
bother to read all of them. So, instead of replying to your comments
inline, I will summarize our discussion here. All of the information below,
including my complete description of the alternative, can be found in the
previous emails. I will omit the discussion of other minor stuff --
interested readers can read our prior emails.

This KIP suggests using a randomly generated cluster.id. I am suggesting
that we provide the option for the user to set cluster.id in the broker config.
If the user doesn't explicitly provide a value for cluster.id in the config, or
if the config value is an empty string, then the broker can use a randomly
generated cluster.id. Otherwise, the broker uses the cluster.id from the config.
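
A sketch of the fallback behaviour described above (property name and semantics
are taken from this description, not from the KIP):

    import java.util.Properties;
    import java.util.UUID;

    public class ClusterIdResolver {
        // Use cluster.id from the broker config if set and non-empty,
        // otherwise fall back to a randomly generated id.
        static String resolveClusterId(Properties brokerConfig) {
            String configured = brokerConfig.getProperty("cluster.id", "").trim();
            return configured.isEmpty() ? UUID.randomUUID().toString() : configured;
        }
    }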

My argument for the 2nd approach is that it additionally allows users to use
a human readable cluster.id if they want to, while keeping the benefits of
the existing approach. Companies with only a few Kafka clusters can choose to
use a readable cluster.id. Companies with too many clusters can choose not to
configure cluster.id, so that a random cluster.id will be generated.

There are two major concerns with this KIP:

- The KIP says that it is a requirement for cluster.id to be immutable.
Ismael suggests that reading cluster.id from config doesn't meet this
requirement. However, the current approach described in the KIP doesn't
satisfy this requirement either. If a user deletes the znode that stores
cluster.id, either intentionally or by mistake, the cluster.id is lost
forever. The KIP should be updated to reflect this, and this requirement
cannot be used in the comparison between the different approaches.

- One of the arguments against reading cluster.id from config is that "unique
and immutable auto-generated id + changeable human-readable name is a
better overall solution". Sumit describes the long-term plan to use
readable tags as well. However, the KIP doesn't describe the design of
using these readable names/tags. The KIP needs to provide more information
about this plan if it is used to argue for the existing approach against
an alternative.

Dong



On Mon, Sep 5, 2016 at 2:44 AM, Ismael Juma  wrote:

> Dong,
>
> Sumit responded to a number of points already, so I will try to be brief.
> See inline.
>
> Also, it may just be possible that we won't reach agreement. In that case,
> a vote may be a way to figure out if people feel that this proposal adds
> value in its current form or not.
>
> On Mon, Sep 5, 2016 at 12:54 AM, Dong Lin  wrote:
>
> > I don't think having a human-readable name is equivalent to a meaningful
> > name. It is not true that a human readable name makes it more likely you
> > want to change it. Look, every city has a human readable name and we
> don't
> > worry about changing its name. The conference room in any company has a
> > human readable name instead of a random id. For the same reason you can
> > name a cluster as Yosemite and don't have to change it in the future.
> >
>
> As Sumit said, many cities have in fact changed their names. Incidentally,
> all the conference rooms at Confluent were recently renamed. So, this
> illustrates the point well. Yes, it is possible to give human-readable, but
> not meaningful names. I still think that unique and immutable
> auto-generated id + changeable human-readable name is a better overall
> solution.
>
> By immutable I think you are saying that we should prevent people from
> > changing cluster.id. However, this KIP doesn't really prevent this from
> > happening -- user can delete znode and restart kafka to change
> cluster.id.
> > Therefore the requirement is not satisfied anyway.
> >
>
> Sure, we can't prevent users from deleting state in ZooKeeper or elsewhere
> if they have access to it. The idea is that users wouldn't need to with the
> auto-generated id.
>
> I am also not sure why you want to prevent people from changing cluster.id
> > after reading the motivation section of this KIP. Is there any motivation
> > or use-case for this requirement?
> >
>
> I thought I explained this a few times. :) Sumit took a stab as well. The
> requirement is to reliably associate a message with a cluster. Each time
> the cluster id changes, you are basically "creating" a new cluster so it
> would look like messages are associated with 2 different clusters instead
> of a single one. This is an old database topic, of course: surrogate versus
> natural keys.
>
> It is not clear why it will make downstream code would be more complex and
> > feature less useful if we provide a default cluster.id here

[jira] [Commented] (KAFKA-3408) consumer rebalance fail

2016-09-05 Thread Matt Navarifar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15465537#comment-15465537
 ] 

Matt Navarifar commented on KAFKA-3408:
---

Hi [~ewencp], hate to annoy you further, but could I also be included as a 
contributor to the project? Looking to contribute back to the project!

> consumer rebalance fail
> ---
>
> Key: KAFKA-3408
> URL: https://issues.apache.org/jira/browse/KAFKA-3408
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.1
> Environment: centos linux
>Reporter: zhongkai liu
>  Labels: newbie
>
> I use the "/bin/kafka-console-consumer" command to start two consumers in group 
> "page_group", then the first consumer console reports a rebalance failure like 
> this:
> ERROR [page_view_group1_slave2-1458095694092-80c33086], error during 
> syncedRebalance (kafka.consumer.ZookeeperConsumerConnector)
> kafka.common.ConsumerRebalanceFailedException: 
> page_view_group1_slave2-1458095694092-80c33086 can't rebalance after 10 
> retries
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:660)
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:579)
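
For anyone hitting the same error, the settings usually tuned for the old
ZooKeeper-based consumer are sketched below (values are illustrative placeholders,
not recommendations):

{code}
# consumer.properties (old ZooKeeper-based consumer)
rebalance.max.retries=10           # rebalance attempts before giving up
rebalance.backoff.ms=2000          # back-off between rebalance attempts
zookeeper.session.timeout.ms=6000
{code}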



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4127) Possible data loss

2016-09-05 Thread Ramnatthan Alagappan (JIRA)
Ramnatthan Alagappan created KAFKA-4127:
---

 Summary: Possible data loss
 Key: KAFKA-4127
 URL: https://issues.apache.org/jira/browse/KAFKA-4127
 Project: Kafka
  Issue Type: Bug
 Environment: Normal three node Kafka cluster. All machines running 
linux.
Reporter: Ramnatthan Alagappan


I am running a three node Kafka cluster. ZooKeeper runs in a standalone mode.

When I create a new message topic, I see the following sequence of system calls:

mkdir("/appdir/my-topic1-0")
creat("/appdir/my-topic1-0/.log")

I have configured Kafka to write messages persistently to disk before 
acknowledging the client. Specifically, I have set log.flush.interval.messages to 
1 and min.insync.replicas to 3, and disabled unclean leader election. Now, I insert a new 
message into the created topic.

I see that Kafka writes the message to the log file and flushes the data down 
to disk by carefully fsync'ing the log file. I get an acknowledgment back from 
the cluster after the message is safely persisted on all three replicas and 
written to disk. 

Unfortunately, Kafka can still lose data since it does not explicitly fsync the 
directory to persist the directory entries of the topic directory and the log 
file. If a crash happens after acknowledging the client, it is possible for 
Kafka to lose the directory entry for the topic directory or the log file. Many 
systems carefully issue fsync to the parent directory when a new file or 
directory is created. This is required for the file to be completely persisted 
to the disk.   
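
For reference, a hedged sketch (not Kafka's actual code) of how the parent directory
entry can be made durable on Linux after creating a new file, using Java NIO:

{code}
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class DirFsync {
    // Opening the directory read-only and calling force() issues fsync(2) on the
    // directory's file descriptor, which persists its entries. This works on Linux;
    // some platforms (e.g. Windows) do not allow opening a directory this way.
    public static void fsyncDir(Path dir) throws IOException {
        try (FileChannel channel = FileChannel.open(dir, StandardOpenOption.READ)) {
            channel.force(true);
        }
    }

    public static void main(String[] args) throws IOException {
        fsyncDir(Paths.get("/appdir/my-topic1-0")); // the new topic directory
        fsyncDir(Paths.get("/appdir"));             // and its parent, after the mkdir
    }
}
{code}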



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Queryable state client read guarantees

2016-09-05 Thread Mikael Högqvist
Hi Damian,

> > Failed to read key hello, org.mkhq.kafka.Topology$StoreUnavailable
> > > Failed to read key hello, org.mkhq.kafka.Topology$KeyNotFound
> > > hello -> 10
> >
> >
> The case where you get KeyNotFound looks like a bug to me. This shouldn't
> happen. I can see why it might happen and we will create a JIRA and fix it
> right away.
>

Great, thanks for looking into this. I'll try again once the PR is merged.


>
> I'm not sure how you end up with (hello -> 10). It could indicate that the
> offsets for the topic you are consuming from weren't committed so the data
> gets processed again on the restart.
>

Yes, it didn't commit the offsets since streams.close() was not called on
ctrl-c. Fixed by adding a shutdown hook.
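
For reference, the hook is a one-liner (minimal sketch, assuming `builder` and `props`
are the topology and streams configuration already in use):

    final KafkaStreams streams = new KafkaStreams(builder, props);
    streams.start();
    // close() commits the offsets and releases the state stores when the JVM exits
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));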

Thanks,
Mikael


> Thanks,
> Damian
>


Re: PartitionAssignor / Sort members per subscription time before assigning partitions

2016-09-05 Thread Florian Hussonnois
Hi Jason,

The downside of using client ids is that there is no guarantee that they will be
increasing.
So if there are already as many consumers as partitions, additional consumers can
change the partition assignment.
This leads to rebalances which are unnecessary, as the partitions are already
well balanced.
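
For illustration, a minimal sketch of the partial workaround with explicit, per-host
client ids, so that the assignor's lexicographic ordering becomes predictable (the ids
below are hypothetical, and as noted above this still breaks down once extra consumers
join):

    // imports assumed: java.util.Properties, org.apache.kafka.clients.consumer.*,
    // org.apache.kafka.common.serialization.StringDeserializer
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
    // explicit client id per host/thread instead of the default "consumer-N"
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "host1-consumer-0");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);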

Thanks,

2016-09-01 20:29 GMT+02:00 Jason Gustafson :

> Hi Florian,
>
> I'm not totally sure I understand the problem. The consumer id consists of
> the clientId configured by the user with a UUID appended to it. If the
> clientId has not been passed in configuration, we use "consumer-{n}" for it
> where n is incremented for every new consumer instance. Is the problem
> basically that this default when applied independently on different jvms is
> giving you less than ideal ordering?
>
> For what it's worth, I know that Kafka Streams is a little more clever in
> how partitions are assigned. It uses a custom assignor which takes into
> account the consumer's host information.
>
> Thanks,
> Jason
>
> On Thu, Sep 1, 2016 at 9:00 AM, Florian Hussonnois 
> wrote:
>
> > Hi Kafka Team,
> >
> > I would like to have your opinion before creating a new JIRA.
> >
> > I'm working with the Java Consumer API. The current partition assignors
> use
> > the consumer ids to sort members before assigning partitions.
> >
> > This works pretty well as long as all consumers are started into the same
> > JVM and no more consumers than the number of partitions are created.
> >
> > However, in many cases consumers are distributed across multiple hosts.
> To
> > continue consuming with an optimal number of consumers (even with a host
> > failure) we can create as many consumers as partitions on each host.
> >
> > For example, we have two consumers C0, C1 each on a dedicated host and
> one
> > topic with 4 partitions. With current assignors it is not possible to
> have
> > 2 consuming threads and 2 idle threads per host.
> >
> > Instead of that, C0 will have 4 consuming threads and C1 will have 4 idle
> > threads.
> >
> > One solution could be to keep a timestamp the first time a member
> > subscribes to a topic. This timestamp can then be used to sort members
> for
> > the partitions assignment. In this way, the partition assignment will be
> > more predictable as it will not depend on member ids.
> >
> > One drawback of this solution is that the consumer responsible of
> > assignments will keep a local state.
> >
> > Thanks,
> >
> > --
> > Florian
> >
>



-- 
Florian HUSSONNOIS


Re: [DISCUSS] KIP-78: Cluster Id

2016-09-05 Thread Ismael Juma
Hi Dong,

A few clarifications below.

On Mon, Sep 5, 2016 at 7:21 PM, Dong Lin  wrote:
>
> I think you are saying that we can stop our discussion and follow simply
> take a vote where the majority decides. I don't think this is a good way to
> find the best design for a KIP and the discussion seems to be useless.


I didn't mean that we should stop the discussion. I meant that if we can't
reach agreement, it would make sense to try and get feedback from a wider
group in the interest of making progress. As you said below, a large number
of messages were exchanged by just 3 people. Even if we were in agreement,
we would not be able to pass the KIP by ourselves. Also, I would certainly
not classify the discussion as useless. The alternative you are proposing
has evolved as part of the discussion and the KIP has been updated a couple
of times.


> My argument for the 2nd approach is that it additionally allows user to use
> human readable cluster.id if they want to do it.


As discussed previously, you can also set the cluster id manually with the
existing KIP by setting the relevant znode. One could describe it as an
intentionally expert-level option instead of the more accessible broker
config (the broker level config does mean that you have to set it for every
broker, but that's probably fine since it's also the case for a few other
configs).

There are two major concern with this KIP:
>
> - The KIP says that it is a requirement for cluster.id to be immutable.
> Ismael suggests that the reading cluster.id from config doesn't meet this
> requirement. However, the current approach described in the KIP doesn't
> satisfy this requirement either. If user deletes the znode that stores
> cluster.id, either intentionally or by mistake, the cluster.id is lost
> forever. The KIP should be updated to reflect this and this requirement can
> not be used in the comparison between the different approaches.
>

I will update the KIP to make this point clearer. A couple of comments:

1. Immutability is a goal and the recommended usage, but you can work
around it via the expert-level znode update. You have much bigger problems
if you delete random znodes by mistake to be honest.
2. The cluster id is not lost forever since it can be trivially recovered
from the logs.

- One of the argument against reading cluster.id from config is that "unique
> and immutable auto-generated id + changeable human-readable name is a
> better overall solution". Sumit describes the long term plan to use
> readable tags as well. However, the KIP doesn't describe the design of
> using this readable name/tags. The KIP needs to provide more information
> about this plan if this is used to argue for the existing approach against
> an alternative.
>

I am not sure why this is so. After all, your suggestion of a broker config
is an addition to what is being proposed in the KIP. Why can't we evaluate
your suggestion at the same time as the resource tags proposal? What we are
proposing in the current KIP is a foundation that is needed in both cases.
What am I missing?

Ismael


Re: [DISCUSS] KIP-78: Cluster Id

2016-09-05 Thread Dong Lin
Hey Ismael,

Yes, user has the option to manually set the cluster.id by directly setting
the znode. But the KIP doesn't provide script for doing this. Unlike
reading cluster.id from broker, this approach doesn't allow you to persist
the cluster.id in case of cluster migration and znode deletion.

In case of zookeeper migration or znode deletion, you can read the
cluster.id from previous log. But log is not a good place to persist
anything you need because it is not persisted in any repository. Reading a
config value from log for recovery also appears adhoc.

I agree that in terms of implementation you can add the cluster.id config
value in a future KIP without affecting this KIP. But why couldn't we do it
altogether if it is not too hard and if this adds value on top of existing
proposal? I think we are encouraged to think beyond the existing design of
the KIP to see if it can serve its target use-case better, if the
improvement doesn't require too much additional work. Readability and
ability to persist cluster.id in config seems very useful to the use-case
described in the KIP.

I am just trying to contribute here by providing the review. If you think
the benefits above is trivial and doesn't have to be included in this KIP,
I am OK with that. Thanks for all the discussion.

Dong


On Mon, Sep 5, 2016 at 2:41 PM, Ismael Juma  wrote:

> Hi Dong,
>
> A few clarifications below.
>
> On Mon, Sep 5, 2016 at 7:21 PM, Dong Lin  wrote:
> >
> > I think you are saying that we can stop our discussion and follow simply
> > take a vote where the majority decides. I don't think this is a good way
> to
> > find the best design for a KIP and the discussion seems to be useless.
>
>
> I didn't mean that we should stop the discussion. I meant that if we can't
> reach agreement, it would make sense to try and get feedback from a wider
> group in the interest of making progress. As you said below, a large number
> of messages were exchanged by just 3 people. Even if we were in agreement,
> we would not be able to pass the KIP by ourselves. Also, I would certainly
> not classify the discussion as useless. The alternative you are proposing
> has evolved as part of the discussion and the KIP has been updated a couple
> of times.
>
>
> > My argument for the 2nd approach is that it additionally allows user to
> use
> > human readable cluster.id if they want to do it.
>
>
> As discussed previously, you can also set the cluster id manually with the
> existing KIP by setting the relevant znode. One could describe it as an
> intentionally expert-level option instead of the more accessible broker
> config (the broker level config does mean that you have to set it for every
> broker, but that's probably fine since it's also the case for a few other
> configs).
>
> There are two major concern with this KIP:
> >
> > - The KIP says that it is a requirement for cluster.id to be immutable.
> > Ismael suggests that the reading cluster.id from config doesn't meet
> this
> > requirement. However, the current approach described in the KIP doesn't
> > satisfy this requirement either. If user deletes the znode that stores
> > cluster.id, either intentionally or by mistake, the cluster.id is lost
> > forever. The KIP should be updated to reflect this and this requirement
> can
> > not be used in the comparison between the different approaches.
> >
>
> I will update the KIP to make this point clearer. A couple of comments:
>
> 1. Immutability is a goal and the recommended usage, but you can work
> around it via the expert-level znode update. You have much bigger problems
> if you delete random znodes by mistake to be honest.
> 2. The cluster id is not lost forever since it can be trivially recovered
> from the logs.
>
> - One of the argument against reading cluster.id from config is that
> "unique
> > and immutable auto-generated id + changeable human-readable name is a
> > better overall solution". Sumit describes the long term plan to use
> > readable tags as well. However, the KIP doesn't describe the design of
> > using this readable name/tags. The KIP needs to provide more information
> > about this plan if this is used to argue for the existing approach
> against
> > an alternative.
> >
>
> I am not sure why this is so. After all, your suggestion of a broker config
> is an addition to what is being proposed in the KIP. Why can't we evaluate
> your suggestion at the same time as the resource tags proposal? What we are
> proposing in the current KIP is a foundation that is needed in both cases.
> What am I missing?
>
> Ismael
>


Re: [DISCUSS] KIP-78: Cluster Id

2016-09-05 Thread Ismael Juma
Hi Dong,

Thanks for your reply. See inline.

On Mon, Sep 5, 2016 at 11:28 PM, Dong Lin  wrote:

> Yes, user has the option to manually set the cluster.id by directly
> setting
> the znode. But the KIP doesn't provide script for doing this.


That is correct. And intentionally, as discussed previously.


> Unlike reading cluster.id from broker, this approach doesn't allow you to
> persist
> the cluster.id in case of cluster migration and znode deletion.
>

Right, this is consistent with how we store the broker sequence id, leaders
and ACLs (if you use the built-in authorizer) solely in ZooKeeper.
Something that we can improve later (via a broker config or
meta.properties).

In case of zookeeper migration or znode deletion, you can read the
> cluster.id from previous log. But log is not a good place to persist
> anything you need because it is not persisted in any repository. Reading a
> config value from log for recovery also appears adhoc.
>

Yes, agreed. This is just a workaround in the catastrophic event that you
lose your ZooKeeper ensemble.

I agree that in terms of implementation you can add the cluster.id config
> value in a future KIP without affecting this KIP. But why couldn't we do it
> altogether if it is not too hard and if this adds value on top of existing
> proposal? I think we are encouraged to think beyond the existing design of
> the KIP to see if it can serve its target use-case better, if the
> improvement doesn't require too much additional work. Readability and
> ability to persist cluster.id in config seems very useful to the use-case
> described in the KIP.
>
> I am just trying to contribute here by providing the review. If you think
> the benefits above is trivial and doesn't have to be included in this KIP,
> I am OK with that. Thanks for all the discussion.
>

We definitely believe in the usefulness of human readable cluster ids. We
just think that it would be confusing to have 2 ways of doing it and we
think that being able to freely change the human readable cluster id
without negative consequences is a worthy goal (and hence the idea of an
auto-generated one + a changeable human readable one). At the same time, we
would like to ship the foundation (the current KIP) sooner than it would
take us to work out all the details of phase 2, discuss them, reach
agreement and implement them. And since the foundation is needed in either
case, it seemed like a win-win to do it incrementally.

If the community agrees and we ship this KIP first, then I very much hope
you will be part of the discussion on the KIP for the human readable id.

Thanks for sticking with us through this long mailing list thread by the
way. :)

Ismael


[jira] [Assigned] (KAFKA-4116) Specifying 0.0.0.0 in "listeners" doesn't work

2016-09-05 Thread Yuto Kawamura (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuto Kawamura reassigned KAFKA-4116:


Assignee: Gwen Shapira  (was: Yuto Kawamura)

> Specifying 0.0.0.0 in "listeners" doesn't work
> --
>
> Key: KAFKA-4116
> URL: https://issues.apache.org/jira/browse/KAFKA-4116
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1, 0.10.0.1
>Reporter: Yuto Kawamura
>Assignee: Gwen Shapira
> Fix For: 0.10.0.2
>
>
> The document of {{listeners}} says:
> "Specify hostname as 0.0.0.0 to bind to all interfaces."
> However when I give config such as below, a started kafka broker can't join 
> the cluster due to invalid address advertised on zk.
> {code}
> listeners=PLAINTEXT://0.0.0.0:9092
> # advertised.listeners=
> {code}
> This is because of:
> - {{advertised.listeners}} which is used as an address to publish on zk 
> defaults to {{listeners}}
> - KafkaHealthcheck#register isn't considering the host "0.0.0.0" as a special 
> case : 
> https://github.com/apache/kafka/blob/8f3462552fa4d6a6d70a837c2ef7439bba512657/core/src/main/scala/kafka/server/KafkaHealthcheck.scala#L60-L61
> h3. Proof
> Test environment:
> - kafka-broker version 0.10.1.0-SNAPSHOT(build from trunk)
> - Brokers HOST-A, HOST-B, HOST-C
> - Controller: HOST-A
> - topic-A has 3 replicas, 3 partitions
> Update HOST-B's server.properties with updating listeners to below and 
> restart the broker.
> {code}
> listeners=PLAINTEXT://0.0.0.0:9092
> {code}
> Then HOST-B registers its broker info to ZK path {{/brokers/ids/2}}, but 
> "0.0.0.0" is used as its host:
> {code}
> [zk: ZKHOST1:2181,ZKHOST2:2181,ZKHOST3:2181/kafka-test(CONNECTED) 8] get 
> /brokers/ids/2
> {"jmx_port":12345,"timestamp":"1472796372181","endpoints":["PLAINTEXT://0.0.0.0:9092"],"host":"0.0.0.0","version":3,"port":9092}
> {code}
> The controller tries to send a request to the above address but of course it 
> will never reach HOST-B.
> controller.log:
> {code}
> [2016-09-02 15:06:12,206] INFO [Controller-1-to-broker-2-send-thread], 
> Controller 1 connected to 0.0.0.0:9092 (id: 2 rack: null) for sending state 
> change requests (kafka.controller.RequestSendThread)
> {code}
> I'm guessing the controller may be sending the request to itself (the Kafka broker 
> running on the same instance), as calling connect("0.0.0.0") results in 
> connecting to localhost, which sounds scary but I haven't dug into it.
> So the ISR won't recover even though the broker starts up.
> {code}
> ./kafka-topics.sh ... --describe --topic topic-A
> Topic:topic-A   PartitionCount:3ReplicationFactor:3 
> Configs:retention.ms=8640,min.insync.replicas=2
> Topic: topic-A  Partition: 0Leader: 3   Replicas: 3,2,1 Isr: 
> 1,3
> Topic: topic-A  Partition: 1Leader: 1   Replicas: 1,3,2 Isr: 
> 1,3
> Topic: topic-A  Partition: 2Leader: 1   Replicas: 2,1,3 Isr: 
> 1,3
> {code}
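
Until this is fixed, a common workaround (sketch; the advertised host below is a
placeholder for HOST-B's routable address) is to keep the wildcard bind but set
advertised.listeners explicitly in server.properties:

{code}
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://host-b.example.com:9092
{code}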



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4116) Specifying 0.0.0.0 in "listeners" doesn't work

2016-09-05 Thread Yuto Kawamura (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuto Kawamura updated KAFKA-4116:
-
Assignee: Yuto Kawamura  (was: Gwen Shapira)

> Specifying 0.0.0.0 in "listeners" doesn't work
> --
>
> Key: KAFKA-4116
> URL: https://issues.apache.org/jira/browse/KAFKA-4116
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1, 0.10.0.1
>Reporter: Yuto Kawamura
>Assignee: Yuto Kawamura
> Fix For: 0.10.0.2
>
>
> The document of {{listeners}} says:
> "Specify hostname as 0.0.0.0 to bind to all interfaces."
> However when I give config such as below, a started kafka broker can't join 
> the cluster due to invalid address advertised on zk.
> {code}
> listeners=PLAINTEXT://0.0.0.0:9092
> # advertised.listeners=
> {code}
> This is because of:
> - {{advertised.listeners}} which is used as an address to publish on zk 
> defaults to {{listeners}}
> - KafkaHealthcheck#register isn't considering the host "0.0.0.0" as a special 
> case : 
> https://github.com/apache/kafka/blob/8f3462552fa4d6a6d70a837c2ef7439bba512657/core/src/main/scala/kafka/server/KafkaHealthcheck.scala#L60-L61
> h3. Proof
> Test environment:
> - kafka-broker version 0.10.1.0-SNAPSHOT(build from trunk)
> - Brokers HOST-A, HOST-B, HOST-C
> - Controller: HOST-A
> - topic-A has 3 replicas, 3 partitions
> Update HOST-B's server.properties with updating listeners to below and 
> restart the broker.
> {code}
> listeners=PLAINTEXT://0.0.0.0:9092
> {code}
> Then HOST-B registers its broker info to ZK path {{/brokers/ids/2}}, but 
> "0.0.0.0" is used as its host:
> {code}
> [zk: ZKHOST1:2181,ZKHOST2:2181,ZKHOST3:2181/kafka-test(CONNECTED) 8] get 
> /brokers/ids/2
> {"jmx_port":12345,"timestamp":"1472796372181","endpoints":["PLAINTEXT://0.0.0.0:9092"],"host":"0.0.0.0","version":3,"port":9092}
> {code}
> The controller tries to send a request to the above address but of course it 
> will never reach HOST-B.
> controller.log:
> {code}
> [2016-09-02 15:06:12,206] INFO [Controller-1-to-broker-2-send-thread], 
> Controller 1 connected to 0.0.0.0:9092 (id: 2 rack: null) for sending state 
> change requests (kafka.controller.RequestSendThread)
> {code}
> I'm guessing the controller may be sending the request to itself (the Kafka broker 
> running on the same instance), as calling connect("0.0.0.0") results in 
> connecting to localhost, which sounds scary but I haven't dug into it.
> So the ISR won't recover even though the broker starts up.
> {code}
> ./kafka-topics.sh ... --describe --topic topic-A
> Topic:topic-A   PartitionCount:3ReplicationFactor:3 
> Configs:retention.ms=8640,min.insync.replicas=2
> Topic: topic-A  Partition: 0Leader: 3   Replicas: 3,2,1 Isr: 
> 1,3
> Topic: topic-A  Partition: 1Leader: 1   Replicas: 1,3,2 Isr: 
> 1,3
> Topic: topic-A  Partition: 2Leader: 1   Replicas: 2,1,3 Isr: 
> 1,3
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: WARN log message flooding broker logs for a pretty typical SSL setup

2016-09-05 Thread Jaikiran Pai
Thanks Ismael, I'll raise a PR for this. As a process, is there a JIRA 
that's expected to be filed for this before I raise a PR or would this 
be OK without a JIRA?


-Jaikiran
On Monday 05 September 2016 03:55 PM, Ismael Juma wrote:

Hi Jaikiran,

I agree that this is a valid configuration and the log level seems too high
given that. The original motivation is explained in the PR:

https://github.com/apache/kafka/pull/155/files#diff-fce430ae21a0c98d82da6d4aa551f824L603

That is, help people figure out if client authentication was not setup
correctly, but it seems like a better way to do that is to set
`ssl.client.auth=required`. So I'd, personally, be fine with reducing the
log level to info or debug.

Ismael

On Sun, Sep 4, 2016 at 3:01 PM, Jaikiran Pai 
wrote:


We just started enabling SSL for our Kafka brokers and (Java) clients and
among some of the issues we are running into, one of them is the flooding
of the server/broker Kafka logs where we are seeing these messages:

[2016-09-02 08:07:13,773] WARN SSL peer is not authenticated, returning
ANONYMOUS instead (org.apache.kafka.common.network.SslTransportLayer)
[2016-09-02 08:07:15,710] WARN SSL peer is not authenticated, returning
ANONYMOUS instead (org.apache.kafka.common.network.SslTransportLayer)
[2016-09-02 08:07:15,711] WARN SSL peer is not authenticated, returning
ANONYMOUS instead (org.apache.kafka.common.network.SslTransportLayer)
[2016-09-02 08:07:15,711] WARN SSL peer is not authenticated, returning
ANONYMOUS instead (org.apache.kafka.common.network.SslTransportLayer)
[2016-09-02 08:07:15,712] WARN SSL peer is not authenticated, returning
ANONYMOUS instead (org.apache.kafka.common.network.SslTransportLayer)


They just keep going on and on. In our SSL setup, we have the broker
configured with the keystore and the Java clients have been configured with
a proper truststore and all works fine except for these messages flooding
the logs. We don't have any ACLs setup nor have we enabled client auth
check.

Looking at the code which generates this WARN message
https://github.com/apache/kafka/blob/trunk/clients/src/main/
java/org/apache/kafka/common/network/SslTransportLayer.java#L638 and the
fact that the setup we have (where we just enable server/broker cert
validation) is, IMO, a valid scenario and not some exceptional/incorrect
setup issue, I think this log message is something that can be removed from
the code (or at least logged at a very lower level given the frequency at
which this gets logged)

Any thoughts on this?

It's a pretty straightforward change and if this change is something that
sounds right, I can go ahead and submit a PR.

P.S: This is both on 0.9.0.1 and latest 0.10.0.1.

-Jaikiran






Re: Plans to improve SSL performance in Kafka, for 0.10.x?

2016-09-05 Thread Jaikiran Pai

Hi Todd,


On Sunday 04 September 2016 10:58 PM, Todd Palino wrote:

We've been using SSL for produce traffic (mirror makers only right now, but
that's a very large percentage of traffic for us), and we're in the process
of turning it on for inter broker traffic as well. Our experience is that
this does not present a significant amount of overhead to the brokers.
Specifically with switching over the IBP, we were expecting a lot more of a
hit, and it really only ended up being something like a 5% increase in
system load, and no reduction in the cluster capacity, in our test cluster.


The slowness we are noticing is on the client JVMs which talk to the 
broker. The clients are using the (new) Java Kafka producer and consumer 
APIs, and the timings that we are seeing in the production and 
consumption of these messages are what differ with and without SSL.




Note that this relies on the fix in KAFKA-4050 and switching the PRNG to
SHA1PRNG.


Thanks, I hadn't noticed that JIRA. I'll make sure we use it in our 
setup. However, right now we (intentionally) have a single broker in 
the setup, and based on the JIRA details, I believe this one won't affect 
us for now.
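
(If it does end up mattering, my understanding -- an assumption worth verifying 
against the merged KAFKA-4050 patch -- is that the change is exposed as an SSL 
config along these lines:

    ssl.secure.random.implementation=SHA1PRNG

in the broker/client SSL properties.)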





Right now, we're specifically avoiding moving consume traffic to SSL, due
to the zero copy send issue. Now I've been told (but I have not
investigated) that OpenSSL can solve this. It would probably be a good use
of time to look into that further.


Unfortunately, the way our application is written, we don't have the 
ability to enable SSL for producers and disable it for consumers. 
Changing the application to support something like that is doable, but I 
will have to make sure that this is one of the reasons for the slowness.



Thanks for all your inputs. I was trying to avoid coming up with a 
reproducer which closely matches our application usage scenario (given 
our tight delivery timelines for the product), but from what you say and 
a reply from Tom Crayford in this thread, I am starting to feel that not 
everyone is affected to the extent we are. I will see if I can narrow 
this down to something more concrete.


-Jaikiran






That said, switching the message format to the newer option (KIP-31 I
believe?) will result in the brokers not needing to recompress message
batches that are produced. This should result in a significant reduction in
CPU usage, which may offset the cost of SSL. We haven't had a chance to
fully investigate this, however, as changing that config depends on the
clients being updated to support the new format.

-Todd

On Sunday, September 4, 2016, Jaikiran Pai  wrote:


We are using 0.10.0.1 of Kafka and (Java) client libraries. We recently
decided to start using SSL for Kafka communication between broker and
clients. Right now, we have a pretty basic setup with just 1 broker with
SSL keystore setup and the Java client(s) communicate using the
Producer/Consumer APIs against this single broker. There's no client auth
(intentionally) right now. We also have plain text enabled for the initial
testing.

What we have noticed is that the consumer/producer performance when SSL is
enabled is noticeably poor when compared to plain text. I understand that
there are expected to be performance impacts when SSL is enabled but the
order of magnitude is too high and in fact it shows up in a very noticeable
fashion in our product. I do have the numbers, but I haven't been able to
narrow it down yet (because there's nothing that stands out, except that
it's slow). Our application code is exactly the same between non-SSL and
SSL usage.

Furthermore, I'm aware of this specific JIRA in Kafka
https://issues.apache.org/jira/browse/KAFKA-2561 which acknowledges a
similar issue. So what I would like to know is, in context of Kafka 0.10.x
releases and Java 8 support, are there any timelines that the dev team is
looking for in terms of improving this performance issue (which I believe
requires usage of OpenSSL or other non-JDK implementations of SSLEngine)?
We would like to go for GA of our product in the next couple of months and
in order to do that, we do plan to have Kafka over SSL working with
reasonably good performance, but the current performance isn't promising.
Expecting this to be fixed in the next couple of months and have it
available in 0.10.x is probably too much to expect, but if we know the
plans around this, we should be able to come up with a plan of our own for
our product.


-Jaikiran







Re: Plans to improve SSL performance in Kafka, for 0.10.x?

2016-09-05 Thread Jaikiran Pai

Hi Tom,

On Monday 05 September 2016 06:32 PM, Tom Crayford wrote:

We've been using SSL quite successfully on a few thousand production
clusters, and have done heavy performance testing, and as far as we can
see, it doesn't impact the cluster's capacity. The only case I could see
that happening is if you have a *lot* of connection churn, but that isn't
very typical for most Kafka usage.


We have a single broker, and the clients of this broker are Java 
producers and consumers across different Java VMs. There is one producer 
per app (as recommended by Kafka) and around 3 consumers per app 
(all pointing to the same broker, obviously). I don't think this adds to 
any kind of connection churn. Furthermore, the slowness we are seeing is 
in the client JVMs, with no real noticeable issues on the broker itself.





That's for both produce and consume traffic. We do see increased amounts of
garbage collection under high consumer load with SSL enabled, but nothing
too horrible (brokers are well below the SLA of GC to keep their zookeeper
sessions alive).


This GC you notice, is it on the broker(s) or on the clients which host 
the producers/consumers? Either way, this is a good hint and something I 
hadn't considered in my setup. I'll see if I can get some GC 
logs/metrics on our setup for this. Thanks for the input.
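
For reference, these are the standard HotSpot options I plan to use for capturing 
GC activity on the client JVMs (the log path is a placeholder; passed via our 
application's startup script):

    -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps
    -XX:+PrintGCApplicationStoppedTime -Xloggc:/var/log/myapp/gc.log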



-Jaikiran




Thanks

Tom Crayford
Heroku Kafka

On Sun, Sep 4, 2016 at 6:28 PM, Todd Palino  wrote:


We've been using SSL for produce traffic (mirror makers only right now, but
that's a very large percentage of traffic for us), and we're in the process
of turning it on for inter broker traffic as well. Our experience is that
this does not present a significant amount of overhead to the brokers.
Specifically with switching over the IBP, we were expecting a lot more of a
hit, and it really only ended up being something like a 5% increase in
system load, and no reduction in the cluster capacity, in our test cluster.
Note that this relies on the fix in KAFKA-4050 and switching the PRNG to
SHA1PRNG.

Right now, we're specifically avoiding moving consume traffic to SSL, due
to the zero copy send issue. Now I've been told (but I have not
investigated) that OpenSSL can solve this. It would probably be a good use
of time to look into that further.

That said, switching the message format to the newer option (KIP-31 I
believe?) will result in the brokers not needing to recompress message
batches that are produced. This should result in a significant reduction in
CPU usage, which may offset the cost of SSL. We haven't had a chance to
fully investigate this, however, as changing that config depends on the
clients being updated to support the new format.

-Todd

On Sunday, September 4, 2016, Jaikiran Pai 
wrote:


We are using 0.10.0.1 of Kafka and (Java) client libraries. We recently
decided to start using SSL for Kafka communication between broker and
clients. Right now, we have a pretty basic setup with just 1 broker with
SSL keystore setup and the Java client(s) communicate using the
Producer/Consumer APIs against this single broker. There's no client auth
(intentionally) right now. We also have plain text enabled for the

initial

testing.

What we have noticed is that the consumer/producer performance when SSL

is

enabled is noticeably poor when compared to plain text. I understand that
there are expected to be performance impacts when SSL is enabled but the
order of magnitude is too high and in fact it shows up in a very

noticeable

fashion in our product. I do have the numbers, but I haven't been able to
narrow it down yet (because there's nothing that stands out, except that
it's slow). Our application code is exactly the same between non-SSL and
SSL usage.

Furthermore, I'm aware of this specific JIRA in Kafka
https://issues.apache.org/jira/browse/KAFKA-2561 which acknowledges a
similar issue. So what I would like to know is, in context of Kafka

0.10.x

releases and Java 8 support, are there any timelines that the dev team is
looking for in terms of improving this performance issue (which I believe
requires usage of OpenSSL or other non-JDK implementations of SSLEngine)?
We would like to go for GA of our product in the next couple of months

and

in order to do that, we do plan to have Kafka over SSL working with
reasonably good performance, but the current performance isn't promising.
Expecting this to be fixed in the next couple of months and have it
available in 0.10.x is probably too much to expect, but if we know the
plans around this, we should be able to come up with a plan of our own

for

our product.


-Jaikiran



--
*Todd Palino*
Staff Site Reliability Engineer
Data Infrastructure Streaming



linkedin.com/in/toddpalino





Re: Plans to improve SSL performance in Kafka, for 0.10.x?

2016-09-05 Thread Jaikiran Pai

On Tuesday 06 September 2016 09:42 AM, Jaikiran Pai wrote:

Hi Todd,


Note that this relies on the fix in KAFKA-4050 and switching the PRNG to
SHA1PRNG.


Thanks, I hadn't noticed that JIRA. I'll make sure we use it in our 
setup. However, right now, (intentionally) we have single broker 
within the setup and based on the JIRA details, I believe this one 
won't affect us for now.


I read that JIRA again and saw the stacktrace. I might be wrong when I 
said this probably doesn't affect us, given that the code path appears to 
be the same for other SSL interactions. I'm going to introduce this patch 
(or in fact the latest 0.10 branch) in our setup and see how it goes.


-Jaikiran







Right now, we're specifically avoiding moving consume traffic to SSL, 
due

to the zero copy send issue. Now I've been told (but I have not
investigated) that OpenSSL can solve this. It would probably be a 
good use

of time to look into that further.


Unfortunately, the way our application is written, we don't have the 
ability to enable SSL for producers and disable it for consumers. 
Changing the application to support something like that is doable, but 
I will have to make sure that this is one of the reasons for the 
slowness.



Thanks for all your inputs. I was trying to avoid coming up with a 
reproducer which closely matches our application usage scenario (given 
our tight delivery timelines for the product), but from what you say 
and a reply from Tom Crayford in this thread, I am starting to feel that 
not everyone is affected to the extent we are. I will see if I can 
narrow this down to something more concrete.


-Jaikiran






That said, switching the message format to the newer option (KIP-31 I
believe?) will result in the brokers not needing to recompress message
batches that are produced. This should result in a significant 
reduction in

CPU usage, which may offset the cost of SSL. We haven't had a chance to
fully investigate this, however, as changing that config depends on the
clients being updated to support the new format.

-Todd

On Sunday, September 4, 2016, Jaikiran Pai  
wrote:



We are using 0.10.0.1 of Kafka and (Java) client libraries. We recently
decided to start using SSL for Kafka communication between broker and
clients. Right now, we have a pretty basic setup with just 1 broker 
with

SSL keystore setup and the Java client(s) communicate using the
Producer/Consumer APIs against this single broker. There's no client 
auth
(intentionally) right now. We also have plain text enabled for the 
initial

testing.

What we have noticed is that the consumer/producer performance when 
SSL is
enabled is noticeably poor when compared to plain text. I understand 
that
there are expected to be performance impacts when SSL is enabled but 
the
order of magnitude is too high and in fact it shows up in a very 
noticeable
fashion in our product. I do have the numbers, but I haven't been 
able to
narrow it down yet (because there's nothing that stands out, except 
that
it's slow). Our application code is exactly the same between non-SSL 
and

SSL usage.

Furthermore, I'm aware of this specific JIRA in Kafka
https://issues.apache.org/jira/browse/KAFKA-2561 which acknowledges a
similar issue. So what I would like to know is, in context of Kafka 
0.10.x
releases and Java 8 support, are there any timelines that the dev 
team is
looking for in terms of improving this performance issue (which I 
believe
requires usage of OpenSSL or other non-JDK implementations of 
SSLEngine)?
We would like to go for GA of our product in the next couple of 
months and

in order to do that, we do plan to have Kafka over SSL working with
reasonably good performance, but the current performance isn't 
promising.

Expecting this to be fixed in the next couple of months and have it
available in 0.10.x is probably too much to expect, but if we know the
plans around this, we should be able to come up with a plan of our 
own for

our product.


-Jaikiran









Re: WARN log message flooding broker logs for a pretty typical SSL setup

2016-09-05 Thread Manikumar Reddy
We don't need JIRA for minor PRs. Just prefix "MINOR:" to PR title.

On Tue, Sep 6, 2016 at 9:16 AM, Jaikiran Pai 
wrote:

> Thanks Ismael, I'll raise a PR for this. As a process, is there a JIRA
> that's expected to be filed for this before I raise a PR or would this be
> OK without a JIRA?
>
> -Jaikiran
>
> On Monday 05 September 2016 03:55 PM, Ismael Juma wrote:
>
>> Hi Jaikiran,
>>
>> I agree that this is a valid configuration and the log level seems too
>> high
>> given that. The original motivation is explained in the PR:
>>
>> https://github.com/apache/kafka/pull/155/files#diff-fce430ae
>> 21a0c98d82da6d4aa551f824L603
>>
>> That is, help people figure out if client authentication was not setup
>> correctly, but it seems like a better way to do that is to set
>> `ssl.client.auth=required`. So I'd, personally, be fine with reducing the
>> log level to info or debug.
>>
>> Ismael
>>
>> On Sun, Sep 4, 2016 at 3:01 PM, Jaikiran Pai 
>> wrote:
>>
>> We just started enabling SSL for our Kafka brokers and (Java) clients and
>>> among some of the issues we are running into, one of them is the flooding
>>> of the server/broker Kafka logs where we are seeing these messages:
>>>
>>> [2016-09-02 08:07:13,773] WARN SSL peer is not authenticated, returning
>>> ANONYMOUS instead (org.apache.kafka.common.network.SslTransportLayer)
>>> [2016-09-02 08:07:15,710] WARN SSL peer is not authenticated, returning
>>> ANONYMOUS instead (org.apache.kafka.common.network.SslTransportLayer)
>>> [2016-09-02 08:07:15,711] WARN SSL peer is not authenticated, returning
>>> ANONYMOUS instead (org.apache.kafka.common.network.SslTransportLayer)
>>> [2016-09-02 08:07:15,711] WARN SSL peer is not authenticated, returning
>>> ANONYMOUS instead (org.apache.kafka.common.network.SslTransportLayer)
>>> [2016-09-02 08:07:15,712] WARN SSL peer is not authenticated, returning
>>> ANONYMOUS instead (org.apache.kafka.common.network.SslTransportLayer)
>>> 
>>>
>>> They just keep going on and on. In our SSL setup, we have the broker
>>> configured with the keystore and the Java clients have been configured
>>> with
>>> a proper truststore and all works fine except for these messages flooding
>>> the logs. We don't have any ACLs setup nor have we enabled client auth
>>> check.
>>>
>>> Looking at the code which generates this WARN message
>>> https://github.com/apache/kafka/blob/trunk/clients/src/main/
>>> java/org/apache/kafka/common/network/SslTransportLayer.java#L638 and the
>>> fact that the setup we have (where we just enable server/broker cert
>>> validation) is, IMO, a valid scenario and not some exceptional/incorrect
>>> setup issue, I think this log message is something that can be removed
>>> from
>>> the code (or at least logged at a very lower level given the frequency at
>>> which this gets logged)
>>>
>>> Any thoughts on this?
>>>
>>> It's a pretty straightforward change and if this change is something that
>>> sounds right, I can go ahead and submit a PR.
>>>
>>> P.S: This is both on 0.9.0.1 and latest 0.10.0.1.
>>>
>>> -Jaikiran
>>>
>>>
>>>
>


[GitHub] kafka pull request #1825: MINOR: Reduce the log level when the peer isn't au...

2016-09-05 Thread jaikiran
GitHub user jaikiran opened a pull request:

https://github.com/apache/kafka/pull/1825

MINOR: Reduce the log level when the peer isn't authenticated but is using 
SSL

The commit here changes the log level of a log message from WARN to DEBUG. 
As noted in the mail discussion here 
https://www.mail-archive.com/dev@kafka.apache.org/msg56035.html, in a pretty 
straightforward/typical and valid setup, the broker logs get flooded with the 
following message:

    [2016-09-02 08:07:13,773] WARN SSL peer is not authenticated, returning ANONYMOUS instead (org.apache.kafka.common.network.SslTransportLayer)
    [2016-09-02 08:07:15,710] WARN SSL peer is not authenticated, returning ANONYMOUS instead (org.apache.kafka.common.network.SslTransportLayer)
    [2016-09-02 08:07:15,711] WARN SSL peer is not authenticated, returning ANONYMOUS instead (org.apache.kafka.common.network.SslTransportLayer)
    [2016-09-02 08:07:15,711] WARN SSL peer is not authenticated, returning ANONYMOUS instead (org.apache.kafka.common.network.SslTransportLayer)
    [2016-09-02 08:07:15,712] WARN SSL peer is not authenticated, returning ANONYMOUS instead (org.apache.kafka.common.network.SslTransportLayer)



and it goes on forever. 

Personally, I would like to remove this log message altogether, for two 
reasons:

- It's a valid case for the peer to be unauthenticated while still using 
SSL, and the code rightly handles it by returning the anonymous principal
- This method gets called so frequently that, irrespective of the log level 
it is logged at, it will end up flooding the log if that level is enabled.

Having said that, I don't know whether there will be agreement on removing 
this log message altogether, hence just lowering the level for now.
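
As an interim workaround for anyone hitting this before a change lands, the logger 
can also be silenced per deployment in the broker's log4j.properties, e.g. (sketch; 
adjust the level to taste):

    log4j.logger.org.apache.kafka.common.network.SslTransportLayer=ERROR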


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jaikiran/kafka ssl-log-level

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1825.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1825


commit 72919ce832ebf4437893993e16f3003bcd6eb941
Author: Jaikiran Pai 
Date:   2016-09-06T05:23:44Z

Reduce the log level when the peer isn't authenticated but is using SSL 
(which is a valid case)




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: WARN log message flooding broker logs for a pretty typical SSL setup

2016-09-05 Thread Jaikiran Pai

Thank you. Just opened one https://github.com/apache/kafka/pull/1825

-Jaikiran

On Tuesday 06 September 2016 10:12 AM, Manikumar Reddy wrote:

We don't need JIRA for minor PRs. Just prefix "MINOR:" to PR title.

On Tue, Sep 6, 2016 at 9:16 AM, Jaikiran Pai 
wrote:


Thanks Ismael, I'll raise a PR for this. As a process, is there a JIRA
that's expected to be filed for this before I raise a PR or would this be
OK without a JIRA?

-Jaikiran

On Monday 05 September 2016 03:55 PM, Ismael Juma wrote:


Hi Jaikiran,

I agree that this is a valid configuration and the log level seems too
high
given that. The original motivation is explained in the PR:

https://github.com/apache/kafka/pull/155/files#diff-fce430ae
21a0c98d82da6d4aa551f824L603

That is, help people figure out if client authentication was not setup
correctly, but it seems like a better way to do that is to set
`ssl.client.auth=required`. So I'd, personally, be fine with reducing the
log level to info or debug.

Ismael

On Sun, Sep 4, 2016 at 3:01 PM, Jaikiran Pai 
wrote:

We just started enabling SSL for our Kafka brokers and (Java) clients and

among some of the issues we are running into, one of them is the flooding
of the server/broker Kafka logs where we are seeing these messages:

[2016-09-02 08:07:13,773] WARN SSL peer is not authenticated, returning
ANONYMOUS instead (org.apache.kafka.common.network.SslTransportLayer)
[2016-09-02 08:07:15,710] WARN SSL peer is not authenticated, returning
ANONYMOUS instead (org.apache.kafka.common.network.SslTransportLayer)
[2016-09-02 08:07:15,711] WARN SSL peer is not authenticated, returning
ANONYMOUS instead (org.apache.kafka.common.network.SslTransportLayer)
[2016-09-02 08:07:15,711] WARN SSL peer is not authenticated, returning
ANONYMOUS instead (org.apache.kafka.common.network.SslTransportLayer)
[2016-09-02 08:07:15,712] WARN SSL peer is not authenticated, returning
ANONYMOUS instead (org.apache.kafka.common.network.SslTransportLayer)


They just keep going on and on. In our SSL setup, we have the broker
configured with the keystore and the Java clients have been configured
with
a proper truststore and all works fine except for these messages flooding
the logs. We don't have any ACLs setup nor have we enabled client auth
check.

Looking at the code which generates this WARN message
https://github.com/apache/kafka/blob/trunk/clients/src/main/
java/org/apache/kafka/common/network/SslTransportLayer.java#L638 and the
fact that the setup we have (where we just enable server/broker cert
validation) is, IMO, a valid scenario and not some exceptional/incorrect
setup issue, I think this log message is something that can be removed
from
the code (or at least logged at a very lower level given the frequency at
which this gets logged)

Any thoughts on this?

It's a pretty straightforward change and if this change is something that
sounds right, I can go ahead and submit a PR.

P.S: This is both on 0.9.0.1 and latest 0.10.0.1.

-Jaikiran







[jira] [Commented] (KAFKA-4126) No relevant log when the topic is non-existent

2016-09-05 Thread Manikumar Reddy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15466496#comment-15466496
 ] 

Manikumar Reddy commented on KAFKA-4126:


Currently the producer logs the warning below. Maybe we can add a debug log on 
the broker side.
{code}
[2016-09-06 11:10:33,916] WARN Error while fetching metadata with correlation 
id 291 : {UNKNOWN=UNKNOWN_TOPIC_OR_PARTITION} 
(org.apache.kafka.clients.NetworkClient)
{code}
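
For anyone who needs more detail today, client-side debug logging can be enabled in 
the producer application's log4j.properties (sketch):
{code}
log4j.logger.org.apache.kafka.clients=DEBUG
{code}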

> No relevant log when the topic is non-existent
> --
>
> Key: KAFKA-4126
> URL: https://issues.apache.org/jira/browse/KAFKA-4126
> Project: Kafka
>  Issue Type: Bug
>Reporter: Balázs Barnabás
>Priority: Minor
>
> When a producer sends a ProducerRecord into a Kafka topic that doesn't 
> exist, there is no relevant debug/error log that points out the error.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4109) kafka client send msg exception

2016-09-05 Thread Manikumar Reddy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15466570#comment-15466570
 ] 

Manikumar Reddy commented on KAFKA-4109:


Enable producer debug logs to get root cause of the TimeoutException.
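
As I understand the 0.10.0 producer, "Batch Expired" means a batch sat in the record 
accumulator longer than request.timeout.ms, so these are the producer settings worth 
reviewing alongside the debug logs (values are placeholders, not recommendations):
{code}
request.timeout.ms=30000   # batches waiting longer than this are expired
retries=3                  # retry transient send failures
max.block.ms=60000         # how long send()/metadata fetches may block
{code}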

> kafka client send msg exception
> ---
>
> Key: KAFKA-4109
> URL: https://issues.apache.org/jira/browse/KAFKA-4109
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.10.0.0
> Environment: java8
> kafka cluster
>Reporter: frank
>   Original Estimate: 5m
>  Remaining Estimate: 5m
>
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Batch Expired 
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:56)
>  
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:43)
>  
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
>  
>   at 
> com.longsheng.basicCollect.kafka.KafkaProducer.publishMessage(KafkaProducer.java:44)
>  
>   at 
> com.longsheng.basicCollect.crawler.processor.dzwbigdata.ParentBasic.publish(ParentBasic.java:60)
>  
>   at 
> com.longsheng.basicCollect.crawler.processor.dzwbigdata.ParentBasic.parseIncJson(ParentBasic.java:119)
>  
>   at 
> com.longsheng.basicCollect.crawler.processor.dzwbigdata.coke.price.SecondPrice.collectData(SecondPrice.java:41)
>  
>   at 
> com.longsheng.basicCollect.crawler.processor.dzwbigdata.coke.price.SecondPrice.process(SecondPrice.java:49)
>  
>   at 
> com.longsheng.basicCollect.crawler.processor.dzwbigdata.DzwProcessor.process(DzwProcessor.java:33)
>  
>   at 
> com.longsheng.basicCollect.timer.CollectDzwBigData.execute(CollectDzwBigData.java:14)
>  
>   at org.quartz.core.JobRunShell.run(JobRunShell.java:202) 
>   at 
> org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573) 
> Caused by: org.apache.kafka.common.errors.TimeoutException: Batch Expired
> the exception occurs arbitrarily!



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-79 - ListOffsetRequest v1 and offsetForTime() method in new consumer.

2016-09-05 Thread Magnus Edenhill
Good write-up Qin, the API looks promising.

I have one comment:

2016-09-03 5:20 GMT+02:00 Becket Qin :

> The current offsetsForTimes() API obviously does not support querying
> multiple timestamps for the same partition. It doesn't seem to be a feature of
> ListOffsetRequest v0 either (sounds more like a bug). My intuition is that
> it's a rare use case. Given it does not exist before and we don't see a
> strong need from the community either, maybe it is better to keep it simple
> for ListOffsetRequest v1. We can add it later if it turns out to be a
> useful feature (that may need an interface change, but I honestly do not
> think people would frequently query many different timestamps for the same
> partition)
>

I argue that the current behaviour of OffsetRequest with regards to
duplicate partitions is a bug,
and I think it would be a mistake to move the same semantics over to the new
ListOffset API.
One use case is that an application may want to know the offset range
between two timestamps,
e.g., for reprocessing, batching, searching, etc.
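
For example (a rough sketch against the API shape being discussed; method and
return-type names are only illustrative), getting the offset range between two
timestamps for one partition would need two separate calls, since the map key is
the partition:

    Map<TopicPartition, OffsetAndTimestamp> from =
        consumer.offsetsForTimes(Collections.singletonMap(tp, fromTimestampMs));
    Map<TopicPartition, OffsetAndTimestamp> to =
        consumer.offsetsForTimes(Collections.singletonMap(tp, toTimestampMs));
    long startOffset = from.get(tp).offset();
    long endOffset = to.get(tp).offset();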


Thanks,
Magnus



>
> Have a good long weekend!
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
>
> On Fri, Sep 2, 2016 at 6:10 PM, Ismael Juma  wrote:
>
> > Thanks for the proposal Becket. Looks good overall, a few comments:
> >
> > ListOffsetResponse => [TopicName [PartitionOffsets]]
> > >   PartitionOffsets => Partition ErrorCode Timestamp [Offset]
> > >   Partition => int32
> > >   ErrorCode => int16
> > >   Timestamp => int64
> > >   Offset => int
> >
> >
> > It should be int64 for `Offset` right?
> >
> > Implementation wise, we will migrate to o.a.k.common.requests.
> > ListOffsetRequest
> > > class on the broker side.
> >
> >
> > Could you clarify what you mean here? We already
> > use o.a.k.common.requests.ListOffsetRequest in KafkaApis.
> >
> > long offset = consumer.offsetForTime(Collections.singletonMap(
> > topicPartition,
> > > targetTime)).offset;
> >
> >
> > The result of `offsetForTime` is a Map, so we can't just call `offset` on
> > it. You probably meant something like:
> >
> > long offset = consumer.offsetForTime(Collections.singletonMap(
> > topicPartition,
> > targetTime)).get(topicPartition).offset;
> >
> > Test searchByTimestamp with CreateTime and LogAppendTime
> > >
> >
> > Do you mean `Test offsetForTime`?
> >
> > And:
> >
> > 1. In KAFKA-1588, the following issue was described "When performing an
> > OffsetRequest, if you request the same topic and partition combination
> in a
> > single request more than once (for example, if you want to get both the
> > head and tail offsets for a partition in the same request), you will get
> a
> > response for both, but they will be the same offset". Will the new
> request
> > version support the use case where multiple timestamps are passed for the
> > same topic partition? And if we do support it at the protocol level, do
> we
> > also want to support it at the API level or do we think the additional
> > complexity is not worth it?
> >
> > 2. Is `offsetForTime` the right method name given that we are getting
> > multiple offsets? Maybe it should be `offsetsForTimes` or something like
> > that.
> >
> > Ismael
> >
> > On Wed, Aug 31, 2016 at 4:38 AM, Becket Qin 
> wrote:
> >
> > > Hi Kafka devs,
> > >
> > > I created KIP-79 to allow consumer to precisely query the offsets based
> > on
> > > timestamp.
> > >
> > > In short we propose to :
> > > 1. add a ListOffsetRequest/ListOffsetResponse v1, and
> > > 2. add an offsetForTime() method in new consumer.
> > >
> > > The KIP wiki is the following:
> > > https://cwiki.apache.org/confluence/pages/viewpage.
> > action?pageId=65868090
> > >
> > > Comments are welcome.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> >
>


[jira] [Commented] (KAFKA-4111) broker compress data of certain size instead on a produce request

2016-09-05 Thread Manikumar Reddy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15466584#comment-15466584
 ] 

Manikumar Reddy commented on KAFKA-4111:


"compression.type" broker config property is applicable to all the messages. We 
don't have option to compress messages
 based on certain message size.  

We can use topic level config property to set "compression.type"  to a 
particular topic.
or  you can use multiple producers with different compression types.
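
For example, a sketch of setting the topic-level override (the topic name and 
ZooKeeper address are placeholders):
{code}
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type topics --entity-name my-topic \
  --add-config compression.type=gzip
{code}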

> broker compress data of certain size instead on a produce request
> -
>
> Key: KAFKA-4111
> URL: https://issues.apache.org/jira/browse/KAFKA-4111
> Project: Kafka
>  Issue Type: Improvement
>  Components: compression
>Affects Versions: 0.10.0.1
>Reporter: julien1987
>
> When "compression.type" is set on broker config, broker compress data on 
> every produce request. But on our sences, produce requst is very many, and 
> data of every request is not so much. So compression result is not good. Can 
> Broker compress data of every certain size from many produce requests?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)