Re: [DISCUSS] KIP-448: Add State Stores Unit Test Support to Kafka Streams Test Utils

2019-04-09 Thread Bruno Cadonna
Hi Yishun,

Thank you for the KIP.

I have a couple of comments:

1. Could you please add an example to the KIP that demonstrates how the
mocks should be used in a test?

2. I am wondering whether the MockKeyValueStore needs to be backed by an
actual KeyValueStore (in your KIP, InMemoryKeyValueStore). Would it not
suffice to provide the mock with the entries that it has to check in the
case of an input operation like put(), and with the entries it has to
return in the case of an output operation like get()? In my opinion, a mock
should contain as little and as simple code as possible. A unit test should
depend as little as possible on production code that it does not explicitly
test.
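
For illustration, here is a minimal sketch of what I have in mind (the
names are hypothetical, not the API proposed in the KIP):

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical mock primed with expected puts and canned get() results,
// instead of delegating to a real store.
class SimpleMockKeyValueStore<K, V> {
    private final Map<K, V> expectedPuts = new HashMap<>();
    private final Map<K, V> cannedGets = new HashMap<>();

    void expectPut(final K key, final V value) { expectedPuts.put(key, value); }
    void stubGet(final K key, final V value) { cannedGets.put(key, value); }

    void put(final K key, final V value) {
        final V expected = expectedPuts.remove(key);
        if (!Objects.equals(expected, value)) {
            throw new AssertionError("unexpected put: " + key + " -> " + value);
        }
    }

    V get(final K key) { return cannedGets.get(key); }

    // call at the end of the test to assert that all expected puts happened
    void verify() {
        if (!expectedPuts.isEmpty()) {
            throw new AssertionError("missing puts for keys: " + expectedPuts.keySet());
        }
    }
}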

3. I would be interested in the arguments against using a well-established
and well-tested mock framework like EasyMock. If there are good arguments,
they should be listed under 'Rejected Alternatives'.

4. What is the purpose of the parameter 'time' in MockStoreFactory?

Best,
Bruno

On Tue, Apr 9, 2019 at 11:29 AM Sophie Blee-Goldman 
wrote:

> Hi Yishun, thanks for the KIP! I have a few initial questions/comments:
>
> 1) It may be useful to capture the iterator results as well (eg with a
> MockIterator that wraps the underlying iterator and records the same way
> the MockStore wraps/records the underlying store)
>
> 2) a. Where is the "persistent" variable coming from or being used? It
> seems the MockKeyValueStore accepts it in the constructor, but only the
> name parameter is passed when constructing a new MockKeyValueStore in
> build() ... also, if we extend InMemoryXXXStore shouldn't this always be
> false?
> b. Is the idea to wrap an in-memory store for each type (key-value,
> session, etc)? We don't (yet) offer an in-memory version of the session
> store although it is in the works, so this will be possible -- I am more
> wondering if it makes sense to decide this for the user or to allow them to
> choose between in-memory or rocksDB by setting "persistent"
>
> 3) I'm wondering if users might want to be able to plug in their own custom
> stores as the underlying backend...should we support this as well? WDYT?
>
> 4) We probably want to make these stores available through the public
> test-utils package (maybe not the stores themselves which should be
> internal, but should there be some kind of public API that gives access to
> them?)
>
> Cheers,
> Sophie
>
> On Tue, Apr 9, 2019 at 9:19 AM Yishun Guan  wrote:
>
> > Bumping this up again, thanks!
> >
> > On Fri, Apr 5, 2019, 14:36 Yishun Guan  wrote:
> >
> > > Hi, bumping this up again. Thanks!
> > >
> > > On Tue, Apr 2, 2019, 13:07 Yishun Guan  wrote:
> > >
> > >> Hi All,
> > >>
> > >> I'd like to start a discussion on KIP-448
> > >> (https://cwiki.apache.org/confluence/x/SAeZBg). It is about adding
> > >> mock state stores and relevant components for testing purposes.
> > >>
> > >> Here is the JIRA: https://issues.apache.org/jira/browse/KAFKA-6460
> > >>
> > >> This is a rough KIP draft; reviews and comments are appreciated. It
> > >> seems to be tricky, and some requirements and details still need to
> > >> be discussed.
> > >>
> > >> Thanks,
> > >> Yishun
> > >>
> > >
> >
>


-- 
Bruno Cadonna
Software Engineer at Confluent


Re: [DISCUSS] KIP-201: Rationalising Policy interfaces

2019-04-09 Thread Tom Bentley
Hi Rajini,

I'd be happy to do that. I'll try to get it done in the next few days.

Although there's been quite a lot of interest in this, the vote thread never
got any binding +1s, so it's been stuck in limbo for a long time. It would
be great to get this moving again.

Kind regards,

Tom

On Tue, Apr 9, 2019 at 3:04 PM Rajini Sivaram 
wrote:

> Hi Tom,
>
> Are you planning to extend this KIP to also include dynamic broker config
> update (currently covered under AlterConfigPolicy)?
>
> It may be worth sending another note to make progress on this KIP since it has
> been around a while and reading through the threads, it looks like there
> has been a lot of interest in it.
>
> Thank you,
>
> Rajini
>
>
> On Wed, Jan 9, 2019 at 11:25 AM Tom Bentley  wrote:
>
> > Hi Anna and Mickael,
> >
> > Anna, did you have any comments about the points I made?
> >
> > Mickael, we really need the vote to be passed before there's even any
> work
> > to do. With the exception of Ismael, the KIP didn't seem to get the
> > attention of any of the other committers.
> >
> > Kind regards,
> >
> > Tom
> >
> > On Thu, 13 Dec 2018 at 18:11, Tom Bentley  wrote:
> >
> > > Hi Anna,
> > >
> > > Firstly, let me apologise again about having missed your previous
> emails
> > > about this.
> > >
> > > Thank you for the feedback. You raise some valid points about
> ambiguity.
> > > The problem with pulling the metadata into CreateTopicRequest and
> > > AlterTopicRequest is that you lose the benefit of being able to easily
> > > write
> > > a common policy across creation and alter cases. For example, with the
> > > proposed design the policy maker could write code like this (forgive my
> > > pseudo-Java)
> > >
> > > public void validateCreateTopic(requestMetadata, ...) {
> > >     commonPolicy(requestMetadata.requestedState());
> > > }
> > >
> > > public void validateAlterTopic(requestMetadata, ...) {
> > >     commonPolicy(requestMetadata.requestedState());
> > > }
> > >
> > > private void commonPolicy(RequestedTopicState requestedState) {
> > >     // ...
> > > }
> > >
> > > I think that's an important feature of the API because (I think) very
> > > often the policy maker is interested in defining the universe of
> > prohibited
> > > configurations without really caring about whether the request is a
> > create
> > > or an alter. Having a single RequestedTopicState for both create and
> > > alter means they can do that trivially in one place. Having different
> > > methods in the two Request classes prevents this and forces the policy
> > > maker to pick apart the different requestState objects before calling
> any
> > > common method(s).
> > >
> > > I think my intention at the time (and it's many months ago now, so I
> > might
> > > not have remembered fully) was that RequestedTopicState would basically
> > > represent what the topic would look like after the requested changes
> were
> > > applied (I accept this isn't how it's Javadoc'd in the KIP), rather
> than
> > > representing the request itself. Thus if the request changed the
> > assignment
> > > of some of the partitions and the policy maker was interested in
> > precisely
> > > which partitions would be changed, and how, they would indeed have to
> > > compute that for themselves by looking up the current topic state from
> > the
> > > cluster state and seeing how they differed. Indeed they'd have to do
> this
> > > diff even to figure out that the user was requesting a change to the
> > > topic assignment (or similarly for topic config, etc). To me this is
> > > acceptable
> > > because I think most people writing such policies are just interested
> in
> > > defining what is not allowed, so giving them a representation of the
> > > proposed topic state which they can readily check against is the most
> > > direct API. In this interpretation generatedReplicaAssignment() would
> > > just be some extra metadata annotating whether any difference between
> the
> > > current and proposed states was directly from the user, or generated on
> > the
> > > broker. You're right that it's ambiguous when the request didn't
> actually
> > > change the assignment but I didn't envisage policy makers using it
> except
> > > when the assignments differed anyway. To me it would be acceptable to
> > > Javadoc this.
> > >
> > > Given this interpretation of RequestedTopicState as "what the topic
> would
> > > look like after the requested changes were applied" can you see any
> other
> > > problems with the proposal? Or do you have use cases where the policy
> > maker
> > > is more interested in what the request is changing?
> > >
> > > Kind regards,
> > >
> > > Tom
> > >
> > > On Fri, 7 Dec 2018 at 08:41, Tom Bentley 
> wrote:
> > >
> > >> Hi Anna and Mickael,
> > >>
> > >> Sorry for remaining silent on this for so long. I should have time to
> > >> look at this again next week.
> > >>
> > >> Kind regards,
> > >>
> > >> Tom
> > >>
> > >> On Mon, 3 Dec 2018 at 10:11, Mickael Maison  >
> > >> wrote:

Re: [DISCUSS] KIP-435: Incremental Partition Reassignment

2019-04-09 Thread Jason Gustafson
Hi Colin,

> On a related note, what do you think about the idea of storing the
> reassigning replicas in
> /brokers/topics/[topic]/partitions/[partitionId]/state, rather than in the
> reassignment znode?  I don't think this requires a major change to the
> proposal-- when the controller becomes aware that it should do a
> reassignment, the controller could make the changes.  This also helps keep
> the reassignment znode from getting larger, which has been a problem.


Yeah, I think it's a good idea to store the reassignment state at a finer
level. I'm not sure the LeaderAndIsr znode is the right one though. Another
option is /brokers/topics/{topic}. That is where we currently store the
replica assignment. I think we basically want to represent both the current
state and the desired state. This would also open the door to a cleaner way
to update a reassignment while it is still in progress.

-Jason




On Mon, Apr 8, 2019 at 11:14 PM George Li 
wrote:

>  Hi Colin / Jason,
>
> Reassignment should really be done in batches.  I am not too worried about
> the reassignment znode getting larger.  In a real production environment,
> too many concurrent reassignments and too-frequent submission of
> reassignments seem to cause latency spikes in the Kafka cluster.  So
> batching/staggering/throttling of submitted reassignments is recommended.
>
> In KIP-236, the "originalReplicas" are only kept for the current
> reassigning partitions (a small number), and kept in the memory of the
> controller context's partitionsBeingReassigned as well as in the znode
> /admin/reassign_partitions. I think the below "setting in the RPC like null =
> no replicas are reassigning" is a good idea.
>
> There seem to be some issues with the mail archive server of this mailing
> list?  I didn't receive email after April 7th, and the archive for April
> 2019 has only 50 messages (
> http://mail-archives.apache.org/mod_mbox/kafka-dev/201904.mbox/thread) ?
>
> Thanks,
> George
>
> On Mon, 08 Apr 2019 17:54:48 GMT, Colin McCabe wrote:
>
>   Yeah, I think adding this information to LeaderAndIsr makes sense.  It
> would be better to track
> "reassigningReplicas" than "originalReplicas", I think.  Tracking
> "originalReplicas" is going
> to involve sending a lot more data, since most replicas in the system are
> not reassigning
> at any given point.  Or we would need a hack in the RPC like null = no
> replicas are reassigning.
>
> On a related note, what do you think about the idea of storing the
> reassigning replicas in
>  /brokers/topics/[topic]/partitions/[partitionId]/state, rather than in
> the reassignment znode?
>  I don't think this requires a major change to the proposal-- when the
> controller becomes
> aware that it should do a reassignment, the controller could make the
> changes.  This also
> helps keep the reassignment znode from getting larger, which has been a
> problem.
>
> best,
> Colin
>
>
> On Mon, Apr 8, 2019, at 09:29, Jason Gustafson wrote:
> > Hey George,
> >
> > For the URP during a reassignment,  if the "original_replicas" is kept
> for
> > > the current pending reassignment. I think it will be very easy to
> compare
> > > that with the topic/partition's ISR.  If all "original_replicas" are in
> > > ISR, then URP should be 0 for that topic/partition.
> >
> >
> > Yeah, that makes sense. But I guess we would need "original_replicas" to
> be
> > propagated to partition leaders in the LeaderAndIsr request since leaders
> > are the ones that are computing URPs. That is basically what KIP-352 had
> > proposed, but we also need the changes to the reassignment path. Perhaps
> it
> > makes more sense to address this problem in KIP-236 since that is where
> you
> > have already introduced "original_replicas"? I'm also happy to do KIP-352
> > as a follow-up to KIP-236.
> >
> > Best,
> > Jason
> >
> >
> > On Sun, Apr 7, 2019 at 5:09 PM Ismael Juma  wrote:
> >
> > > Good discussion about where we should do batching. I think if there is
> a
> > > clear great way to batch, then it makes a lot of sense to just do it
> once.
> > > However, if we think there is scope for experimenting with different
> > > approaches, then an API that tools can use makes a lot of sense. They
> can
> > > experiment and innovate. Eventually, we can integrate something into
> Kafka
> > > if it makes sense.
> > >
> > > Ismael
> > >
> > > On Sun, Apr 7, 2019, 11:03 PM Colin McCabe  wrote:
> > >
> > > > Hi George,
> > > >
> > > > As Jason was saying, it seems like there are two directions we could
> go
> > > > here: an external system handling batching, and the controller
> handling
> > > > batching.  I think the controller handling batching would be better,
> > > since
> > > > the controller has more information about the state of the system.
> If
> > > the
> > > > controller handles batching, then the controller could also handle
> things
> > > > like setting up replication quotas for individual partitions.  The
> > > > controller could do things like throttle replication down 

Re: [VOTE] KIP-360: Improve handling of unknown producer when using EOS

2019-04-09 Thread Guozhang Wang
+1 (binding). Thanks for writing up the KIP! The approach lgtm.

One minor thing: the name "last epoch" may be a bit misleading for future
developers (although it is for internal usage only and will not be exposed
to users). How about renaming it to "required_epoch", where setting it to
"-1" means "not required, hence no checks"?

Guozhang

On Tue, Apr 9, 2019 at 5:02 PM Jason Gustafson  wrote:

> Bump (for Guozhang)
>
> On Mon, Apr 8, 2019 at 8:55 AM Jason Gustafson  wrote:
>
>> Hi All,
>>
>> I'd like to start a vote on KIP-360:
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-360%3A+Improve+handling+of+unknown+producer
>> .
>>
>> +1 from me (duh)
>>
>> Thanks,
>> Jason
>>
>

-- 
-- Guozhang


Re: [VOTE] KIP-360: Improve handling of unknown producer when using EOS

2019-04-09 Thread Jason Gustafson
Bump (for Guozhang)

On Mon, Apr 8, 2019 at 8:55 AM Jason Gustafson  wrote:

> Hi All,
>
> I'd like to start a vote on KIP-360:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-360%3A+Improve+handling+of+unknown+producer
> .
>
> +1 from me (duh)
>
> Thanks,
> Jason
>


Re: [DISCUSS] KIP-450: Sliding Window Aggregations in the DSL

2019-04-09 Thread Guozhang Wang
Hi Sophie,

Thanks for the proposed KIP. I've made a pass over it and here are some
thoughts:

1. "The window size is effectively the grace and retention period". The
grace time is defined as "the time to admit late-arriving events after the
end of the window." hence it is the additional time beyond the window size.
I guess your were trying to say it should be zero?

Also for retention period, it is not a notion of the window spec any more,
but only for the window store itself. So I'd suggest talking about window
size here, and note that store retention time cannot be controlled via
window spec at all.
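
For example, with the existing fixed-size windows API (just to illustrate
the terminology; the sliding-window spec may of course look different):

// grace is the extra time beyond the window end during which
// late-arriving records are still admitted; zero admits none
TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ZERO);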

2. In the "O(sqrt(N)) Design" you did not mention when / how to expire a
bucket, so I'd assume you will expire one bucket as a whole when its end
time is smaller than the current window's starting time, right?

3. Also, in your algorithm how to choose "M" seems tricky; would it be a
configurable parameter exposed to users, or is it abstracted away and only
selected internally?

4. "There is some tradeoff between purely optimizing " seems incomplete
paragraph?

5. Meta comment: for many aggregations it is commutative and associative so
we can require users to pass in a "substract" function as well. Given these
two function I think we can propose two set of APIs, 1) with the adder and
subtractor and 2) with the added only (if the aggregate logic is not comm.
and assoc.).

We just maintain an aggregate value for each bucket (call it
bucket_aggregate) plus one for the whole window (call it total_aggregate), i.e.
at most M + 1 values per key. We use the total_aggregate for queries, and
each update will cause 2 writes (to the bucket and to the total aggregate).

And with 1) when expiring the oldest bucket we simply call
subtract(total_aggregate, bucket_aggregate); with 2) when expiring the
oldest bucket we can re-compute the total_aggregate by
sum(bucket_aggregate) over other buckets again.
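
To illustrate, a rough sketch with made-up names (not a proposed API):

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.BinaryOperator;

// One aggregate per bucket plus a running total: at most M + 1 values per key.
class BucketedAggregate<A> {
    private final Deque<A> buckets = new ArrayDeque<>(); // oldest bucket first
    private final BinaryOperator<A> adder;
    private final BinaryOperator<A> subtractor;
    private A total;

    BucketedAggregate(final A init, final BinaryOperator<A> adder,
                      final BinaryOperator<A> subtractor) {
        this.total = init;
        this.adder = adder;
        this.subtractor = subtractor;
        this.buckets.addLast(init);
    }

    // each update causes two writes: to the latest bucket and to the total
    void update(final A delta) {
        buckets.addLast(adder.apply(buckets.removeLast(), delta));
        total = adder.apply(total, delta);
    }

    // variant 1): expire the oldest bucket via the subtractor, in O(1)
    void expireOldest(final A init) {
        total = subtractor.apply(total, buckets.removeFirst());
        buckets.addLast(init); // open a fresh bucket
    }

    A query() { return total; } // queries only read the total aggregate
}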

6. Meta comment: it is reasonable to assume that in practice out-of-order
data is not very common, hence most of the updates will fall into the
latest bucket. So I'm wondering if it makes sense to always store the first
bucket in memory while making other buckets optionally on persistent
storage. In practice, as long as M is large enough (we probably need it to
be large enough to have sufficiently sensitive expiration anyways) then
each bucket's aggregate data is small enough to be in memory.



Guozhang



On Fri, Apr 5, 2019 at 7:58 PM Sophie Blee-Goldman 
wrote:

> Hello all,
>
> I would like to kick off discussion of this KIP aimed at providing sliding
> window semantics to DSL aggregations.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
>
> Please take a look and share any thoughts you have regarding the API,
> semantics, design, etc!
>
> I also have a POC PR open with the "naive" implementation for your
> reference: https://github.com/apache/kafka/pull/6549
>
> Cheers,
> Sophie
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-448: Add State Stores Unit Test Support to Kafka Streams Test Utils

2019-04-09 Thread Sophie Blee-Goldman
Hi Yishun, thanks for the KIP! I have a few initial questions/comments:

1) It may be useful to capture the iterator results as well (eg with a
MockIterator that wraps the underlying iterator and records the same way
the MockStore wraps/records the underlying store)
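
Just to sketch what I mean (illustrative, not a proposed API):

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;

// wraps the underlying iterator and records every result it hands back
class MockIterator<K, V> implements KeyValueIterator<K, V> {
    private final KeyValueIterator<K, V> inner;
    final List<KeyValue<K, V>> captured = new ArrayList<>();

    MockIterator(final KeyValueIterator<K, V> inner) {
        this.inner = inner;
    }

    @Override
    public boolean hasNext() {
        return inner.hasNext();
    }

    @Override
    public KeyValue<K, V> next() {
        final KeyValue<K, V> result = inner.next();
        captured.add(result); // record for later assertions
        return result;
    }

    @Override
    public K peekNextKey() {
        return inner.peekNextKey();
    }

    @Override
    public void close() {
        inner.close();
    }
}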

2) a. Where is the "persistent" variable coming from or being used? It
seems the MockKeyValueStore accepts it in the constructor, but only the
name parameter is passed when constructing a new MockKeyValueStore in
build() ... also, if we extend InMemoryXXXStore shouldn't this always be
false?
b. Is the idea to wrap an in-memory store for each type (key-value,
session, etc)? We don't (yet) offer an in-memory version of the session
store although it is in the works, so this will be possible -- I am more
wondering if it makes sense to decide this for the user or to allow them to
choose between in-memory or rocksDB by setting "persistent"

3) I'm wondering if users might want to be able to plug in their own custom
stores as the underlying backend...should we support this as well? WDYT?

4) We probably want to make these stores available through the public
test-utils package (maybe not the stores themselves which should be
internal, but should there be some kind of public API that gives access to
them?)

Cheers,
Sophie

On Tue, Apr 9, 2019 at 9:19 AM Yishun Guan  wrote:

> Bumping this up again, thanks!
>
> On Fri, Apr 5, 2019, 14:36 Yishun Guan  wrote:
>
> > Hi, bumping this up again. Thanks!
> >
> > On Tue, Apr 2, 2019, 13:07 Yishun Guan  wrote:
> >
> >> Hi All,
> >>
> >> I'd like to start a discussion on KIP-448
> >> (https://cwiki.apache.org/confluence/x/SAeZBg). It is about adding
> >> mock state stores and relevant components for testing purposes.
> >>
> >> Here is the JIRA: https://issues.apache.org/jira/browse/KAFKA-6460
> >>
> >> This is a rough KIP draft; reviews and comments are appreciated. It
> >> seems to be tricky, and some requirements and details still need to
> >> be discussed.
> >>
> >> Thanks,
> >> Yishun
> >>
> >
>


[jira] [Created] (KAFKA-8206) A consumer can't discover new group coordinator when the cluster was partly restarted

2019-04-09 Thread alex gabriel (JIRA)
alex gabriel created KAFKA-8206:
---

 Summary: A consumer can't discover new group coordinator when the 
cluster was partly restarted
 Key: KAFKA-8206
 URL: https://issues.apache.org/jira/browse/KAFKA-8206
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.2.0, 2.0.0, 1.0.0
Reporter: alex gabriel


*A consumer can't discover new group coordinator when the cluster was partly 
restarted*

Preconditions:
I use the Kafka server and Java kafka-client lib, version 2.2.
I have 2 Kafka nodes running locally (localhost:9092, localhost:9093) and 1 
ZK (localhost:2181/localhost:2181).
I have replication factor 2 for all my topics and 
'_unclean.leader.election.enable=true_' on both Kafka nodes.

Steps to reproduce:

1) Start 2nodes (localhost:9092/localhost:9093)
2) Start consumer with 'bootstrap.servers=localhost:9092,localhost:9093'
{noformat}
// discovered group coordinator (0-node)
2019-04-09 16:23:18,963 INFO 
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess]
 - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] 
Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)>

...metadatacache is updated (2 nodes in the cluster list)
2019-04-09 16:23:18,928 DEBUG 
[org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate] - 
[Consumer clientId=events-consumer0, groupId=events-group-gabriel] Sending 
metadata request (type=MetadataRequest, topics=) to node localhost:9092 
(id: -1 rack: null)>
2019-04-09 16:23:18,940 DEBUG [org.apache.kafka.clients.Metadata.update] - 
Updated cluster metadata version 2 to MetadataCache{cluster=Cluster(id = 
P3pz1xU0SjK-Dhy6h2G5YA, nodes = [localhost:9092 (id: 0 rack: null), 
localhost:9093 (id: 1 rack: null)], partitions = [], controller = 
localhost:9092 (id: 0 rack: null))}>
{noformat}
3) Shutdown 1-node (localhost:9093)
{noformat}
// metadata was updated to version 4 (but for some reason it still had 2
// alive nodes inside the cluster)
2019-04-09 16:23:46,981 DEBUG [org.apache.kafka.clients.Metadata.update] - 
Updated cluster metadata version 4 to MetadataCache{cluster=Cluster(id = 
P3pz1xU0SjK-Dhy6h2G5YA, nodes = [localhost:9093 (id: 1 rack: null), 
localhost:9092 (id: 0 rack: null)], partitions = [Partition(topic = 
events-sorted, partition = 1, leader = 0, replicas = [0,1], isr = [0,1], 
offlineReplicas = []), Partition(topic = events-sorted, partition = 0, leader = 
0, replicas = [0,1], isr = [0,1], offlineReplicas = [])], controller = 
localhost:9092 (id: 0 rack: null))}>

// the consumer thinks that node-1 is still alive and tries to send a
// coordinator lookup to it, but fails
2019-04-09 16:23:46,981 INFO 
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess]
 - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] 
Discovered group coordinator localhost:9093 (id: 2147483646 rack: null)>
2019-04-09 16:23:46,981 INFO 
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator.markCoordinatorUnknown]
 - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Group 
coordinator localhost:9093 (id: 2147483646 rack: null) is unavailable or 
invalid, will attempt rediscovery>
2019-04-09 16:24:01,117 DEBUG 
[org.apache.kafka.clients.NetworkClient.handleDisconnections] - [Consumer 
clientId=events-consumer0, groupId=events-group-gabriel] Node 1 disconnected.>
2019-04-09 16:24:01,117 WARN 
[org.apache.kafka.clients.NetworkClient.processDisconnection] - [Consumer 
clientId=events-consumer0, groupId=events-group-gabriel] Connection to node 1 
(localhost:9093) could not be established. Broker may not be available.>

// refreshing metadata again
2019-04-09 16:24:01,117 DEBUG 
[org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion]
 - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Cancelled 
request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=2, 
clientId=events-consumer0, correlationId=112) due to node 1 being disconnected>
2019-04-09 16:24:01,117 DEBUG 
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady]
 - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] 
Coordinator discovery failed, refreshing metadata>

// metadata was updated to version 5, where the cluster had only node 0
// (localhost:9092), as expected.
2019-04-09 16:24:01,131 DEBUG [org.apache.kafka.clients.Metadata.update] - 
Updated cluster metadata version 5 to MetadataCache{cluster=Cluster(id = 
P3pz1xU0SjK-Dhy6h2G5YA, nodes = [localhost:9092 (id: 0 rack: null)], partitions 
= [Partition(topic = events-sorted, partition = 1, leader = 0, replicas = 
[0,1], isr = [0], offlineReplicas = [1]), Partition(topic = events-sorted, 
partition = 0, leader = 0, replicas = [0,1], isr = [0], offlineReplicas = 
[1

[jira] [Created] (KAFKA-8205) Kafka SSL encryption of data at rest

2019-04-09 Thread Niten Aggarwal (JIRA)
Niten Aggarwal created KAFKA-8205:
-

 Summary: Kafka SSL encryption of data at rest
 Key: KAFKA-8205
 URL: https://issues.apache.org/jira/browse/KAFKA-8205
 Project: Kafka
  Issue Type: Bug
  Components: security
Affects Versions: 1.0.1
 Environment: All
Reporter: Niten Aggarwal


Recently we enabled SSL on our Kafka cluster, which earlier had SASL PLAINTEXT. 
Everything works fine from both the producer and consumer standpoint, as 
expected, with one strange behavior. We noticed that data in the log file is 
also encrypted, which we didn't expect, because SSL is meant for 
transport-level security, not for encrypting data at rest.

It doesn't mean we have any issues with that, but we would like to understand 
what enables encrypting data at rest. Do we have a way to:

1) turn it off

2) extend the encryption algorithm, if a company would like to use its own key 
management system and a different algorithm?

After going through the Kafka docs, we realized there is a KIP already in 
discussion, but how come it's implemented without being approved?

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-317%3A+Add+transparent+data+encryption+functionality]

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8204) Streams may flush state stores in the incorrect order

2019-04-09 Thread John Roesler (JIRA)
John Roesler created KAFKA-8204:
---

 Summary: Streams may flush state stores in the incorrect order
 Key: KAFKA-8204
 URL: https://issues.apache.org/jira/browse/KAFKA-8204
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler
Assignee: John Roesler


Cached state stores may forward records during a flush call, so Streams should 
flush the stores in topological order. Otherwise, Streams may flush a 
downstream store before an upstream one, resulting in sink results being 
committed without the corresponding state changelog updates being committed.

This behavior is partly responsible for the bug reported in KAFKA-7895.

The fix is simply to flush the stores in topological order, then when the 
upstream store forwards records to a downstream stateful processor, the 
corresponding state changes will be correctly flushed as well.
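
A sketch of the intended flush loop (assuming the stores are already held in 
topological order; the collection name is illustrative):

{code:java}
// flush upstream stores first, so that records they forward during flush
// reach downstream stateful processors before those stores are flushed
for (final StateStore store : topologicallyOrderedStores) {
    store.flush();
}
{code}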

An alternative would be to repeatedly call flush on all state stores until they 
report there is nothing left to flush, but this requires a public API change to 
enable state stores to report whether they need a flush or not.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: Dynamic window size for aggregations

2019-04-09 Thread Rajesh Kalyanasundaram
Thanks Boyang & Matthias for your replies.
Boyang, you are right. More generically, I want to create a calendar-based 
window. If my window is in months, then the window shall tumble or hop at the 
end of every month. If my window is in days, then the window shall 
tumble/advance by the end of every day.
Perhaps the thread's subject is misleading, in the sense that it's not dynamic 
windows but rather calendar-based windows. Obviously, if my unit is days, then 
my window size is NOT dynamic.
Thanks
Regards,
Rajesh

On 09/04/19, 10:12 PM, "Boyang Chen"  wrote:

Hey Rajesh,


my understanding is that you want to create a month-based time window, is 
that correct?


Boyang


From: Matthias J. Sax 
Sent: Tuesday, April 9, 2019 11:42 PM
To: dev@kafka.apache.org
Subject: Re: Dynamic window size for aggregations

Feel free to create a feature request JIRA.

For now, you could use a custom processor (via `.transform()`) that uses
an attached window store to implement the logic you need.


-Matthias

On 4/9/19 4:09 AM, Rajesh Kalyanasundaram wrote:
> Hi all,
> We have a requirement to implement aggregations with TimedWindows which 
may have varying window sizes. For example, I may want a tumbling window that 
tumbles on 31st of Jan and 28th of Feb, 31st of March and so on.
> We did an initial analysis of TimedWindows. Found that the windowSize is 
> being fixed at many places in the streams code.
>
> Any guidance in supporting this would be very much appreciated.
> Thank you.
> Regards,
> Rajesh
> This email and any files transmitted with it are confidential, 
proprietary and intended solely for the individual or entity to whom they are 
addressed. If you have received this email in error please delete it 
immediately.
>



This email and any files transmitted with it are confidential, proprietary and 
intended solely for the individual or entity to whom they are addressed. If you 
have received this email in error please delete it immediately.


Re: Need Doc of Kafka with GO Language

2019-04-09 Thread Gwen Shapira
Confluent's Go client is here:
https://github.com/confluentinc/confluent-kafka-go
The repository has examples and the API documentation is here:
https://docs.confluent.io/current/clients/confluent-kafka-go/index.html

Sarama is another popular Go client: https://github.com/Shopify/sarama
And the documentation is here: https://godoc.org/github.com/Shopify/sarama



On Tue, Apr 9, 2019 at 9:23 AM vinodh kumar gulumuru <
vinodkumargulum...@gmail.com> wrote:

> Hi,
>  Right now I am working with the Go language. I need to integrate Kafka into
> my project for data streaming. I have searched for a Go client for Kafka, but
> I didn't find proper documentation. Can you provide any tutorial or
> documentation? I hope you will accept my request.
>
> Thank you.
>
> Regards,
> vinodh kumar
>


-- 
*Gwen Shapira*
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter  | blog



Re: Dynamic window size for aggregations

2019-04-09 Thread Boyang Chen
Hey Rajesh,


my understanding is that you want to create a month-based time window, is that 
correct?


Boyang


From: Matthias J. Sax 
Sent: Tuesday, April 9, 2019 11:42 PM
To: dev@kafka.apache.org
Subject: Re: Dynamic window size for aggregations

Feel free to create a feature request JIRA.

For now, you could use a custom processor (via `.transform()`) that uses
an attached window store to implement the logic you need.


-Matthias

On 4/9/19 4:09 AM, Rajesh Kalyanasundaram wrote:
> Hi all,
> We have a requirement to implement aggregations with TimedWindows which may 
> have varying window sizes. For example, I may want a tumbling window that 
> tumbles on 31st of Jan and 28th of Feb, 31st of March and so on.
> We did an initial analysis of TimedWindows. Found that the windowSize is 
> being fixed at many places in the streams code.
>
> Any guidance in supporting this would be very much appreciated.
> Thank you.
> Regards,
> Rajesh
> This email and any files transmitted with it are confidential, proprietary 
> and intended solely for the individual or entity to whom they are addressed. 
> If you have received this email in error please delete it immediately.
>



Re: Need Doc of Kafka with GO Language

2019-04-09 Thread vinodh kumar gulumuru
Hi,
 Right now I am working with the Go language. I need to integrate Kafka into
my project for data streaming. I have searched for a Go client for Kafka, but
I didn't find proper documentation. Can you provide any tutorial or
documentation? I hope you will accept my request.

Thank you.

Regards,
vinodh kumar


Re: [DISCUSS] KIP-448: Add State Stores Unit Test Support to Kafka Streams Test Utils

2019-04-09 Thread Yishun Guan
Bumping this up again, thanks!

On Fri, Apr 5, 2019, 14:36 Yishun Guan  wrote:

> Hi, bumping this up again. Thanks!
>
> On Tue, Apr 2, 2019, 13:07 Yishun Guan  wrote:
>
>> Hi All,
>>
>> I'd like to start a discussion on KIP-448
>> (https://cwiki.apache.org/confluence/x/SAeZBg). It is about adding
>> mock state stores and relevant components for testing purposes.
>>
>> Here is the JIRA: https://issues.apache.org/jira/browse/KAFKA-6460
>>
>> This is a rough KIP draft; reviews and comments are appreciated. It
>> seems to be tricky, and some requirements and details still need to
>> be discussed.
>>
>> Thanks,
>> Yishun
>>
>


Re: [DISCUSS] KIP-360: Improve handling of unknown producer

2019-04-09 Thread Matthias J. Sax
@Adam:

As a workaround, you can increase the repartition topic config
`segment.bytes` and set a larger segment size. This should mitigate the
issue.
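
An untested sketch of how to pass that through (the app id and bootstrap
servers are placeholders):

import java.util.Properties;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

public class IncreaseSegmentBytes {
    public static Properties streamsConfig() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // the "topic." prefix forwards the config to Streams' internal
        // topics, including repartition topics
        props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG),
                  String.valueOf(1024L * 1024 * 1024)); // e.g. 1 GiB segments
        return props;
    }
}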


-Matthias



On 4/4/19 3:47 PM, Jason Gustafson wrote:
> Hi Everyone,
> 
> Sorry for the long delay on this KIP. I have updated it to include the
> handling of INVALID_PRODUCER_ID_MAPPING as suggested above. If there are no
> further comments, I will plan to start a vote early next week.
> 
> Thanks!
> Jason
> 
> On Mon, Mar 25, 2019 at 2:33 PM Adam Bellemare 
> wrote:
> 
>> Ach - Sorry. I meant Jason. I had just read a John Roesler email.
>>
>> On Mon, Mar 25, 2019 at 5:21 PM Adam Bellemare 
>> wrote:
>>
>>> Hi John
>>>
>>> What is the status of this KIP?
>>>
>>> My teammates and I are running into the "UNKNOWN_PRODUCER_ID" error on
>>> 2.1.1 for a multitude of our internal topics, and I suspect that a proper
>>> fix is needed.
>>>
>>> Adam
>>>
>>> On Mon, Jan 7, 2019 at 7:42 PM Guozhang Wang  wrote:
>>>
 Thanks Jason. The proposed solution sounds good to me.


 Guozhang

 On Mon, Jan 7, 2019 at 3:52 PM Jason Gustafson 
 wrote:

> Hey Guozhang,
>
> Thanks for sharing the article. The INVALID_PRODUCER_ID_MAPPING error
> occurs following expiration of the producerId. It's possible that
 another
> producerId has been installed in its place following expiration (if
 another
> producer instance has become active), or the mapping is empty. We can
> safely retry the InitProducerId with the logic in this KIP in order to
> detect which case it is. So I'd suggest something like this:
>
> 1. After receiving INVALID_PRODUCER_ID_MAPPING, the producer can send
> InitProducerId using the current producerId and epoch.
> 2. If no mapping exists, the coordinator can generate a new producerId
 and
> return it. If a transaction is in progress on the client, it will have
 to
> be aborted, but the producer can continue afterwards.
> 3. Otherwise if a different producerId has been assigned, then we can
> return INVALID_PRODUCER_ID_MAPPING. To simplify error handling, we can
> probably raise this as ProducerFencedException since that is
>> effectively
> what has happened. Ideally this is the only fatal case that users have
 to
> handle.
>
> I'll give it a little more thought and update the KIP.
>
> Thanks,
> Jason
>
> On Thu, Jan 3, 2019 at 1:38 PM Guozhang Wang 
 wrote:
>
>> You're right about the dangling txn since it will actually block
>> read-committed consumers from proceeding at all. I'd agree that
>> since
> this
>> is a very rare case, we can consider fixing it not via broker-side
 logic
>> but via tooling in a future work.
>>
>> I've also discovered some related error handling logic inside
>> producer
> that
>> may be addressed together with this KIP (since it is mostly for
 internal
>> implementations the wiki itself does not need to be modified):
>>
>>
>>
>

>> https://stackoverflow.com/questions/53976117/why-did-the-kafka-stream-fail-to-produce-data-after-a-long-time/54029181#54029181
>>
>> Guozhang
>>
>>
>>
>> On Thu, Nov 29, 2018 at 2:25 PM Jason Gustafson >>
>> wrote:
>>
>>> Hey Guozhang,
>>>
>>> To clarify, the broker does not actually use the ApiVersion API
>> for
>>> inter-broker communications. The use of an API and its
>> corresponding
>>> version is controlled by `inter.broker.protocol.version`.
>>>
>>> Nevertheless, it sounds like we're on the same page about removing
>>> DescribeTransactionState. The impact of a dangling transaction is
>> a
>> little
>>> worse than what you describe though. Consumers with the
 read_committed
>>> isolation level will be stuck. Still, I think we agree that this
 case
>>> should be rare and we can reconsider for future work. Rather than
>>> preventing dangling transactions, perhaps we should consider
>> options
>> which
>>> allows us to detect them and recover. Anyway, this needs more
 thought.
> I
>>> will update the KIP.
>>>
>>> Best,
>>> Jason
>>>
>>> On Tue, Nov 27, 2018 at 6:51 PM Guozhang Wang >>
>> wrote:
>>>
 0. My original question is about the implementation details
> primarily,
 since current the handling logic of the APIVersionResponse is
 simply
>> "use
 the highest supported version of the corresponding request", but
 if
> the
 returned response from APIVersionRequest says "I don't even know
> about
>>> the
 DescribeTransactionStateRequest at all", then we need additional
> logic
>>> for
 the falling back logic. Currently this logic is embedded in
>> NetworkClient
 which is shared by all clients, so I'd like to avoid making this
> logic
>>> more
>

Re: Dynamic window size for aggregations

2019-04-09 Thread Matthias J. Sax
Feel free to create a feature request JIRA.

For now, you could use a custom processor (via `.transform()`) that uses
an attached window store to implement the logic you need.
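
An untested sketch of what that could look like, counting records per
calendar month (store name, topics, and serdes are placeholders):

import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;

public class CalendarWindowSketch {

    public static StreamsBuilder build() {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.addStateStore(Stores.windowStoreBuilder(
            Stores.persistentWindowStore("monthly-counts",
                Duration.ofDays(62),  // retention: keep about two months
                Duration.ofDays(31),  // upper bound on the window size
                false),
            Serdes.String(), Serdes.Long()));

        builder.<String, String>stream("input")
            .transform(() -> new Transformer<String, String, KeyValue<String, Long>>() {
                private ProcessorContext context;
                private WindowStore<String, Long> store;

                @SuppressWarnings("unchecked")
                @Override
                public void init(final ProcessorContext context) {
                    this.context = context;
                    this.store = (WindowStore<String, Long>) context.getStateStore("monthly-counts");
                }

                @Override
                public KeyValue<String, Long> transform(final String key, final String value) {
                    // window start = first instant of the record's calendar month
                    final long windowStart = Instant.ofEpochMilli(context.timestamp())
                        .atZone(ZoneOffset.UTC).toLocalDate().withDayOfMonth(1)
                        .atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
                    final Long oldCount = store.fetch(key, windowStart);
                    final long newCount = (oldCount == null ? 0L : oldCount) + 1L;
                    store.put(key, newCount, windowStart);
                    return KeyValue.pair(key, newCount); // running per-month count
                }

                @Override
                public void close() {}
            }, "monthly-counts")
            .to("output", Produced.with(Serdes.String(), Serdes.Long()));

        return builder;
    }
}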


-Matthias

On 4/9/19 4:09 AM, Rajesh Kalyanasundaram wrote:
> Hi all,
> We have a requirement to implement aggregations with TimedWindows which may 
> have varying window sizes. For example, I may want a tumbling window that 
> tumbles on 31st of Jan and 28th of Feb, 31st of March and so on.
> We did an initial analysis of TimedWindows. Found that the windowSize is 
> being fixed at many places in the streams code.
> 
> Any guidance in supporting this would be very much appreciated.
> Thank you.
> Regards,
> Rajesh
> This email and any files transmitted with it are confidential, proprietary 
> and intended solely for the individual or entity to whom they are addressed. 
> If you have received this email in error please delete it immediately.
> 





Re: [DISCUSS] KIP-201: Rationalising Policy interfaces

2019-04-09 Thread Rajini Sivaram
Hi Tom,

Are you planning to extend this KIP to also include dynamic broker config
update (currently covered under AlterConfigPolicy)?

It may be worth sending another note to make progress on this KIP since it has
been around a while and reading through the threads, it looks like there
has been a lot of interest in it.

Thank you,

Rajini


On Wed, Jan 9, 2019 at 11:25 AM Tom Bentley  wrote:

> Hi Anna and Mickael,
>
> Anna, did you have any comments about the points I made?
>
> Mickael, we really need the vote to be passed before there's even any work
> to do. With the exception of Ismael, the KIP didn't seem to get the
> attention of any of the other committers.
>
> Kind regards,
>
> Tom
>
> On Thu, 13 Dec 2018 at 18:11, Tom Bentley  wrote:
>
> > Hi Anna,
> >
> > Firstly, let me apologise again about having missed your previous emails
> > about this.
> >
> > Thank you for the feedback. You raise some valid points about ambiguity.
> > The problem with pulling the metadata into CreateTopicRequest and
> > AlterTopicRequest is that you lose the benefit of being able to easily
> > write
> > a common policy across creation and alter cases. For example, with the
> > proposed design the policy maker could write code like this (forgive my
> > pseudo-Java)
> >
> > public void validateCreateTopic(requestMetadata, ...) {
> >     commonPolicy(requestMetadata.requestedState());
> > }
> >
> > public void validateAlterTopic(requestMetadata, ...) {
> >     commonPolicy(requestMetadata.requestedState());
> > }
> >
> > private void commonPolicy(RequestedTopicState requestedState) {
> >     // ...
> > }
> >
> > I think that's an important feature of the API because (I think) very
> > often the policy maker is interested in defining the universe of
> prohibited
> > configurations without really caring about whether the request is a
> create
> > or an alter. Having a single RequestedTopicState for both create and
> > alter means they can do that trivially in one place. Having different
> > methods in the two Request classes prevents this and forces the policy
> > maker to pick apart the different requestState objects before calling any
> > common method(s).
> >
> > I think my intention at the time (and it's many months ago now, so I
> might
> > not have remembered fully) was that RequestedTopicState would basically
> > represent what the topic would look like after the requested changes were
> > applied (I accept this isn't how it's Javadoc'd in the KIP), rather than
> > representing the request itself. Thus if the request changed the
> assignment
> > of some of the partitions and the policy maker was interested in
> precisely
> > which partitions would be changed, and how, they would indeed have to
> > compute that for themselves by looking up the current topic state from
> the
> > cluster state and seeing how they differed. Indeed they'd have to do this
> > diff even to figure out that the user was requesting a change to the
> > topic assignment (or similarly for topic config, etc). To me this is
> > acceptable
> > because I think most people writing such policies are just interested in
> > defining what is not allowed, so giving them a representation of the
> > proposed topic state which they can readily check against is the most
> > direct API. In this interpretation generatedReplicaAssignment() would
> > just be some extra metadata annotating whether any difference between the
> > current and proposed states was directly from the user, or generated on
> the
> > broker. You're right that it's ambiguous when the request didn't actually
> > change the assignment but I didn't envisage policy makers using it except
> > when the assignments differed anyway. To me it would be acceptable to
> > Javadoc this.
> >
> > Given this interpretation of RequestedTopicState as "what the topic would
> > look like after the requested changes were applied" can you see any other
> > problems with the proposal? Or do you have use cases where the policy
> maker
> > is more interested in what the request is changing?
> >
> > Kind regards,
> >
> > Tom
> >
> > On Fri, 7 Dec 2018 at 08:41, Tom Bentley  wrote:
> >
> >> Hi Anna and Mickael,
> >>
> >> Sorry for remaining silent on this for so long. I should have time to
> >> look at this again next week.
> >>
> >> Kind regards,
> >>
> >> Tom
> >>
> >> On Mon, 3 Dec 2018 at 10:11, Mickael Maison 
> >> wrote:
> >>
> >>> Hi Tom,
> >>>
> >>> This is a very interesting KIP. If you are not going to continue
> >>> working on it, would it be ok for us to grab it and complete it?
> >>> Thanks
> >>> On Thu, Jun 14, 2018 at 7:06 PM Anna Povzner 
> wrote:
> >>> >
> >>> > Hi Tom,
> >>> >
> >>> > Just wanted to check what you think about the comments I made in my
> >>> last
> >>> > message. I think this KIP is a big improvement to our current policy
> >>> > interfaces, and really hope we can get this KIP in.
> >>> >
> >>> > Thanks,
> >>> > Anna
> >>> >
> >>> > On Thu, May 31, 2018 at 3:29 PM, Anna Povzner 
> >

Re: [VOTE] KIP-445: In-memory session store

2019-04-09 Thread Bill Bejeck
Thanks for the KIP Sophie.

+1(binding)

-Bill

On Tue, Apr 9, 2019 at 12:14 AM Matthias J. Sax 
wrote:

> Thanks for the KIP Sophie!
>
> +1 (binding)
>
>
> -Matthias
>
> On 4/8/19 5:26 PM, Sophie Blee-Goldman wrote:
> > Hello all,
> >
> > There has been a positive reception so I'd like to call for a vote on
> > KIP-445, augmenting our session store options with an in-memory version.
> > This would round out our store API to offer in-memory and persistent
> > versions of all three types of stores.
> >
> > KIP:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-445%3A+In-memory+Session+Store
> > PR: https://github.com/apache/kafka/pull/6525
> > JIRA: https://issues.apache.org/jira/browse/KAFKA-8029
> >
> > This would also open up the possibility of migrating some of the
> > unit/integration tests to in-memory stores to speed things up a bit ;)
> >
> > Cheers,
> > Sophie
> >
>
>


Build failed in Jenkins: kafka-trunk-jdk11 #429

2019-04-09 Thread Apache Jenkins Server
See 


Changes:

[ismael] MINOR: Remove redundant access specifiers from metrics interfaces

--
[...truncated 2.36 MB...]
org.apache.kafka.connect.source.SourceRecordTest > 
shouldCreateSinkRecordWithHeaders PASSED

org.apache.kafka.connect.source.SourceRecordTest > 
shouldCreateSinkRecordWithEmtpyHeaders STARTED

org.apache.kafka.connect.source.SourceRecordTest > 
shouldCreateSinkRecordWithEmtpyHeaders PASSED

org.apache.kafka.connect.sink.SinkRecordTest > 
shouldDuplicateRecordAndCloneHeaders STARTED

org.apache.kafka.connect.sink.SinkRecordTest > 
shouldDuplicateRecordAndCloneHeaders PASSED

org.apache.kafka.connect.sink.SinkRecordTest > 
shouldCreateSinkRecordWithEmptyHeaders STARTED

org.apache.kafka.connect.sink.SinkRecordTest > 
shouldCreateSinkRecordWithEmptyHeaders PASSED

org.apache.kafka.connect.sink.SinkRecordTest > 
shouldDuplicateRecordUsingNewHeaders STARTED

org.apache.kafka.connect.sink.SinkRecordTest > 
shouldDuplicateRecordUsingNewHeaders PASSED

org.apache.kafka.connect.sink.SinkRecordTest > shouldModifyRecordHeader STARTED

org.apache.kafka.connect.sink.SinkRecordTest > shouldModifyRecordHeader PASSED

org.apache.kafka.connect.sink.SinkRecordTest > 
shouldCreateSinkRecordWithHeaders STARTED

org.apache.kafka.connect.sink.SinkRecordTest > 
shouldCreateSinkRecordWithHeaders PASSED

> Task :log4j-appender:test

org.apache.kafka.log4jappender.KafkaLog4jAppenderTest > 
testLog4jAppendsWithSyncSendWithoutIgnoringExceptionsShouldNotThrowException 
STARTED
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.easymock.cglib.core.ReflectUtils$1 
(file:/home/jenkins/.gradle/caches/modules-2/files-2.1/org.easymock/easymock/4.0.2/f74aebbe02f5051bea31c0dbc5df5202a59e0b78/easymock-4.0.2.jar)
 to method 
java.lang.ClassLoader.defineClass(java.lang.String,byte[],int,int,java.security.ProtectionDomain)
WARNING: Please consider reporting this to the maintainers of 
org.easymock.cglib.core.ReflectUtils$1
WARNING: Use --illegal-access=warn to enable warnings of further illegal 
reflective access operations
WARNING: All illegal access operations will be denied in a future release

org.apache.kafka.log4jappender.KafkaLog4jAppenderTest > 
testLog4jAppendsWithSyncSendWithoutIgnoringExceptionsShouldNotThrowException 
PASSED

org.apache.kafka.log4jappender.KafkaLog4jAppenderTest > 
testLog4jAppendsWithRealProducerConfigWithSyncSendAndNotIgnoringExceptionsShouldThrowException
 STARTED

org.apache.kafka.log4jappender.KafkaLog4jAppenderTest > 
testLog4jAppendsWithRealProducerConfigWithSyncSendAndNotIgnoringExceptionsShouldThrowException
 PASSED

org.apache.kafka.log4jappender.KafkaLog4jAppenderTest > testLog4jAppends STARTED

org.apache.kafka.log4jappender.KafkaLog4jAppenderTest > testLog4jAppends PASSED

org.apache.kafka.log4jappender.KafkaLog4jAppenderTest > testSetSaslMechanism 
STARTED

org.apache.kafka.log4jappender.KafkaLog4jAppenderTest > testSetSaslMechanism 
PASSED

org.apache.kafka.log4jappender.KafkaLog4jAppenderTest > testJaasConfigNotSet 
STARTED

org.apache.kafka.log4jappender.KafkaLog4jAppenderTest > testJaasConfigNotSet 
PASSED

org.apache.kafka.log4jappender.KafkaLog4jAppenderTest > testKafkaLog4jConfigs 
STARTED

org.apache.kafka.log4jappender.KafkaLog4jAppenderTest > testKafkaLog4jConfigs 
PASSED

org.apache.kafka.log4jappender.KafkaLog4jAppenderTest > 
testLog4jAppendsWithSyncSendAndSimulateProducerFailShouldThrowException STARTED

org.apache.kafka.log4jappender.KafkaLog4jAppenderTest > 
testLog4jAppendsWithSyncSendAndSimulateProducerFailShouldThrowException PASSED

org.apache.kafka.log4jappender.KafkaLog4jAppenderTest > 
testLog4jAppendsWithRealProducerConfigWithSyncSendShouldNotThrowException 
STARTED

org.apache.kafka.log4jappender.KafkaLog4jAppenderTest > 
testLog4jAppendsWithRealProducerConfigWithSyncSendShouldNotThrowException PASSED

org.apache.kafka.log4jappender.KafkaLog4jAppenderTest > testSaslMechanismNotSet 
STARTED

org.apache.kafka.log4jappender.KafkaLog4jAppenderTest > testSaslMechanismNotSet 
PASSED

org.apache.kafka.log4jappender.KafkaLog4jAppenderTest > testSetJaasConfig 
STARTED

org.apache.kafka.log4jappender.KafkaLog4jAppenderTest > testSetJaasConfig PASSED

> Task :jmh-benchmarks:test NO-SOURCE

> Task :streams:examples:test

org.apache.kafka.streams.examples.wordcount.WordCountProcessorTest > test 
STARTED

org.apache.kafka.streams.examples.wordcount.WordCountProcessorTest > test PASSED

> Task :streams:streams-scala:test

org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes
 > testShouldCountClicksPerRegionWithNamedRepartitionTopic STARTED

org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes
 > testShouldCountClicksPerRegionWithNamedRepartitionTopic PASSED

org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes
 > 

Jenkins build is back to normal : kafka-trunk-jdk8 #3526

2019-04-09 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-8203) plaintext connections to SSL secured broker can be handled more elegantly

2019-04-09 Thread Jorg Heymans (JIRA)
Jorg Heymans created KAFKA-8203:
---

 Summary: plaintext connections to SSL secured broker can be 
handled more elegantly
 Key: KAFKA-8203
 URL: https://issues.apache.org/jira/browse/KAFKA-8203
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 2.1.1
Reporter: Jorg Heymans


Mailing list thread: 
[https://lists.apache.org/thread.html/39935157351c0ad590e6cf02027816d664f1fd3724a25c1133a3bba6@%3Cusers.kafka.apache.org%3E]

Reproduced here:

We have our brokers secured with these standard properties

 
{code:java}
listeners=SSL://a.b.c:9030 
ssl.truststore.location=... 
ssl.truststore.password=... 
ssl.keystore.location=... 
ssl.keystore.password=... 
ssl.key.password=... 
ssl.client.auth=required 
ssl.enabled.protocols=TLSv1.2 {code}
It's a bit surprising to see that when a (Java) client attempts to connect 
without having SSL configured, so doing a PLAINTEXT connection instead, it does 
not get a TLS exception indicating that SSL is required. Somehow I would have 
expected a hard transport-level exception making it clear that non-SSL 
connections are not allowed; instead the client sees this (when debug logging 
is enabled):


{code:java}
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 
21234bee31165527 [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - 
[Consumer clientId=consumer-1, groupId=my-test-group] Kafka consumer 
initialized [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - 
[Consumer clientId=consumer-1, groupId=my-test-group] Subscribed to topic(s): 
events [main] DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer 
clientId=consumer-1, groupId=my-test-group] Sending FindCoordinator request to 
broker a.b.c:9030 (id: -1 rack: null) [main] DEBUG 
org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, 
groupId=my-test-group] Initiating connection to node a.b.c:9030 (id: -1 rack: 
null) using address /a.b.c [main] DEBUG org.apache.kafka.common.metrics.Metrics 
- Added sensor with name node--1.bytes-sent [main] DEBUG 
org.apache.kafka.common.metrics.Metrics - Added sensor with name 
node--1.bytes-received [main] DEBUG org.apache.kafka.common.metrics.Metrics - 
Added sensor with name node--1.latency [main] DEBUG 
org.apache.kafka.common.network.Selector - [Consumer clientId=consumer-1, 
groupId=my-test-group] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 
131072, SO_TIMEOUT = 0 to node -1 [main] DEBUG 
org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, 
groupId=my-test-group] Completed connection to node -1. Fetching API versions. 
[main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer 
clientId=consumer-1, groupId=my-test-group] Initiating API versions fetch from 
node -1. [main] DEBUG org.apache.kafka.common.network.Selector - [Consumer 
clientId=consumer-1, groupId=my-test-group] Connection with /a.b.c disconnected 
java.io.EOFException at 
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:119)
 at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:381) 
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:342) at 
org.apache.kafka.common.network.Selector.attemptRead(Selector.java:609) at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:541) 
at org.apache.kafka.common.network.Selector.poll(Selector.java:467) at 
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535) at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:231)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:316)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1214)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179) 
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164) 
at eu.europa.ec.han.TestConsumer.main(TestConsumer.java:22) [main] DEBUG 
org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, 
groupId=my-test-group] Node -1 disconnected. [main] DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient - [Consumer 
clientId=consumer-1, groupId=my-test-group] Cancelled request with header 
RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=2, clientId=consumer-1, 
correlationId=0) due to node -1 being disconnected [main] DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer 

Dynamic window size for aggregations

2019-04-09 Thread Rajesh Kalyanasundaram
Hi all,
We have a requirement to implement aggregations with TimedWindows which may 
have varying window sizes. For example, I may want a tumbling window that 
tumbles on 31st of Jan and 28th of Feb, 31st of March and so on.
We did an initial analysis of TimedWindows. Found that the windowSize is being 
fixed at many places in the streams code.

Any guidance in supporting this would be very much appreciated.
Thank you.
Regards,
Rajesh
This email and any files transmitted with it are confidential, proprietary and 
intended solely for the individual or entity to whom they are addressed. If you 
have received this email in error please delete it immediately.


Build failed in Jenkins: kafka-trunk-jdk11 #428

2019-04-09 Thread Apache Jenkins Server
See 


Changes:

[mjsax] MINOR: Correct KStream documentation (#6552)

--
[...truncated 2.36 MB...]

org.apache.kafka.streams.kstream.internals.FullChangeSerdeTest > shouldRoundTripNewNull STARTED

org.apache.kafka.streams.kstream.internals.FullChangeSerdeTest > shouldRoundTripNewNull PASSED

org.apache.kafka.streams.kstream.internals.FullChangeSerdeTest > shouldCloseSerde STARTED

org.apache.kafka.streams.kstream.internals.FullChangeSerdeTest > shouldCloseSerde PASSED

org.apache.kafka.streams.kstream.internals.FullChangeSerdeTest > shouldCloseDeserializer STARTED

org.apache.kafka.streams.kstream.internals.FullChangeSerdeTest > shouldCloseDeserializer PASSED

org.apache.kafka.streams.kstream.internals.FullChangeSerdeTest > shouldRoundTripOldNull STARTED

org.apache.kafka.streams.kstream.internals.FullChangeSerdeTest > shouldRoundTripOldNull PASSED

org.apache.kafka.streams.kstream.internals.FullChangeSerdeTest > shouldRoundTripNull STARTED

org.apache.kafka.streams.kstream.internals.FullChangeSerdeTest > shouldRoundTripNull PASSED

org.apache.kafka.streams.kstream.internals.FullChangeSerdeTest > shouldConfigureDeserializer STARTED

org.apache.kafka.streams.kstream.internals.FullChangeSerdeTest > shouldConfigureDeserializer PASSED

org.apache.kafka.streams.StreamsBuilderTest > shouldAllowJoinMaterializedJoinedKTable STARTED

org.apache.kafka.streams.StreamsBuilderTest > shouldAllowJoinMaterializedJoinedKTable PASSED

org.apache.kafka.streams.StreamsBuilderTest > shouldUseSerdesDefinedInMaterializedToConsumeGlobalTable STARTED

org.apache.kafka.streams.StreamsBuilderTest > shouldUseSerdesDefinedInMaterializedToConsumeGlobalTable PASSED

org.apache.kafka.streams.StreamsBuilderTest > shouldMergeStreams STARTED

org.apache.kafka.streams.StreamsBuilderTest > shouldMergeStreams PASSED

org.apache.kafka.streams.StreamsBuilderTest > shouldAllowJoinMaterializedFilteredKTable STARTED

org.apache.kafka.streams.StreamsBuilderTest > shouldAllowJoinMaterializedFilteredKTable PASSED

org.apache.kafka.streams.StreamsBuilderTest > shouldNotThrowNullPointerIfOptimizationsNotSpecified STARTED

org.apache.kafka.streams.StreamsBuilderTest > shouldNotThrowNullPointerIfOptimizationsNotSpecified PASSED

org.apache.kafka.streams.StreamsBuilderTest > shouldReuseSourceTopicAsChangelogsWithOptimization20 STARTED

org.apache.kafka.streams.StreamsBuilderTest > shouldReuseSourceTopicAsChangelogsWithOptimization20 PASSED

org.apache.kafka.streams.StreamsBuilderTest > shouldAllowJoinMaterializedMapValuedKTable STARTED

org.apache.kafka.streams.StreamsBuilderTest > shouldAllowJoinMaterializedMapValuedKTable PASSED

org.apache.kafka.streams.StreamsBuilderTest > shouldAllowJoinUnmaterializedMapValuedKTable STARTED

org.apache.kafka.streams.StreamsBuilderTest > shouldAllowJoinUnmaterializedMapValuedKTable PASSED

org.apache.kafka.streams.StreamsBuilderTest > shouldAllowJoinUnmaterializedFilteredKTable STARTED

org.apache.kafka.streams.StreamsBuilderTest > shouldAllowJoinUnmaterializedFilteredKTable PASSED

org.apache.kafka.streams.StreamsBuilderTest > shouldNotMaterializeStoresIfNotRequired STARTED

org.apache.kafka.streams.StreamsBuilderTest > shouldNotMaterializeStoresIfNotRequired PASSED

org.apache.kafka.streams.StreamsBuilderTest > shouldNotReuseSourceTopicAsChangelogsByDefault STARTED

org.apache.kafka.streams.StreamsBuilderTest > shouldNotReuseSourceTopicAsChangelogsByDefault PASSED

org.apache.kafka.streams.StreamsBuilderTest > shouldUseSerdesDefinedInMaterializedToConsumeTable STARTED

org.apache.kafka.streams.StreamsBuilderTest > shouldUseSerdesDefinedInMaterializedToConsumeTable PASSED

org.apache.kafka.streams.StreamsBuilderTest > shouldThrowExceptionWhenTopicNamesAreNull STARTED

org.apache.kafka.streams.StreamsBuilderTest > shouldThrowExceptionWhenTopicNamesAreNull PASSED

org.apache.kafka.streams.StreamsBuilderTest > shouldAllowJoinMaterializedSourceKTable STARTED

org.apache.kafka.streams.StreamsBuilderTest > shouldAllowJoinMaterializedSourceKTable PASSED

org.apache.kafka.streams.StreamsBuilderTest > shouldThrowExceptionWhenNoTopicPresent STARTED

org.apache.kafka.streams.StreamsBuilderTest > shouldThrowExceptionWhenNoTopicPresent PASSED

org.apache.kafka.streams.StreamsBuilderTest > shouldAllowJoinUnmaterializedJoinedKTable STARTED

org.apache.kafka.streams.StreamsBuilderTest > shouldAllowJoinUnmaterializedJoinedKTable PASSED

org.apache.kafka.streams.StreamsBuilderTest > shouldProcessingFromSinkTopic STARTED

org.apache.kafka.streams.StreamsBuilderTest > shouldProcessingFromSinkTopic PASSED

org.apache.kafka.streams.StreamsBuilderTest > shouldProcessViaThroughTopic STARTED

org.apache.kafka.streams.StreamsBuilderTest > shouldProcessViaThroughTopic PASSED

org.apache.kafka.streams.KeyValueTest > shouldHaveSameEqualsAndHashCode STARTED

org.apache.kafka.strea

[jira] [Created] (KAFKA-8202) StackOverflowError on producer when splitting batches

2019-04-09 Thread Daniel Krawczyk (JIRA)
Daniel Krawczyk created KAFKA-8202:
--

 Summary: StackOverflowError on producer when splitting batches
 Key: KAFKA-8202
 URL: https://issues.apache.org/jira/browse/KAFKA-8202
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.0.0
Reporter: Daniel Krawczyk


Hello,

recently we came across a StackOverflowError in the Kafka producer Java library. The error caused the Kafka producer to stop.

The stack trace was as follows:
{code:java}
java.lang.StackOverflowError: null
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
// […]
{code}
The piece of code responsible for the error:
{code:java}
/**
 * This method is used when we have to split a large batch in smaller ones. A chained metadata will allow the
 * future that has already returned to the users to wait on the newly created split batches even after the
 * old big batch has been deemed as done.
 * old big batch has been deemed as done.
 */
void chain(FutureRecordMetadata futureRecordMetadata) {
if (nextRecordMetadata == null)
nextRecordMetadata = futureRecordMetadata;
else
nextRecordMetadata.chain(futureRecordMetadata);
}
{code}
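
chain() recurses once per batch already in the chain, so a single batch that keeps being split (with, per the log below, effectively unlimited retries) can grow the chain deep enough to exhaust the stack. An iterative walk to the tail avoids the recursion entirely; here is a minimal sketch of the idea (an editor's illustration, not necessarily the patch that was merged upstream):

{code:java}
// Sketch: append at the tail with a loop instead of recursion, so chaining
// costs O(n) loop iterations rather than O(n) stack frames.
void chain(FutureRecordMetadata futureRecordMetadata) {
    FutureRecordMetadata current = this;
    while (current.nextRecordMetadata != null)
        current = current.nextRecordMetadata;
    current.nextRecordMetadata = futureRecordMetadata;
}
{code}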
Before the error occurred we observed a large number of log messages related to record batches being split (caused by the MESSAGE_TOO_LARGE error) on one of our topics (logged by org.apache.kafka.clients.producer.internals.Sender):
{code:java}
[Producer clientId=producer-1] Got error produce response in correlation id 158621342 on topic-partition , splitting and retrying (2147483647 attempts left). Error: MESSAGE_TOO_LARGE
{code}
All logs had different correlation ids but the same number of attempts left (2147483647, i.e. Integer.MAX_VALUE), so they appeared to belong to different requests, all of which succeeded with no further retries.
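
Since every split is triggered by a batch the broker rejects as too large, one way to stop exercising this code path at all (independent of fixing the recursion) is to keep producer batches comfortably below the broker-side limit. A hedged sketch, assuming the default 1 MB message.max.bytes and a placeholder bootstrap address:

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class SmallBatchProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Keep batches well under the broker/topic message.max.bytes (1 MB by
        // default) so the MESSAGE_TOO_LARGE splitting path is never taken.
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 256 * 1024);
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // ... send records as usual ...
        }
    }
}
{code}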

We are using the kafka-clients Java library version 2.0.0; the brokers run 2.1.1.

Thanks in advance.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8201) Kafka streams repartitioning topic settings crashing multiple nodes

2019-04-09 Thread Anders Aagaard (JIRA)
Anders Aagaard created KAFKA-8201:
-

 Summary: Kafka streams repartitioning topic settings crashing 
multiple nodes
 Key: KAFKA-8201
 URL: https://issues.apache.org/jira/browse/KAFKA-8201
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Anders Aagaard


We had an incident in a setup using Kafka Streams version 2.0.0 and Kafka version 2.0.0. The root cause is a combination of Kafka Streams defaults and a bug in Kafka.

Info about the setup: a Streams application reading a log-compacted input topic and performing a groupBy operation that requires repartitioning.

Kafka streams automatically creates a repartitioning topic with 24 partitions 
and the following options:

segment.bytes=52428800, retention.ms=9223372036854775807, segment.index.bytes=52428800, cleanup.policy=delete, segment.ms=600000.

 

This should mean a new segment is rolled when the active one reaches 50 MB or is older than 10 minutes. However, the widely varying timestamps coming into the topic due to log compaction (sometimes differing by multiple days) mean the broker sees messages older than segment.ms and automatically triggers a segment roll. This causes a segment explosion, where new segments are rolled out continuously.
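
Until the broker-side rolling behaviour is fixed, one mitigation (an editor's sketch, not from the original report) is to override the configuration Streams applies to its internal topics via the "topic." prefix, e.g. raising segment.ms well above the 10-minute default:

{code:java}
import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

public class RepartitionTopicOverride {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // StreamsConfig.topicPrefix("segment.ms") yields "topic.segment.ms",
        // which Streams forwards to the internal repartition/changelog topics
        // it creates, so records with out-of-order timestamps trigger far
        // fewer segment rolls. One hour is an arbitrary example value.
        props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_MS_CONFIG),
                Long.toString(Duration.ofHours(1).toMillis()));
        System.out.println(props); // pass these props to new KafkaStreams(topology, props)
    }
}
{code}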

There seems to be a bug report for this server side here : 
https://issues.apache.org/jira/browse/KAFKA-4336.

This effectively took down several nodes and a broker in our cluster.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)