[jira] [Created] (KAFKA-9108) Flaky Test LogCleanerIntegrationTest#testMarksPartitionsAsOfflineAndPopulatesUncleanableMetrics

2019-10-28 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-9108:
--

 Summary: Flaky Test 
LogCleanerIntegrationTest#testMarksPartitionsAsOfflineAndPopulatesUncleanableMetrics
 Key: KAFKA-9108
 URL: https://issues.apache.org/jira/browse/KAFKA-9108
 Project: Kafka
  Issue Type: Bug
  Components: core, unit tests
Reporter: Matthias J. Sax


[https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/26219/testReport/junit/kafka.log/LogCleanerIntegrationTest/testMarksPartitionsAsOfflineAndPopulatesUncleanableMetrics/]
{quote}org.scalatest.exceptions.TestFailedException: There should be 2 uncleanable partitions
at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:530)
at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)
at org.scalatest.Assertions$class.fail(Assertions.scala:1091)
at org.scalatest.Assertions$.fail(Assertions.scala:1389)
at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:842)
at kafka.log.LogCleanerIntegrationTest.testMarksPartitionsAsOfflineAndPopulatesUncleanableMetrics(LogCleanerIntegrationTest.scala:80){quote}
STDOUT
{quote}[2019-10-29 01:05:08,296] ERROR [kafka-log-cleaner-thread-0]: Error due to (kafka.log.LogCleaner:76)
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1039)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277)
at kafka.utils.ShutdownableThread.pause(ShutdownableThread.scala:82)
at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:315)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[DISCUSS] KIP-501 Avoid out-of-sync or offline partitions when follower fetch requests not processed in time

2019-10-28 Thread Satish Duggana
Hi All,
I wrote a short KIP about avoiding out-of-sync or offline partitions
when follower fetch requests are not processed in time by the leader
replica.
KIP-501 is located at https://s.apache.org/jhbpn

Please take a look, I would like to hear your feedback and suggestions.

JIRA: https://issues.apache.org/jira/browse/KAFKA-8733

Thanks,
Satish.


Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2019-10-28 Thread Satish Duggana
Hi Viktor,
>1. Can we allow RLM Followers to serve read requests? After all, segments on
the cold storage are closed ones, so no modification is allowed. Besides
KIP-392 (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica)
would introduce follower fetching too, so I think it would be nice to
prepare RLM for this as well.

That is a good point. We plan to support fetching remote storage data from
followers too. The current code in the PR works fine for this scenario,
though there may be some edge cases to handle. We have not yet tested this
scenario.

>2. I think the remote.log.storage.enable config is redundant. By specifying
remote.log.storage.manager.class.name one already declares that they want
to use remote storage. Would it make sense to remove
the remote.log.storage.enable config?

I do not think it is really needed; the `remote.log.storage.enable`
property can be removed.

Thanks,
Satish.


On Thu, Oct 24, 2019 at 2:46 PM Viktor Somogyi-Vass
 wrote:
>
> Hi Harsha,
>
> A couple more questions:
> 1. Can we allow RLM Followers to serve read requests? After all, segments on
> the cold storage are closed ones, so no modification is allowed. Besides
> KIP-392 (
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica)
> would introduce follower fetching too, so I think it would be nice to
> prepare RLM for this as well.
> 2. I think the remote.log.storage.enable config is redundant. By specifying
> remote.log.storage.manager.class.name one already declares that they want
> to use remote storage. Would it make sense to remove
> the remote.log.storage.enable config?
>
> Thanks,
> Viktor
>
>
> On Thu, Oct 24, 2019 at 10:37 AM Viktor Somogyi-Vass <
> viktorsomo...@gmail.com> wrote:
>
> > Hi Jun & Harsha,
> >
> > I think it would be beneficial to at least provide one simple reference
> > implementation (file system based?) as we do with connect too.
> > That would serve as a simple example and would help plugin developers to
> > better understand the concept and the interfaces.
> >
> > Best,
> > Viktor
> >
> > On Wed, Oct 23, 2019 at 8:49 PM Jun Rao  wrote:
> >
> >> Hi, Harsha,
> >>
> >> Regarding feature branch, if the goal is faster collaboration, it seems
> >> that doing the development on your own fork is better since non-committers
> >> can push changes there.
> >>
> >> Regarding the dependencies, this is an important thing to clarify. My
> >> understanding for this KIP is that in Apache Kafka, we won't provide any
> >> specific implementation for a particular block storage. There are many
> >> block storage systems out there (HDFS, S3, Google storage, Azure storage,
> >> Ceph, etc). We don't want to drag in all those dependencies in Apache
> >> Kafka, even if they are in a separate module. Doing that will make the
> >> Kafka repo much harder to manage. We have used the same approach for
> >> connect. The connect framework is in Apache Kafka, but all specific
> >> connectors are hosted externally.
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >>
> >>
> >> On Wed, Oct 23, 2019 at 8:41 AM Eno Thereska 
> >> wrote:
> >>
> >> > Thanks Satish, Harsha,
> >> >
> >> > It's probably worth making it clearer in the KIP what exact
> >> > libraries will be added to libs, if any. The KIP specifies the remote
> >> > storage interface but it isn't clear if particular implementations
> >> > will be added to Kafka's repository or whether they will reside in
> >> > other repositories. If I understand the intention correctly, you are
> >> > proposing to have an HDFS and S3 implementation as part of the Kafka
> >> > repository working out of the box. Is that correct?
> >> >
> >> > Thanks
> >> > Eno
> >> >
> >> > On Wed, Oct 23, 2019 at 5:01 AM Satish Duggana <
> >> satish.dugg...@gmail.com>
> >> > wrote:
> >> > >
> >> > > >Regarding the HDFS dependency, it's not a direct dependency; rather,
> >> > > it implements the RemoteStorageManager interface.
> >> > > We packaged it along with core to make it more convenient to test it.
> >> We
> >> > > can move this to external module and keep it there.
> >> > > Let me know what you think.
> >> > >
> >> > > Let me elaborate more on this point. With the new changes in the PR,
> >> > > kafka core or any other existing module is not dependent on HDFS. We
> >> > > created a new module called `remote-storage-managers/hdfs`. Libraries
> >> > > generated by this module are added to libs while packaging the
> >> > > distribution. This makes it easy for users to try HDFS tiered storage
> >> > > instead of building the hdfs module and adding it to libs on their own.
> >> > > We have plans to push these libs into external/libs/ directory and
> >> > > they will not be added to the classpath by default. We can add them to
> >> > > the classpath in scripts based on a system property.
> >> > >
> >> > > On Wed, Oct 23, 2019 at 6:26 AM Harsha Chintalapani 
> >> > wrote:
> >> > > >
> >> > > > Hi Jun,
> >> > > >

Re: [DISCUSS] KIP-150 - Kafka-Streams Cogroup

2019-10-28 Thread Guozhang Wang
Hi Walker,

This is a good point about compatibility breakage while overloading the
existing classes; while reading your exchanges with John, I think I still
need to clarify the motivations a bit more:

1) Multiple streams need to be aggregated together, inputs are always
*KStreams* and end result is a *KTable*.
2) Fields from different streams are never aggregated together, i.e. on the
higher level it is more like a "stitching up" the fields and then doing a
single aggregation.

In this context, I agree with you that it is still a streams-aggregation
operator that we are trying to optimize (though it's a multi-way one), not a
multi-way table-table-join operator.


-

But now, taking a step back and looking at it, I'm wondering whether, because
of 2) (input streams do not have overlapping fields), we can generalize this
to a broader scope. Consider this case for example:

table1 = builder.table("topic1");
table2 = builder.table("topic2");
table3 = builder.table("topic3");
table4 = table1.join(table2).join(table3);

Suppose the join operations do not take out any fields or add any new
fields, i.e. say table1 has fields A, table2 has fields B, and table3 has
fields C besides the key K, then table4 has fields {A, B, C} --- the join is
just "stitching up" the fields --- and the above topology can actually be
optimized in a similar way:

* we only keep one materialized store in the form of K -> {A, B, C} as the
materialized store of the final join result of table4.
* when a record comes in from table1/2/3, just query the store on K, update
the corresponding A/B/C field, and then write back to the store.


Then the above streams-aggregation operator can be treated as a special
case of this: you first aggregate separately on stream1/2/3 to generate
table1/2/3, and then do this "stitching join"; behind the scenes we can
optimize the topology to do exactly the co-group logic by updating the
second bullet point above into an aggregation operator:

* when a record comes in from *stream1/2/3*, just query the store on K,
update the corresponding A/B/C field *with an aggregator*, and then write
back to the store.

-

Personally I think this is better because of 1) a larger applicable scope,
and 2) not introducing new interfaces. But of course, on the other side it
requires us to do this optimization inside Streams with some syntax hint
from users (for example, users need to specify that it is a "stitching
join" such that all fields are still preserved in the join result). WDYT?


Guozhang


On Mon, Oct 28, 2019 at 4:20 PM Walker Carlson 
wrote:

> Hi John,
>
> Thank you for the background information. I think I understand your point.
>
> I believe that this could be fixed by making the motivation a little
> clearer in the KIP. I think that the motivation is this: when you have
> multiple streams that need to be aggregated together to form a single
> object, the current, non-optimal way to do this is through a multi-way
> table join, which is a little hacky. There is a slight but significant
> difference in these cases, as in the null value handling you pointed out.
>
> For the example in the motivation, these tables were grouped streams, so
> they already dropped the null values. If we consider Cogroup as sitting in
> the same grey area that KGroupedStream does, it should also behave this way.
> If you think about it that way, it is more of an extension of KGroupedStream
> than of KTable or KStream. Therefore I handle null values the same way
> KGroupedStream#aggregate does.
>
> Looking back I am not sure I understood you previous question fully at the
> time. I am sorry if my answer caused any confusion!
>
> Walker
>
> On Mon, Oct 28, 2019 at 2:49 PM John Roesler  wrote:
>
> > Hi Walker,
> >
> > Sorry for the delay in responding. Thanks for your response earlier.
> >
> > I think there might be a subtlety getting overlooked in considering
> > whether we're talking about streams versus tables in cogroup. As I'm
> > sure you know, Kafka Streams treats "stream" records as independent,
> > immutable, and opaque "facts", whereas we treat "table" records as a
> > sequence of updates to an entity identified by the record key (where
> > "update" means that each record's value represents the new state after
> > applying the update). For the most part, this is a clean separation,
> > but there is one special case where records with a "null" value are
> > interpreted as a tombstone in the table context (i.e., the record
> > indicates not that the new value of the entity is "null", but rather
> > that the entity has been deleted). In contrast, a record with a null
> > value in the stream context is _just_ a record with a null value; no
> > special semantics.
> >
> > The difficulty is that these two semantics clash at the stream/table
> > boundary. So, operations that convert streams to tables (like
> > KGroupedStream#aggregate) have to cope with ambiguity about whether to
> > 

Jenkins build is back to normal : kafka-trunk-jdk8 #4003

2019-10-28 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-150 - Kafka-Streams Cogroup

2019-10-28 Thread Walker Carlson
Hi John,

Thank you for the background information. I think I understand your point.

I believe that this could be fixed by making the motivation a little
clearer in the KIP. I think that the motivation is this: when you have
multiple streams that need to be aggregated together to form a single
object, the current, non-optimal way to do this is through a multi-way
table join, which is a little hacky. There is a slight but significant
difference in these cases, as in the null value handling you pointed out.

For the example in the motivation, these tables were grouped streams, so
they already dropped the null values. If we consider Cogroup as sitting in
the same grey area that KGroupedStream does, it should also behave this way.
If you think about it that way, it is more of an extension of KGroupedStream
than of KTable or KStream. Therefore I handle null values the same way
KGroupedStream#aggregate does.

Looking back I am not sure I understood you previous question fully at the
time. I am sorry if my answer caused any confusion!

Walker

On Mon, Oct 28, 2019 at 2:49 PM John Roesler  wrote:

> Hi Walker,
>
> Sorry for the delay in responding. Thanks for your response earlier.
>
> I think there might be a subtlety getting overlooked in considering
> whether we're talking about streams versus tables in cogroup. As I'm
> sure you know, Kafka Streams treats "stream" records as independent,
> immutable, and opaque "facts", whereas we treat "table" records as a
> sequence of updates to an entity identified by the record key (where
> "update" means that each record's value represents the new state after
> applying the update). For the most part, this is a clean separation,
> but there is one special case where records with a "null" value are
> interpreted as a tombstone in the table context (i.e., the record
> indicates not that the new value of the entity is "null", but rather
> that the entity has been deleted). In contrast, a record with a null
> value in the stream context is _just_ a record with a null value; no
> special semantics.
>
> The difficulty is that these two semantics clash at the stream/table
> boundary. So, operations that convert streams to tables (like
> KGroupedStream#aggregate) have to cope with ambiguity about whether to
> treat null values opaquely as null values, or as tombstones. I think
> I'll make a long story short and just say that this is a very, very
> complex issue. As a result (and as a bit of a punt), our
> KGroupedStream operations actually just discard null-valued records.
> This means that the following are _not_ equivalent programs:
>
> table1 =
>   stream("records")
> .filter(Record::isOk)
> .groupByKey()
> .aggregate(() -> new Record(), (key, value, agg) -> value)
> table2 =
>   table("record")
> .filter(Record::isOk)
>
> They look about the same, in that they'll both produce a
> KTable with the value being the latest state. But if a
> record is deleted in the upstream data (represented as a "null"
> value), that record would also be deleted in table2, but not in
> table1. Table1 would just perpetually contain the value immediately
> prior to the delete.
>
> This is why it makes me nervous to propose a new kind of _stream_
> operation ostensibly in order to solve a problem that presents itself
> in the _table_ context.
>
> If the goal is to provide a more efficient and convenient multi-way
> KTable join, I think it would be a good idea to consider an extension
> to the KTable API, not the KStream API. On the other hand, if this is
> not the goal, then the motivation of the KIP shouldn't say that it is.
> Instead, the KIP could provide some other motivation specifically for
> augmenting the KStream API.
>
> There is a third alternative that comes to mind, if you wish to
> resolve the long-standing dilemma around this semantic problem and
> specify in the KIP how exactly nulls are handled in this operator. But
> (although this seems on the face to be a good option), I think it
> might be a briarpatch. Even if we are able to reach a suitable design,
> we'd have to contend with the fact that it looks like the
> KGroupedStream API, but behaves differently.
>
> What do you think about all this?
>
> Thanks again for the KIP and the discussion!
> -John
>
> On Mon, Oct 28, 2019 at 3:32 PM Walker Carlson 
> wrote:
> >
> > Hi Gouzhang,
> >
> > Matthias and I did talk about overloading a different type of aggregate
> > method in the cogroup that would take in the windows and return a
> > windowed KTable. We decided that it would break too much with the current
> > pattern that was established in the normal KStream. We can revisit this
> if
> > you have a different opinion on the tradeoff.
> >
> > Walker
> >
> > On Mon, Oct 28, 2019 at 12:14 PM Guozhang Wang 
> wrote:
> >
> > > Hi Walker,
> > >
> > > On Fri, Oct 25, 2019 at 1:34 PM Walker Carlson 
> > > wrote:
> > >
> > > > Hi Guozhang,
> > > >
> > > > 1. I am familiar with the cogroup of spark, it is very similar to
> > > > their join 

Re: [DISCUSS] KIP-150 - Kafka-Streams Cogroup

2019-10-28 Thread John Roesler
Hi Walker,

Sorry for the delay in responding. Thanks for your response earlier.

I think there might be a subtlety getting overlooked in considering
whether we're talking about streams versus tables in cogroup. As I'm
sure you know, Kafka Streams treats "stream" records as independent,
immutable, and opaque "facts", whereas we treat "table" records as a
sequence of updates to an entity identified by the record key (where
"update" means that each record's value represents the new state after
applying the update). For the most part, this is a clean separation,
but there is one special case where records with a "null" value are
interpreted as a tombstone in the table context (i.e., the record
indicates not that the new value of the entity is "null", but rather
that the entity has been deleted). In contrast, a record with a null
value in the stream context is _just_ a record with a null value; no
special semantics.

The difficulty is that these two semantics clash at the stream/table
boundary. So, operations that convert streams to tables (like
KGroupedStream#aggregate) have to cope with ambiguity about whether to
treat null values opaquely as null values, or as tombstones. I think
I'll make a long story short and just say that this is a very, very
complex issue. As a result (and as a bit of a punt), our
KGroupedStream operations actually just discard null-valued records.
This means that the following are _not_ equivalent programs:

table1 =
  stream("records")
.filter(Record::isOk)
.groupByKey()
.aggregate(() -> new Record(), (key, value, agg) -> value)
table2 =
  table("record")
.filter(Record::isOk)

They look about the same, in that they'll both produce a
KTable with the value being the latest state. But if a
record is deleted in the upstream data (represented as a "null"
value), that record would also be deleted in table2, but not in
table1. Table1 would just perpetually contain the value immediately
prior to the delete.
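That divergence can be simulated outside of Streams with two plain maps (an illustrative sketch only; table1/table2 here are HashMaps standing in for KTables):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NullSemanticsDemo {
    // table1: built with KGroupedStream-style semantics (nulls discarded).
    static final Map<String, String> table1 = new HashMap<>();
    // table2: built with KTable semantics (a null value is a tombstone).
    static final Map<String, String> table2 = new HashMap<>();

    public static void main(String[] args) {
        // Upstream changelog for key "k1": a value, then a delete (null value).
        List<SimpleEntry<String, String>> changelog = List.of(
            new SimpleEntry<>("k1", "v1"),
            new SimpleEntry<>("k1", null));

        for (SimpleEntry<String, String> rec : changelog) {
            // Stream-aggregation path: null records are simply dropped,
            // so the delete never reaches table1.
            if (rec.getValue() != null) table1.put(rec.getKey(), rec.getValue());

            // Table path: null deletes the entity.
            if (rec.getValue() == null) table2.remove(rec.getKey());
            else table2.put(rec.getKey(), rec.getValue());
        }
        System.out.println("table1=" + table1 + " table2=" + table2);
        // prints: table1={k1=v1} table2={}
    }
}
```

After the null record, table1 still perpetually holds k1=v1 while table2 is empty, which is exactly the non-equivalence described above.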

This is why it makes me nervous to propose a new kind of _stream_
operation ostensibly in order to solve a problem that presents itself
in the _table_ context.

If the goal is to provide a more efficient and convenient multi-way
KTable join, I think it would be a good idea to consider an extension
to the KTable API, not the KStream API. On the other hand, if this is
not the goal, then the motivation of the KIP shouldn't say that it is.
Instead, the KIP could provide some other motivation specifically for
augmenting the KStream API.

There is a third alternative that comes to mind, if you wish to
resolve the long-standing dilemma around this semantic problem and
specify in the KIP how exactly nulls are handled in this operator. But
(although this seems on the face to be a good option), I think it
might be a briarpatch. Even if we are able to reach a suitable design,
we'd have to contend with the fact that it looks like the
KGroupedStream API, but behaves differently.

What do you think about all this?

Thanks again for the KIP and the discussion!
-John

On Mon, Oct 28, 2019 at 3:32 PM Walker Carlson  wrote:
>
> Hi Gouzhang,
>
> Matthias and I did talk about overloading a different type of aggregate
> method in the cogroup that would take in the windows and return a
> windowed KTable. We decided that it would break too much with the current
> pattern that was established in the normal KStream. We can revisit this if
> you have a different opinion on the tradeoff.
>
> Walker
>
> On Mon, Oct 28, 2019 at 12:14 PM Guozhang Wang  wrote:
>
> > Hi Walker,
> >
> > On Fri, Oct 25, 2019 at 1:34 PM Walker Carlson 
> > wrote:
> >
> > > Hi Guozhang,
> > >
> > > 1. I am familiar with the cogroup of Spark; it is very similar to
> > > their join operator, but instead it makes the values iterable. I think
> > > that the use cases are different enough that it makes sense to specify
> > > the aggregator when we do.
> > >
> > > I like the idea of "absorb" and I think it could be useful. Although I do
> > > not think it is as intuitive.
> > >
> > > If we were to go that route we would either use more processors or do
> > > essentially the same thing but would have to store the information
> > > required to cogroup inside that KTable. I think this would violate some
> > > design principles. I would argue that we should consider adding absorb as
> > > well and auto re-write it to use cogroup.
> > >
> >
> > Yeah I think I agree with you about the internal design complexity with
> > "absorb"; I was primarily thinking if we can save ourselves from
> > introducing 3 more public classes with co-group. But it seems that without
> > introducing new classes there's no easy way for us to bound the scope of
> > co-grouping (like how many streams will be co-grouped together).
> >
> > LMK if you have some better ideas: generally speaking the less new public
> > interfaces we are introducing to fulfill a new feature the better, so I'd
> > push us to think twice and carefully before we go down the 

[jira] [Resolved] (KAFKA-8700) Flaky Test QueryableStateIntegrationTest#queryOnRebalance

2019-10-28 Thread Chris Pettitt (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Pettitt resolved KAFKA-8700.
--
Resolution: Fixed

Checked in a fix for this which may reduce flakiness. Also improved 
instrumentation so that if this fails again we'll have more context to debug 
and repro.

> Flaky Test QueryableStateIntegrationTest#queryOnRebalance
> -
>
> Key: KAFKA-8700
> URL: https://issues.apache.org/jira/browse/KAFKA-8700
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.4.0
>Reporter: Matthias J. Sax
>Assignee: Chris Pettitt
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.5.0
>
>
> [https://builds.apache.org/blue/organizations/jenkins/kafka-trunk-jdk8/detail/kafka-trunk-jdk8/3807/tests]
> {quote}java.lang.AssertionError: Condition not met within timeout 12. 
> waiting for metadata, store and value to be non null
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:376)
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:353)
> at 
> org.apache.kafka.streams.integration.QueryableStateIntegrationTest.verifyAllKVKeys(QueryableStateIntegrationTest.java:292)
> at 
> org.apache.kafka.streams.integration.QueryableStateIntegrationTest.queryOnRebalance(QueryableStateIntegrationTest.java:382){quote}
>  





Re: [VOTE] KIP-441: Smooth Scaling Out for Kafka Streams

2019-10-28 Thread John Roesler
Hey all,

Now that the 2.4 release storm is over, I'd like to bump this vote thread.

Currently, we have two binding +1s (Guozhang and Bill), and four
non-binding ones (Bruno, Vinoth, Sophie, and myself), and no vetoes.

Thanks,
-John

On Thu, Sep 12, 2019 at 12:54 PM Bill Bejeck  wrote:
>
> +1 (binding)
>
> On Thu, Sep 12, 2019 at 1:53 PM Sophie Blee-Goldman 
> wrote:
>
> > +1 (non-binding)
> >
> > On Wed, Sep 11, 2019 at 11:38 AM Vinoth Chandar 
> > wrote:
> >
> > > +1 (non-binding).
> > >
> > > On Fri, Sep 6, 2019 at 12:46 AM Bruno Cadonna 
> > wrote:
> > >
> > > > +1 (non-binding)
> > > >
> > > > On Fri, Sep 6, 2019 at 12:32 AM Guozhang Wang 
> > > wrote:
> > > > >
> > > > > +1 (binding).
> > > > >
> > > > > On Thu, Sep 5, 2019 at 2:47 PM John Roesler 
> > wrote:
> > > > >
> > > > > > Hello, all,
> > > > > >
> > > > > > After a great discussion, I'd like to open voting on KIP-441,
> > > > > > to avoid long restore times in Streams after rebalancing.
> > > > > > Please cast your votes!
> > > > > >
> > > > > >
> > > > > >
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441:+Smooth+Scaling+Out+for+Kafka+Streams
> > > > > >
> > > > > > Thanks,
> > > > > > -John
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > >
> > >
> >


Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2019-10-28 Thread Ryanne Dolan
Got it, thanks Ying.

Ryanne


On Mon, Oct 28, 2019, 2:42 PM Ying Zheng  wrote:

> >
> >
> >
> >
> - Why wait until local segments expire before offloading them to cold
> > storage? Why not stream to HDFS/S3 on an ongoing basis? I'd think this
> > would reduce bursty behavior from periodic uploads.
> >
> >
> I think you misunderstood the KIP. We do plan to ship the segment files
> to remote storage as soon as they are closed (no longer actively appended).
>
>
> > - Can we write to multiple remote stores at the same time? Can we have
> some
> > topics go to S3 and some to HDFS? Since we're storing "RDIs" that point
> to
> > remote locations, can we generalize this to full URIs that may be in any
> > supported remote store? In particular, what happens when you want to
> switch
> > from HDFS to S3 -- can we add a new plugin and keep going? Can we fetch
> > s3:/// URIs from S3 and hdfs:/// URIs from HDFS?
> >
> >
> This is definitely one goal when we designed the KIP. The reason we call
> the field RDI (byte[]) rather than URI (string) is just to give plugin
> developers more flexibility. As we don't require the field to follow any
> standard schema and don't try to interpret the field in Kafka core,
> developers can put whatever information is needed by their remote storage
> into the field.
>
> For now, we just want to keep the first version simpler. We think most
> users will not use two different remote storages in one cluster, so this
> is not a high-priority feature. We can add support for mixed remote
> storage types in a later version.
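To illustrate why an opaque byte[] RDI gives plugin developers this flexibility, here is a hypothetical in-memory plugin sketch (this is not the actual KIP-405 interface; the class and method names are made up):

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical remote-storage plugin: the broker would treat the returned
// RDI as opaque bytes; only the plugin knows how to decode it on fetch.
public class InMemoryRemoteStorage {
    private final Map<String, byte[]> blobs = new HashMap<>();

    // "Copy" a closed segment out; the RDI encodes whatever this plugin needs.
    public byte[] copySegment(String topic, long baseOffset, byte[] segment) {
        String location = "mem://" + topic + "/" + baseOffset; // plugin-private scheme
        blobs.put(location, segment);
        return location.getBytes(StandardCharsets.UTF_8);      // opaque to Kafka core
    }

    // Fetch back using only the RDI handed out earlier.
    public byte[] fetchSegment(byte[] rdi) {
        return blobs.get(new String(rdi, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        InMemoryRemoteStorage rs = new InMemoryRemoteStorage();
        byte[] rdi = rs.copySegment("logs", 0L,
                "segment-bytes".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(rs.fetchSegment(rdi), StandardCharsets.UTF_8));
        // prints: segment-bytes
    }
}
```

Only the plugin interprets the bytes; a different plugin could pack an S3 bucket/key pair, an HDFS path, or any private encoding into the same field.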
>
>
> > - Instead of having brokers do all this, what if we just expose an API
> that
> > lets external tooling register a URI for a given segment? If I've copied
> a
> > segment file to S3, say with a daily cron job, why not just tell Kafka
> > where to find it? Assuming I've got a plugin to _read_ from S3, that's
> all
> > Kafka would need to know.
> >
> >
> I think you agree that we anyway need a plugin to read the remote storage.
> So, I think your question is whether we should let the Kafka broker drive
> the remote storage write, or let a connector / external tool drive it. I
> think it's slightly better to let the Kafka broker drive this, because:
> 1. The Kafka broker has the latest topic information, such as whether the
> broker is still the leader and whether a segment file is already closed
> (the high watermark has moved to the next segment).
> 2. For users, it's also easier to manage everything in one place: the
> Kafka topic configuration.
>
>
>
> > Ryanne
> >
> > On Thu, Oct 24, 2019, 9:13 AM Eno Thereska 
> wrote:
> >
> > > Going back to initial thread with general questions on KIP. I think
> > > aspects of the user experience still need clarification:
> > >
> > > - if a user has a mix of compacted and non-compacted topics it will be
> > > hard to reason about storage needs overall. Could you give a reason
> > > why compacted topics are not supported? This is probably because to do
> > > that you'd have to go with a paging approach (like Ryanne earlier
> > > suggested) and that will be expensive in terms of IO. Do you want to
> > > discount supporting compacted topics this early in the KIP design or
> > > do you want to leave open the option of supporting them eventually? In
> > > an ideal system, Kafka figures out if the topic is compacted or not
> > > and for non-compacted topics it doesn't do the local copy so it goes
> > > through a fast path.
> > >
> > > - why do we need per topic remote retention time and bytes? Why isn't
> > > per topic retention time and bytes (without the "remote" part)
> > > sufficient? E.g., if I have a topic and I want retention bytes to be
> > > 1TB, and I currently have 500GB local and 500GB remote, Kafka can
> > > manage what segments get deleted first. This would avoid the user
> > > needing to think even more about these extra configs.
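The single-budget behavior suggested in that question could be sketched as follows (an illustrative sketch of the accounting only; the sizes and the oldest-first deletion policy are assumptions, not part of the KIP):

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class RetentionBudgetDemo {
    // Enforce a single retention.bytes budget over all segments, oldest
    // first, regardless of whether a segment is local or remote.
    static long enforce(Deque<long[]> segments, long retentionBytes) {
        long total = segments.stream().mapToLong(s -> s[1]).sum();
        while (total > retentionBytes) {
            total -= segments.removeFirst()[1]; // delete the oldest segment
        }
        return total;
    }

    public static void main(String[] args) {
        Deque<long[]> segments = new ArrayDeque<>(); // {baseOffset, sizeBytes}, oldest first
        segments.add(new long[]{0, 400});    // remote
        segments.add(new long[]{100, 400});  // remote
        segments.add(new long[]{200, 400});  // local
        long kept = enforce(segments, 1000); // stand-in for the "1TB" budget
        System.out.println("retained " + kept + " bytes in " + segments.size() + " segments");
        // prints: retained 800 bytes in 2 segments
    }
}
```

With one budget, Kafka itself decides which tier the oldest segments live in, so the user never reasons about a separate "remote" retention config.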
> > >
> > > Thanks
> > > Eno
> > >
> > >
> > > On Mon, Oct 21, 2019 at 4:46 PM Harsha  wrote:
> > > >
> > > > Hi All,
> > > >   Thanks for the initial feedback on KIP-405. We opened a
> > > > PR here: https://github.com/apache/kafka/pull/7561.
> > > > Please take a look and let us know if you have any questions.
> > > > Since this feature is being developed by engineers from different
> > > companies, we would like to open a feature branch in the Apache Kafka
> > > git repo. It will allow us to collaborate in the open source community
> > > rather than in private branches. Please let me know if you have any
> > > objections to opening a feature branch in Kafka's git repo.
> > > >
> > > > Thanks,
> > > > Harsha
> > > >
> > > > On Mon, Apr 8, 2019, at 10:04 PM, Harsha wrote:
> > > > > Thanks, Ron. Updating the KIP. will add answers here as well
> > > > >
> > > > >  1) If 

Re: [VOTE] KIP-523 Add KStream#toTable to the Streams DSL

2019-10-28 Thread John Roesler
Thanks, Aishwarya!

I'm +1 (non-binding)

-John

On Mon, Oct 28, 2019 at 11:58 AM aishwarya kumar  wrote:
>
> Thank you,
>
> Two binding votes so far.
>
> I'll keep this thread open for a couple of days.
>
> Best,
> Aishwarya
>
> On Thu, Oct 24, 2019, 3:05 PM Bill Bejeck  wrote:
>
> > Thanks for the KIP, this is something that will be appreciated by the
> > community.
> >
> > +1(binding)
> >
> > -Bill
> >
> > On Thu, Oct 24, 2019 at 12:54 PM Matthias J. Sax 
> > wrote:
> >
> > > Thanks for the KIP!
> > >
> > > +1 (binding)
> > >
> > >
> > > -Matthias
> > >
> > > On 10/24/19 6:19 AM, aishwarya kumar wrote:
> > > > Hello All,
> > > >
> > > > After concluding discussions for this KIP, I would like to go forward
> > > with
> > > > the voting process.
> > > >
> > > > Jira Ticket: https://issues.apache.org/jira/browse/KAFKA-7658
> > > > KIP :
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL
> > > >
> > > > Thank you,
> > > > Aishwarya
> > > >
> > >
> > >
> >


Re: [DISCUSS] KIP-150 - Kafka-Streams Cogroup

2019-10-28 Thread Walker Carlson
Hi Guozhang,

Matthias and I did talk about overloading a different type of aggregate
method in the cogroup that would take in the windows and return a
windowed KTable. We decided that it would break too much with the current
pattern that was established in the normal KStream. We can revisit this if
you have a different opinion on the tradeoff.

Walker

On Mon, Oct 28, 2019 at 12:14 PM Guozhang Wang  wrote:

> Hi Walker,
>
> On Fri, Oct 25, 2019 at 1:34 PM Walker Carlson 
> wrote:
>
> > Hi Guozhang,
> >
> > 1. I am familiar with the cogroup of spark, it is very similar to
> > their join operator but instead it makes the values iterable. I think
> that
> > the use cases are different enough that it makes sense to specify the
> > aggregator when we do.
> >
> > I like the idea of "absorb" and I think it could be useful. Although I do
> > not think it is as intuitive.
> >
> > If we were to go that route we would either use more processors or do
> > essentially the same thing but would have to store the information
> > required to cogroup inside that KTable. I think this would violate some
> > design principles. I would argue that we should consider adding absorb as
> > well and auto re-write it to use cogroup.
> >
>
> Yeah I think I agree with you about the internal design complexity with
> "absorb"; I was primarily thinking if we can save ourselves from
> introducing 3 more public classes with co-group. But it seems that without
> introducing new classes there's no easy way for us to bound the scope of
> co-grouping (like how many streams will be co-grouped together).
>
> LMK if you have some better ideas: generally speaking the less new public
> interfaces we are introducing to fulfill a new feature the better, so I'd
> push us to think carefully before we go down that route.
>
>
> >
> > 2. We have not considered this, though that would be a convenient
> > operation.
> >
> > 3. There is only one processor made. We are actually having the naming
> > conversation right now in the above thread
> >
> > 4, 5. fair points
> >
> > Walker
> >
> > On Fri, Oct 25, 2019 at 11:58 AM Guozhang Wang 
> wrote:
> >
> > > Hi Walker, thanks for the KIP! I made a pass on the writeup and have
> some
> > > comments below:
> > >
> > > Meta:
> > >
> > > 1. Syntax-wise, I'm wondering if we have compared our current proposal
> > with
> > > Spark's co-group syntax (I know they are targeting for different use
> > cases,
> > > but wondering if their syntax is closer to the join operator), what are
> > the
> > > syntax / semantics trade-off here?
> > >
> > > Just playing a devil's advocate here, if the main motivation is to
> > provide
> > > a more convenient multi-way join syntax, and in order to only have one
> > > materialized store we need to specify the final joined format at the
> > > beginning, then what about the following alternative (with the given
> > > example in your wiki page):
> > >
> > >
> > > KGroupedStream grouped1 = builder.stream("topic1").groupByKey();
> > > KGroupedStream grouped2 = builder.stream("topic2").groupByKey();
> > > KGroupedStream grouped3 = builder.stream("topic3").groupByKey();
> > >
> > > KTable aggregated = grouped1.aggregate(initializer,
> materialized,
> > > aggregator1);
> > >
> > > aggregated.absorb(grouped2, aggregator2);  // I'm just using a random
> > name
> > > on top of my head here
> > >   .absorb(grouped3, aggregator3);
> > >
> > > In this way, we just add a new API to the KTable to "absorb" new
> streams
> > as
> > > aggregated results without needing to introduce new first-class
> > citizen classes.
> > >
> > > 2. From the DSL optimization, have we considered if we can auto
> re-write
> > > the user written old fashioned multi-join into this new DSL operator?
> > >
> > > 3. Although it is not needed for the wiki page itself, for internal
> > > implementation how many processor nodes would we create for the new
> > > operator, and how we can allow users to name them?
> > >
> > > Minor:
> > >
> > > 4. In "Public Interfaces", better add the templated generics to
> > > "KGroupedStream" as "KGroupedStream<K, V>".
> > >
> > > 5. Naming wise, I'd suggest we keep the "K" together with Stream/Table,
> > > e.g. "TimeWindowed*CogroupedKStream*".
> > >
> > >
> > > Guozhang
> > >
> > >
> > >
> > >
> > > On Thu, Oct 24, 2019 at 11:43 PM Matthias J. Sax <
> matth...@confluent.io>
> > > wrote:
> > >
> > > > Walker,
> > > >
> > > > I am not sure if I can follow your argument. What do you exactly mean
> > by
> > > >
> > > > > I also
> > > > >> think that in this case it would be better to separate the 2
> option
> > > out
> > > > >> into separate overloads.
> > > >
> > > > Maybe you can give an example what method signature you have in mind?
> > > >
> > > > >> We could take a named parameter from upstream or add an extra
> naming
> > > > option
> > > > >> however I don't really see the advantage that would give.
> > > >
> > > > Are you familiar with KIP-307? Before KIP-307, KS generated all 
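The cogroup-vs-"absorb" trade-off discussed above comes down to the same underlying semantics: several keyed streams folding into one shared aggregate, each with its own aggregator. A stdlib-only sketch of that semantics (names are hypothetical; this is not the KIP-150 API):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java sketch of cogroup semantics: several keyed input streams feed one
// shared aggregate, each stream with its own aggregator. Hypothetical
// illustration; not the KIP-150 API.
public class CogroupSketch {
    private final Map<String, Integer> table = new HashMap<>();

    // Fold one stream's records into the shared table with that stream's
    // aggregator, mirroring the chained cogroup(...)/"absorb" style.
    public CogroupSketch absorb(Map<String, Integer> stream,
                                BiFunction<Integer, Integer, Integer> aggregator) {
        // merge() uses the record value itself when the key is new, otherwise
        // applies the aggregator to (currentAggregate, newValue).
        stream.forEach((key, value) -> table.merge(key, value, aggregator));
        return this; // allow chaining like the proposed DSL
    }

    public Map<String, Integer> result() { return table; }

    public static void main(String[] args) {
        Map<String, Integer> out = new CogroupSketch()
                .absorb(Map.of("a", 1, "b", 2), Integer::sum) // stream 1: sum
                .absorb(Map.of("a", 10), Math::max)           // stream 2: max
                .result();
        System.out.println(out.get("a")); // 10
    }
}
```

Whether this is exposed as new CogroupedKStream classes or as a KTable-level "absorb" chain is exactly the design question in this thread; the shared single table is the part both variants agree on.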

Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2019-10-28 Thread Ying Zheng
On Thu, Oct 24, 2019 at 7:13 AM Eno Thereska  wrote:

> Going back to initial thread with general questions on KIP. I think
> aspects of the user experience still need clarification:
>
> - if a user has a mix of compacted and non-compacted topics it will be
> hard to reason about storage needs overall. Could you give a reason
> why compacted topics are not supported? This is probably because to do
> that you'd have to go with a paging approach (like Ryanne earlier
> suggested) and that will be expensive in terms of IO. Do you want to
> discount supporting compacted topics this early in the KIP design or
> do you want to leave open the option of supporting them eventually? In
> an ideal system, Kafka figures out if the topic is compacted or not
> and for non-compacted topics it doesn't do the local copy so it goes
> through a fast path.
>
>
Hi Eno,

I think the main purpose of tiered storage is to save local disk space.
Compacted topics are just in-memory hash tables; they are anyway very small
and can be totally ignored when you calculate local storage needs.

The segment files in remote storage are considered read-only. They are not
going to be changed until they are deleted after expiry.

The segment files of a compacted topic have to be compacted / rewritten
periodically. Doing this on remote storage can make things much more
complicated.


Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2019-10-28 Thread Ying Zheng
> - Why wait until local segments expire before offloading them to cold
> storage? Why not stream to HDFS/S3 on an ongoing basis? I'd think this
> would reduce bursty behavior from periodic uploads.
>
>
I think you misunderstood the KIP. We do plan to ship the segment files to
remote storage
as soon as they are closed (no longer actively appended).


> - Can we write to multiple remote stores at the same time? Can we have some
> topics go to S3 and some to HDFS? Since we're storing "RDIs" that point to
> remote locations, can we generalize this to full URIs that may be in any
> supported remote store? In particular, what happens when you want to switch
> from HDFS to S3 -- can we add a new plugin and keep going? Can we fetch
> s3:/// URIs from S3 and hdfs:/// URIs from HDFS?
>
>
This is definitely one goal when we designed the KIP. The reason we call the
field
RDI (byte[]) rather than URI (string) is just to give the plugin developers
more flexibility.
As we don't require the field to follow any standard schema and don't try
to interpret the
field in Kafka core, developers can put whatever information is needed by
their remote storage into the field.

For now, we just want to keep the first version simple. We think most users
will not use two different remote storages in one cluster, so this is not a
high-priority feature. We can add support for mixed remote storage types in
the next version.
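The opaque-RDI point can be made concrete with a sketch. The interface and names below are hypothetical, not the KIP-405 API; the point is only that Kafka core stores the bytes verbatim and the plugin alone interprets them:

```java
import java.nio.charset.StandardCharsets;

// Sketch of the "opaque RDI" idea: Kafka core stores and returns the bytes
// verbatim; only the remote-storage plugin interprets them. Interface and
// names are hypothetical; not the KIP-405 API.
interface RemoteSegmentStore {
    byte[] write(String topic, int partition, long baseOffset); // returns an opaque RDI
    byte[] read(byte[] rdi, long position, int length);         // plugin decodes its own RDI
}

class S3LikeStore implements RemoteSegmentStore {
    @Override
    public byte[] write(String topic, int partition, long baseOffset) {
        // This plugin happens to encode a URI-like string, but core never
        // parses it; another plugin could pack a binary struct instead.
        String location = "s3://bucket/" + topic + "-" + partition + "/" + baseOffset + ".log";
        return location.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] read(byte[] rdi, long position, int length) {
        String location = new String(rdi, StandardCharsets.UTF_8); // only the plugin decodes
        // ... fetch `length` bytes at `position` from `location` ...
        return new byte[0]; // stub
    }

    public static void main(String[] args) {
        byte[] rdi = new S3LikeStore().write("orders", 3, 4500L);
        System.out.println(new String(rdi, StandardCharsets.UTF_8));
    }
}
```

Because the broker never parses the RDI, swapping an HDFS plugin for an S3 plugin changes only how these bytes are produced and consumed, not anything in Kafka core.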


> - Instead of having brokers do all this, what if we just expose an API that
> lets external tooling register a URI for a given segment? If I've copied a
> segment file to S3, say with a daily cron job, why not just tell Kafka
> where to find it? Assuming I've got a plugin to _read_ from S3, that's all
> Kafka would need to know.
>
>
I think you agree that we anyway need a plugin to read the remote storage.
So, I think your question is whether the Kafka broker should drive the remote
storage write, or whether a connector / external tool should. I think it's
slightly better to let the Kafka broker drive this, because
1. The Kafka broker has the latest topic information, such as whether it is
still the leader and whether a segment file is already closed (the high
watermark has moved to the next segment).
2. For the users, it's also easier to manage everything in one place: the
Kafka topic configuration.



> Ryanne
>
> On Thu, Oct 24, 2019, 9:13 AM Eno Thereska  wrote:
>
> > Going back to initial thread with general questions on KIP. I think
> > aspects of the user experience still need clarification:
> >
> > - if a user has a mix of compacted and non-compacted topics it will be
> > hard to reason about storage needs overall. Could you give a reason
> > why compacted topics are not supported? This is probably because to do
> > that you'd have to go with a paging approach (like Ryanne earlier
> > suggested) and that will be expensive in terms of IO. Do you want to
> > discount supporting compacted topics this early in the KIP design or
> > do you want to leave open the option of supporting them eventually? In
> > an ideal system, Kafka figures out if the topic is compacted or not
> > and for non-compacted topics it doesn't do the local copy so it goes
> > through a fast path.
> >
> > - why do we need per topic remote retention time and bytes? Why isn't
> > per topic retention time and bytes (without the "remote" part)
> > sufficient? E.g., if I have a topic and I want retention bytes to be
> > 1TB, and I currently have 500GB local and 500GB remote, Kafka can
> > manage what segments get deleted first. This would avoid the user
> > needing to think even more about these extra configs.
> >
> > Thanks
> > Eno
> >
> >
> > On Mon, Oct 21, 2019 at 4:46 PM Harsha  wrote:
> > >
> > > Hi All,
> > >   Thanks for the initial feedback on the KIP-405.  We opened a
> > PR here
> https://github.com/apache/kafka/pull/7561
> .
> > > Please take a look and let us know if you have any questions.
> > > Since this feature is being developed by engineers from different
> > companies we would like to open a feature branch in apache kafka git. It
> > will allow us to collaborate in the open source community rather than in private
> > branches. Please let me know if you have any objections to opening a
> > feature branch in kafka's git repo.
> > >
> > > Thanks,
> > > Harsha
> > >
> > > On Mon, Apr 8, 2019, at 10:04 PM, Harsha wrote:
> > > > Thanks, Ron. Updating the KIP. will add answers here as well
> > > >
> > > >  1) If the cold storage technology can be cross-region, is there a
> > > >  possibility for a disaster recovery Kafka cluster to share the
> > messages in
> > > >  cold storage?  My guess is the answer is no, and messages replicated
> > to the
> > > >  D/R cluster have to be migrated to cold storage from there
> > 

Re: [DISCUSS] KIP-150 - Kafka-Streams Cogroup

2019-10-28 Thread Walker Carlson
Thank you Bill,

I can get behind keeping the correct pattern and adding the Named object
into the aggregate. And adding something like a Cogrouped configuration object
is a bit beyond the scope of this KIP.

I updated the KIP to include the Named parameter and some minor text fixes.
If there are no other comments I believe we can take this to a vote?

Thanks,
Walker

On Mon, Oct 28, 2019 at 8:28 AM Bill Bejeck  wrote:

> Hi Walker,
>
> Thanks for taking on KIP-150, the co-group will be very useful.
>
> Regarding the naming, IMHO, we should stick to the current pattern, and
> that is, we provide overloads with a "Named" operator for the
> "aggregate" methods (I believe those are the only ones that create a
> processor).
> Currently, that's what we have with various operators performing
> aggregation operations.
> I understand your concern about adding methods, but IMHO it would be very
> confusing to users if we broke the current pattern we have at an
> arbitrary point in time.
>
> As for Sophie's suggestion of adding a "CoGrouped" configuration object, I
> can see the merits of that approach.  But IMHO, instead of doing so for one
> operation, maybe we should take a step back and consider refactoring to one
> configuration object overall (I believe John has suggested something
> similar in the past).  That is well beyond the scope of this KIP, but I
> think it would be better to stick with our current pattern and consider
> changes we can apply to the entire API in a later KIP.
>
> Just my 2 cents.
>
> Thanks,
> Bill
>
>
> On Fri, Oct 25, 2019 at 4:34 PM Walker Carlson 
> wrote:
>
> > Hi Guozhang,
> >
> > 1. I am familiar with the cogroup of spark, it is very similar to
> > their join operator but instead it makes the values iterable. I think
> that
> > the use cases are different enough that it makes sense to specify the
> > aggregator when we do.
> >
> > I like the idea of "absorb" and I think it could be useful. Although I do
> > not think it is as intuitive.
> >
> > If we were to go that route we would either use more processors or do
> > essentially the same thing but would have to store the information
> > required to cogroup inside that KTable. I think this would violate some
> > design principles. I would argue that we should consider adding absorb as
> > well and auto re-write it to use cogroup.
> >
> > 2. We have not considered this, though that would be a convenient
> > operation.
> >
> > 3. There is only one processor made. We are actually having the naming
> > conversation right now in the above thread
> >
> > 4, 5. fair points
> >
> > Walker
> >
> > On Fri, Oct 25, 2019 at 11:58 AM Guozhang Wang 
> wrote:
> >
> > > Hi Walker, thanks for the KIP! I made a pass on the writeup and have
> some
> > > comments below:
> > >
> > > Meta:
> > >
> > > 1. Syntax-wise, I'm wondering if we have compared our current proposal
> > with
> > > Spark's co-group syntax (I know they are targeting for different use
> > cases,
> > > but wondering if their syntax is closer to the join operator), what are
> > the
> > > syntax / semantics trade-off here?
> > >
> > > Just playing a devil's advocate here, if the main motivation is to
> > provide
> > > a more convenient multi-way join syntax, and in order to only have one
> > > materialized store we need to specify the final joined format at the
> > > beginning, then what about the following alternative (with the given
> > > example in your wiki page):
> > >
> > >
> > > KGroupedStream grouped1 = builder.stream("topic1").groupByKey();
> > > KGroupedStream grouped2 = builder.stream("topic2").groupByKey();
> > > KGroupedStream grouped3 = builder.stream("topic3").groupByKey();
> > >
> > > KTable aggregated = grouped1.aggregate(initializer,
> materialized,
> > > aggregator1);
> > >
> > > aggregated.absorb(grouped2, aggregator2);  // I'm just using a random
> > name
> > > on top of my head here
> > >   .absorb(grouped3, aggregator3);
> > >
> > > In this way, we just add a new API to the KTable to "absorb" new
> streams
> > as
> > > aggregated results without needing to introduce new first-class
> > citizen classes.
> > >
> > > 2. From the DSL optimization, have we considered if we can auto
> re-write
> > > the user written old fashioned multi-join into this new DSL operator?
> > >
> > > 3. Although it is not needed for the wiki page itself, for internal
> > > implementation how many processor nodes would we create for the new
> > > operator, and how we can allow users to name them?
> > >
> > > Minor:
> > >
> > > 4. In "Public Interfaces", better add the templated generics to
> > > "KGroupedStream" as "KGroupedStream<K, V>".
> > >
> > > 5. Naming wise, I'd suggest we keep the "K" together with Stream/Table,
> > > e.g. "TimeWindowed*CogroupedKStream*".
> > >
> > >
> > > Guozhang
> > >
> > >
> > >
> > >
> > > On Thu, Oct 24, 2019 at 11:43 PM Matthias J. Sax <
> matth...@confluent.io>
> > > wrote:
> > >
> > > > Walker,
> > > >
> > > > I am not sure if I can follow 

Re: [DISCUSS] KIP-150 - Kafka-Streams Cogroup

2019-10-28 Thread Guozhang Wang
Hi Walker,

On Fri, Oct 25, 2019 at 1:34 PM Walker Carlson 
wrote:

> Hi Guozhang,
>
> 1. I am familiar with the cogroup of spark, it is very similar to
> their join operator but instead it makes the values iterable. I think that
> the use cases are different enough that it makes sense to specify the
> aggregator when we do.
>
> I like the idea of "absorb" and I think it could be useful. Although I do
> not think it is as intuitive.
>
> If we were to go that route we would either use more processors or do
> essentially the same thing but would have to store the information
> required to cogroup inside that KTable. I think this would violate some
> design principles. I would argue that we should consider adding absorb as
> well and auto re-write it to use cogroup.
>

Yeah I think I agree with you about the internal design complexity with
"absorb"; I was primarily thinking if we can save ourselves from
introducing 3 more public classes with co-group. But it seems that without
introducing new classes there's no easy way for us to bound the scope of
co-grouping (like how many streams will be co-grouped together).

LMK if you have some better ideas: generally speaking the less new public
interfaces we are introducing to fulfill a new feature the better, so I'd
push us to think carefully before we go down that route.


>
> 2. We have not considered this, though that would be a convenient
> operation.
>
> 3. There is only one processor made. We are actually having the naming
> conversation right now in the above thread
>
> 4, 5. fair points
>
> Walker
>
> On Fri, Oct 25, 2019 at 11:58 AM Guozhang Wang  wrote:
>
> > Hi Walker, thanks for the KIP! I made a pass on the writeup and have some
> > comments below:
> >
> > Meta:
> >
> > 1. Syntax-wise, I'm wondering if we have compared our current proposal
> with
> > Spark's co-group syntax (I know they are targeting for different use
> cases,
> > but wondering if their syntax is closer to the join operator), what are
> the
> > syntax / semantics trade-off here?
> >
> > Just playing a devil's advocate here, if the main motivation is to
> provide
> > a more convenient multi-way join syntax, and in order to only have one
> > materialized store we need to specify the final joined format at the
> > beginning, then what about the following alternative (with the given
> > example in your wiki page):
> >
> >
> > KGroupedStream grouped1 = builder.stream("topic1").groupByKey();
> > KGroupedStream grouped2 = builder.stream("topic2").groupByKey();
> > KGroupedStream grouped3 = builder.stream("topic3").groupByKey();
> >
> > KTable aggregated = grouped1.aggregate(initializer, materialized,
> > aggregator1);
> >
> > aggregated.absorb(grouped2, aggregator2);  // I'm just using a random
> name
> > on top of my head here
> >   .absorb(grouped3, aggregator3);
> >
> > In this way, we just add a new API to the KTable to "absorb" new streams
> as
> > aggregated results without needing to introduce new first-class
> citizen classes.
> >
> > 2. From the DSL optimization, have we considered if we can auto re-write
> > the user written old fashioned multi-join into this new DSL operator?
> >
> > 3. Although it is not needed for the wiki page itself, for internal
> > implementation how many processor nodes would we create for the new
> > operator, and how we can allow users to name them?
> >
> > Minor:
> >
> > 4. In "Public Interfaces", better add the templated generics to
> > "KGroupedStream" as "KGroupedStream<K, V>".
> >
> > 5. Naming wise, I'd suggest we keep the "K" together with Stream/Table,
> > e.g. "TimeWindowed*CogroupedKStream*".
> >
> >
> > Guozhang
> >
> >
> >
> >
> > On Thu, Oct 24, 2019 at 11:43 PM Matthias J. Sax 
> > wrote:
> >
> > > Walker,
> > >
> > > I am not sure if I can follow your argument. What do you exactly mean
> by
> > >
> > > > I also
> > > >> think that in this case it would be better to separate the 2 option
> > out
> > > >> into separate overloads.
> > >
> > > Maybe you can give an example what method signature you have in mind?
> > >
> > > >> We could take a named parameter from upstream or add an extra naming
> > > option
> > > >> however I don't really see the advantage that would give.
> > >
> > > Are you familiar with KIP-307? Before KIP-307, KS generated all names
> > > for all Processors. This makes it hard to reason about a Topology if
> > > it's getting complex. Adding `Named` to the new co-group operator would
> > > actually align with KIP-307.
> > >
> > > > It seems to go in
> > > >> the opposite direction from the cogroup configuration idea you
> > proposed.
> > >
> > > Can you elaborate? Not sure if I can follow.
> > >
> > >
> > >
> > > -Matthias
> > >
> > >
> > > On 10/24/19 10:20 AM, Walker Carlson wrote:
> > > > While I like the idea, Sophie, I don't think that it is necessary. I
> also
> > > > think that in this case it would be better to separate the 2 option
> out
> > > > into separate overloads.
> > > > We could take a named 

Build failed in Jenkins: kafka-trunk-jdk11 #918

2019-10-28 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-9048 Pt1: Remove Unnecessary lookup in Fetch Building (#7576)


--
[...truncated 2.72 MB...]

org.apache.kafka.streams.MockTimeTest > shouldGetNanosAsMillis STARTED

org.apache.kafka.streams.MockTimeTest > shouldGetNanosAsMillis PASSED

org.apache.kafka.streams.MockTimeTest > shouldSetStartTime STARTED

org.apache.kafka.streams.MockTimeTest > shouldSetStartTime PASSED

org.apache.kafka.streams.MockTimeTest > shouldNotAllowNegativeSleep STARTED

org.apache.kafka.streams.MockTimeTest > shouldNotAllowNegativeSleep PASSED

org.apache.kafka.streams.MockTimeTest > shouldAdvanceTimeOnSleep STARTED

org.apache.kafka.streams.MockTimeTest > shouldAdvanceTimeOnSleep PASSED

org.apache.kafka.streams.TestTopicsTest > testNonUsedOutputTopic STARTED

org.apache.kafka.streams.TestTopicsTest > testNonUsedOutputTopic PASSED

org.apache.kafka.streams.TestTopicsTest > testEmptyTopic STARTED

org.apache.kafka.streams.TestTopicsTest > testEmptyTopic PASSED

org.apache.kafka.streams.TestTopicsTest > testStartTimestamp STARTED

org.apache.kafka.streams.TestTopicsTest > testStartTimestamp PASSED

org.apache.kafka.streams.TestTopicsTest > testNegativeAdvance STARTED

org.apache.kafka.streams.TestTopicsTest > testNegativeAdvance PASSED

org.apache.kafka.streams.TestTopicsTest > shouldNotAllowToCreateWithNullDriver 
STARTED

org.apache.kafka.streams.TestTopicsTest > shouldNotAllowToCreateWithNullDriver 
PASSED

org.apache.kafka.streams.TestTopicsTest > testDuration STARTED

org.apache.kafka.streams.TestTopicsTest > testDuration PASSED

org.apache.kafka.streams.TestTopicsTest > testOutputToString STARTED

org.apache.kafka.streams.TestTopicsTest > testOutputToString PASSED

org.apache.kafka.streams.TestTopicsTest > testValue STARTED

org.apache.kafka.streams.TestTopicsTest > testValue PASSED

org.apache.kafka.streams.TestTopicsTest > testTimestampAutoAdvance STARTED

org.apache.kafka.streams.TestTopicsTest > testTimestampAutoAdvance PASSED

org.apache.kafka.streams.TestTopicsTest > testOutputWrongSerde STARTED

org.apache.kafka.streams.TestTopicsTest > testOutputWrongSerde PASSED

org.apache.kafka.streams.TestTopicsTest > 
shouldNotAllowToCreateOutputTopicWithNullTopicName STARTED

org.apache.kafka.streams.TestTopicsTest > 
shouldNotAllowToCreateOutputTopicWithNullTopicName PASSED

org.apache.kafka.streams.TestTopicsTest > testWrongSerde STARTED

org.apache.kafka.streams.TestTopicsTest > testWrongSerde PASSED

org.apache.kafka.streams.TestTopicsTest > testKeyValuesToMapWithNull STARTED

org.apache.kafka.streams.TestTopicsTest > testKeyValuesToMapWithNull PASSED

org.apache.kafka.streams.TestTopicsTest > testNonExistingOutputTopic STARTED

org.apache.kafka.streams.TestTopicsTest > testNonExistingOutputTopic PASSED

org.apache.kafka.streams.TestTopicsTest > testMultipleTopics STARTED

org.apache.kafka.streams.TestTopicsTest > testMultipleTopics PASSED

org.apache.kafka.streams.TestTopicsTest > testKeyValueList STARTED

org.apache.kafka.streams.TestTopicsTest > testKeyValueList PASSED

org.apache.kafka.streams.TestTopicsTest > 
shouldNotAllowToCreateOutputWithNullDriver STARTED

org.apache.kafka.streams.TestTopicsTest > 
shouldNotAllowToCreateOutputWithNullDriver PASSED

org.apache.kafka.streams.TestTopicsTest > testValueList STARTED

org.apache.kafka.streams.TestTopicsTest > testValueList PASSED

org.apache.kafka.streams.TestTopicsTest > testRecordList STARTED

org.apache.kafka.streams.TestTopicsTest > testRecordList PASSED

org.apache.kafka.streams.TestTopicsTest > testNonExistingInputTopic STARTED

org.apache.kafka.streams.TestTopicsTest > testNonExistingInputTopic PASSED

org.apache.kafka.streams.TestTopicsTest > testKeyValuesToMap STARTED

org.apache.kafka.streams.TestTopicsTest > testKeyValuesToMap PASSED

org.apache.kafka.streams.TestTopicsTest > testRecordsToList STARTED

org.apache.kafka.streams.TestTopicsTest > testRecordsToList PASSED

org.apache.kafka.streams.TestTopicsTest > testKeyValueListDuration STARTED

org.apache.kafka.streams.TestTopicsTest > testKeyValueListDuration PASSED

org.apache.kafka.streams.TestTopicsTest > testInputToString STARTED

org.apache.kafka.streams.TestTopicsTest > testInputToString PASSED

org.apache.kafka.streams.TestTopicsTest > testTimestamp STARTED

org.apache.kafka.streams.TestTopicsTest > testTimestamp PASSED

org.apache.kafka.streams.TestTopicsTest > testWithHeaders STARTED

org.apache.kafka.streams.TestTopicsTest > testWithHeaders PASSED

org.apache.kafka.streams.TestTopicsTest > testKeyValue STARTED

org.apache.kafka.streams.TestTopicsTest > testKeyValue PASSED

org.apache.kafka.streams.TestTopicsTest > 
shouldNotAllowToCreateTopicWithNullTopicName STARTED

org.apache.kafka.streams.TestTopicsTest > 
shouldNotAllowToCreateTopicWithNullTopicName PASSED

> Task :streams:upgrade-system-tests-0100:processTestResources NO-SOURCE
> Task 

[Jira contributor list] Adding request

2019-10-28 Thread Ahmed Oubalas
Hi all,

I'm new to the Apache Kafka project and would love to contribute to it by
fixing bugs/documentation.

As per https://kafka.apache.org/contributing.html, could you please add me
to the
contributor list so that I can start working on starter bugs.

JIRA id: Hmed06
Thank you,


[jira] [Resolved] (KAFKA-8800) Flaky Test SaslScramSslEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl

2019-10-28 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-8800.
--
Fix Version/s: (was: 2.5.0)
   Resolution: Duplicate

> Flaky Test 
> SaslScramSslEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl
> --
>
> Key: KAFKA-8800
> URL: https://issues.apache.org/jira/browse/KAFKA-8800
> Project: Kafka
>  Issue Type: Bug
>  Components: core, security, unit tests
>Affects Versions: 2.4.0
>Reporter: Matthias J. Sax
>Assignee: Anastasia Vela
>Priority: Critical
>  Labels: flaky-test
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6956/testReport/junit/kafka.api/SaslScramSslEndToEndAuthorizationTest/testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl/]
> {quote}org.scalatest.exceptions.TestFailedException: Consumed 0 records 
> before timeout instead of the expected 1 records at 
> org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530) at 
> org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529) 
> at 
> org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389) 
> at org.scalatest.Assertions.fail(Assertions.scala:1091) at 
> org.scalatest.Assertions.fail$(Assertions.scala:1087) at 
> org.scalatest.Assertions$.fail(Assertions.scala:1389) at 
> kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:822) at 
> kafka.utils.TestUtils$.pollRecordsUntilTrue(TestUtils.scala:781) at 
> kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:1312) at 
> kafka.utils.TestUtils$.consumeRecords(TestUtils.scala:1320) at 
> kafka.api.EndToEndAuthorizationTest.consumeRecords(EndToEndAuthorizationTest.scala:522)
>  at 
> kafka.api.EndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl(EndToEndAuthorizationTest.scala:361){quote}





[jira] [Resolved] (KAFKA-8937) Flaky Test SaslPlainSslEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl

2019-10-28 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-8937.
--
Resolution: Duplicate

> Flaky Test 
> SaslPlainSslEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl
> --
>
> Key: KAFKA-8937
> URL: https://issues.apache.org/jira/browse/KAFKA-8937
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: flaky-test
>
> h3. Error Message
> org.scalatest.exceptions.TestFailedException: Consumed 0 records before 
> timeout instead of the expected 1 records
> h3. Stacktrace
> org.scalatest.exceptions.TestFailedException: Consumed 0 records before 
> timeout instead of the expected 1 records at 
> org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:530)
> at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)
> at org.scalatest.Assertions$class.fail(Assertions.scala:1091)
> at org.scalatest.Assertions$.fail(Assertions.scala:1389)
> at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:830)
> at kafka.utils.TestUtils$.pollRecordsUntilTrue(TestUtils.scala:792)
> at kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:1324)
> at kafka.utils.TestUtils$.consumeRecords(TestUtils.scala:1333)
> at kafka.api.EndToEndAuthorizationTest.consumeRecords(EndToEndAuthorizationTest.scala:529)
> at kafka.api.EndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl(EndToEndAuthorizationTest.scala:368)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
> at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
> at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
> at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
> at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
> at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
> at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
> at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
> at sun.reflect.GeneratedMethodAccessor29.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
> at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
> at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
> at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
> at ...

[jira] [Resolved] (KAFKA-9045) Flaky Test DelegationTokenEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl

2019-10-28 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-9045.
--
Resolution: Duplicate

> Flaky Test 
> DelegationTokenEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl
> -
>
> Key: KAFKA-9045
> URL: https://issues.apache.org/jira/browse/KAFKA-9045
> Project: Kafka
>  Issue Type: Bug
>  Components: core, security, unit tests
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: flaky-test
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/2645/testReport/junit/kafka.api/DelegationTokenEndToEndAuthorizationTest/testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl/]
> {quote}org.scalatest.exceptions.TestFailedException: Consumed 0 records 
> before timeout instead of the expected 1 records
> at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
> at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
> at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)
> at org.scalatest.Assertions.fail(Assertions.scala:1091)
> at org.scalatest.Assertions.fail$(Assertions.scala:1087)
> at org.scalatest.Assertions$.fail(Assertions.scala:1389)
> at kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:841)
> at kafka.utils.TestUtils$.consumeRecords(TestUtils.scala:1342)
> at kafka.api.EndToEndAuthorizationTest.consumeRecords(EndToEndAuthorizationTest.scala:529)
> at kafka.api.EndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl(EndToEndAuthorizationTest.scala:368){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
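For context on why this failure mode is "flaky" rather than a hard bug: the assertion comes from `TestUtils.pollUntilAtLeastNumRecords`, which polls a condition until a deadline and fails the test if the deadline passes. A minimal Python sketch of that polling contract (names here are illustrative, not Kafka's actual test API):

```python
import time

def wait_until_true(condition, msg, timeout_s=15.0, poll_interval_s=0.1):
    """Poll `condition` until it returns True or `timeout_s` elapses.

    Mirrors the contract of kafka.utils.TestUtils.waitUntilTrue: on
    timeout the test fails with `msg` instead of hanging forever. A slow
    CI machine can thus turn a timing assumption into a flaky failure.
    """
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if condition():
            return
        time.sleep(poll_interval_s)
    raise AssertionError(msg)

# A condition that becomes true after a few polls succeeds quietly.
state = {"calls": 0}
def eventually_true():
    state["calls"] += 1
    return state["calls"] >= 3

wait_until_true(eventually_true, "condition never met",
                timeout_s=2.0, poll_interval_s=0.01)

# One that never becomes true fails with the supplied message.
try:
    wait_until_true(lambda: False, "Consumed 0 records before timeout",
                    timeout_s=0.1, poll_interval_s=0.01)
except AssertionError as e:
    print(e)  # prints the failure message
```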


Re: [VOTE] KIP-523 Add KStream#toTable to the Streams DSL

2019-10-28 Thread aishwarya kumar
Thank you,

Two binding votes so far.

I'll keep this thread open for a couple of days.

Best,
Aishwarya

On Thu, Oct 24, 2019, 3:05 PM Bill Bejeck  wrote:

> Thanks for the KIP, this is something that will be appreciated by the
> community.
>
> +1(binding)
>
> -Bill
>
> On Thu, Oct 24, 2019 at 12:54 PM Matthias J. Sax 
> wrote:
>
> > Thanks for the KIP!
> >
> > +1 (binding)
> >
> >
> > -Matthias
> >
> > On 10/24/19 6:19 AM, aishwarya kumar wrote:
> > > Hello All,
> > >
> > > After concluding discussions for this KIP, I would like to go forward
> > with
> > > the voting process.
> > >
> > > Jira Ticket: https://issues.apache.org/jira/browse/KAFKA-7658
> > > KIP :
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL
> > >
> > > Thank you,
> > > Aishwarya
> > >
> >
> >
>
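For readers following the vote: `KStream#toTable` reinterprets an event stream as a changelog, so each record upserts the table entry for its key and a null value acts as a tombstone that deletes it. A small Python sketch of that semantics (purely illustrative, not the Streams implementation):

```python
def to_table(records):
    """Materialize a changelog stream as a table: the last value per key
    wins, and a None value acts as a tombstone that deletes the key."""
    table = {}
    for key, value in records:
        if value is None:
            table.pop(key, None)   # tombstone: remove the key
        else:
            table[key] = value     # upsert: latest value wins
    return table

stream = [("a", 1), ("b", 2), ("a", 3), ("b", None)]
print(to_table(stream))  # {'a': 3}
```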


Re: [DISCUSS] KIP-543: Expand ConfigCommand's non-ZK functionality

2019-10-28 Thread Brian Byrne
Hi Stanislav,

That's fair, I wouldn't imagine it being necessary to add --describe for
all broker-loggers; it was intended mainly for consistency with the other
entity types. One might find it useful to grep for certain entries, or the
output could be improved to only show non-default values (like the
others), but I'm fine either way.

Thanks for the input.

Brian

On Sat, Oct 26, 2019 at 1:07 PM Stanislav Kozlovski 
wrote:

> Hey Brian,
>
> Thank you for the KIP, any step towards KIP-500 is a great idea!
>
> I wanted to discuss the need for a --describe on all the broker-loggers
> resource at once. The output is very verbose - I count more than 150 lines.
> Although since I imagine more options to describe internal state wouldn't
> hurt, I don't feel strongly about this.
>
> Thanks,
> Stanislav
>
> On Wed, Oct 23, 2019 at 12:20 AM Brian Byrne  wrote:
>
> > Sure - updated.
> >
> > Thanks,
> > Brian
> >
> > On Tue, Oct 22, 2019 at 4:02 PM Colin McCabe  wrote:
> >
> > > Hi Brian,
> > >
> > > Thanks for the KIP!  This looks like a nice cleanup.  It will be good
> to
> > > be able to perform these operations without direct ZK access.
> > >
> > > I guess this is a nitpick, but for the operations proposed for later,
> it
> > > might be nice to have a different color, like yellow, to indicate that
> > > we'll need a follow-on KIP to do them.
> > >
> > > best,
> > > Colin
> > >
> > >
> > > On Tue, Oct 22, 2019, at 12:42, Brian Byrne wrote:
> > > > Hello all,
> > > >
> > > > I wrote a KIP about expanding the ConfigCommand's functionality when
> > not
> > > > accessing ZooKeeper directly, i.e. when --bootstrap-servers is
> > specified
> > > > instead of --zookeeper. This should bring the two into parity.
> > > >
> > > > There's also a bit of bikeshedding about additional flags for
> > shortening
> > > > entity specification on the command line. Please take a look:
> > > > https://cwiki.apache.org/confluence/x/ww-3Bw
> > > >
> > > > Thanks,
> > > > Brian
> > > >
> > >
> >
>
>
> --
> Best,
> Stanislav
>


Re: [DISCUSS] KIP-150 - Kafka-Streams Cogroup

2019-10-28 Thread Bill Bejeck
Hi Walker,

Thanks for taking on KIP-150, the co-group will be very useful.

Regarding the naming, IMHO, we should stick to the current pattern, and
that is, we provide overloads with a "Named" operator for the
"aggregate" methods (I believe those are the only ones that create a
processor).
Currently, that's what we have with various operators performing
aggregation operations.
I understand your concern about adding methods, but IMHO it would be very
confusing to users if we broke the current pattern we have at an
arbitrary point in time.

As for Sophie's suggestion of adding a "CoGrouped" configuration object, I
can see the merits of that approach.  But IMHO, instead of doing so for one
operation, maybe we should take a step back and consider refactoring to one
configuration object overall (I believe John has suggested something
similar in the past).  That is well beyond the scope of this KIP, but I
think it would be better to stick with our current pattern and consider
changes we can apply to the entire API in a later KIP.

Just my 2 cents.

Thanks,
Bill


On Fri, Oct 25, 2019 at 4:34 PM Walker Carlson 
wrote:

> Hi Guozhang,
>
> 1. I am familiar with Spark's cogroup; it is very similar to their join
> operator, but it makes the values iterable instead. I think that
> the use cases are different enough that it makes sense to specify the
> aggregator when we do.
>
> I like the idea of "absorb" and I think it could be useful. Although I do
> not think it is as intuitive.
>
> If we were to go that route we would either use more processors or do
> essentially the same thing but would have to store the information
> required to cogroup inside that KTable. I think this would violate some
> design principles. I would argue that we should consider adding absorb as
> well and auto re-write it to use cogroup.
>
> 2. We have not considered this thought that would be a convenient
> operation.
>
> 3. There is only one processor made. We are actually having the naming
> conversation right now in the above thread
>
> 4, 5. fair points
>
> Walker
>
> On Fri, Oct 25, 2019 at 11:58 AM Guozhang Wang  wrote:
>
> > Hi Walker, thanks for the KIP! I made a pass on the writeup and have some
> > comments below:
> >
> > Meta:
> >
> > 1. Syntax-wise, I'm wondering if we have compared our current proposal
> with
> > Spark's co-group syntax (I know they are targeting for different use
> cases,
> > but wondering if their syntax is closer to the join operator), what are
> the
> > syntax / semantics trade-off here?
> >
> > Just playing a devil's advocate here, if the main motivation is to
> provide
> > a more convienent multi-way join syntax, and in order to only have one
> > materialized store we need to specify the final joined format at the
> > beginning, then what about the following alternative (with the given
> > example in your wiki page):
> >
> >
> > KGroupedStream grouped1 = builder.stream("topic1").groupByKey();
> > KGroupedStream grouped2 = builder.stream("topic2").groupByKey();
> > KGroupedStream grouped3 = builder.stream("topic3").groupByKey();
> >
> > KTable aggregated = grouped1.aggregate(initializer, materialized,
> > aggregator1);
> >
> > aggregated.absorb(grouped2, aggregator2);  // I'm just using a random
> name
> > on top of my head here
> >   .absorb(grouped3, aggregator3);
> >
> > In this way, we just add a new API to the KTable to "absorb" new streams
> as
> > aggregated results without needing to introduce new first citizen
> classes.
> >
> > 2. From the DSL optimization, have we considered if we can auto re-write
> > the user written old fashioned multi-join into this new DSL operator?
> >
> > 3. Although it is not needed for the wiki page itself, for internal
> > implementation how many processor nodes would we create for the new
> > operator, and how we can allow users to name them?
> >
> > Minor:
> >
> > 4. In "Public Interfaces", better add the templated generics to
> > "KGroupedStream" as "KGroupedStream<K, V>".
> >
> > 5. Naming wise, I'd suggest we keep the "K" together with Stream/Table,
> > e.g. "TimeWindowed*CogroupedKStream*".
> >
> >
> > Guozhang
> >
> >
> >
> >
> > On Thu, Oct 24, 2019 at 11:43 PM Matthias J. Sax 
> > wrote:
> >
> > > Walker,
> > >
> > > I am not sure if I can follow your argument. What do you exactly mean
> by
> > >
> > > > I also
> > > >> think that in this case it would be better to separate the 2 option
> > out
> > > >> into separate overloads.
> > >
> > > Maybe you can give an example what method signature you have in mind?
> > >
> > > >> We could take a named parameter from upstream or add an extra naming
> > > option
> > > >> however I don't really see the advantage that would give.
> > >
> > > Are you familiar with KIP-307? Before KIP-307, KS generated all names
> > > for all Processors. This makes it hard to reason about a Topology if
> > > it's getting complex. Adding `Named` to the new co-group operator would
> > > actually align with KIP-307.
> > >
> 
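To make the semantics under discussion concrete: cogrouping folds several grouped streams into a single aggregate per key, with one shared initializer and one aggregator per input stream. A Python sketch of that behavior (illustrative only; names do not match the proposed DSL, and for simplicity the streams are processed one after another rather than interleaved by timestamp):

```python
def cogroup_aggregate(initializer, streams):
    """streams: list of (records, aggregator) pairs, where records is a
    list of (key, value) and aggregator is (aggregate, value) -> aggregate.
    All inputs update one shared per-key aggregate, as in KIP-150,
    instead of building N tables and joining them."""
    table = {}
    for records, aggregator in streams:
        for key, value in records:
            agg = table.get(key, initializer())
            table[key] = aggregator(agg, value)
    return table

clicks = [("u1", "home"), ("u1", "cart")]   # stream 1: page views
orders = [("u1", 30), ("u2", 15)]           # stream 2: order amounts

result = cogroup_aggregate(
    lambda: {"clicks": 0, "spend": 0},
    [
        (clicks, lambda agg, v: {**agg, "clicks": agg["clicks"] + 1}),
        (orders, lambda agg, v: {**agg, "spend": agg["spend"] + v}),
    ],
)
print(result)
# {'u1': {'clicks': 2, 'spend': 30}, 'u2': {'clicks': 0, 'spend': 15}}
```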

Re: [DISCUSS] KIP-544: Make metrics exposed via JMX configurable

2019-10-28 Thread Viktor Somogyi-Vass
Hi Xavier,

How would the practical application look like if this was implemented?
Would monitoring agents switch between the whitelist and blacklist
periodically if they wanted to monitor every metric?
I think we should make some usage recommendations.

Thanks,
Viktor

On Sun, Oct 27, 2019 at 3:34 PM Gwen Shapira  wrote:

> Thanks Xavier.
>
> I really like this proposal. Collecting JMX metrics in clusters with
> 100K partitions was nearly impossible due to the design of JMX and the
> single lock mechanism. Yammer's limitations meant that any metric we
> reported was exposed via JMX, so we couldn't have cheaper reporters
> export one set of metrics, and JMX export another.
>
> Your proposal looks like a great way to lift this limitation and give
> us more flexibility in reporting metrics.
>
> Gwen
>
> On Fri, Oct 25, 2019 at 5:17 PM Xavier Léauté  wrote:
> >
> > Hi All,
> >
> > I wrote a short KIP to make the set of metrics exposed via JMX
> configurable.
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-544%3A+Make+metrics+exposed+via+JMX+configurable
> >
> > Let me know what you think.
> >
> > Thanks,
> > Xavier
>
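The mechanics the KIP proposes amount to a per-metric predicate: a metric name is registered with JMX only if it matches an include regex and does not match an exclude regex. A sketch of such a predicate in Python (the parameter names here are illustrative placeholders, not the KIP's final configuration property names):

```python
import re

def make_jmx_filter(include_pattern=".*", exclude_pattern=""):
    """Return a predicate deciding whether a metric name is exposed via
    JMX: it must fully match the include regex and not match the
    exclude regex. Defaults expose everything, preserving old behavior."""
    include = re.compile(include_pattern)
    exclude = re.compile(exclude_pattern) if exclude_pattern else None

    def should_expose(metric_name):
        if not include.fullmatch(metric_name):
            return False
        return exclude is None or not exclude.fullmatch(metric_name)

    return should_expose

# Drop the expensive per-partition log metrics while keeping the rest.
f = make_jmx_filter(exclude_pattern=r"kafka\.log:type=Log,.*")
print(f("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec"))  # True
print(f("kafka.log:type=Log,name=Size,topic=t,partition=0"))            # False
```

This is why the proposal helps the 100K-partition case Gwen mentions: the costly per-partition beans can be kept out of JMX while a cheaper reporter still receives them.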


Re: [DISCUSSION] KIP-422: Add support for user/client configuration in the Kafka Admin Client

2019-10-28 Thread Viktor Somogyi-Vass
Hi Yaodong,

Is this KIP still active?
In case you have no time for it, I'd like to continue this.

Thanks,
Viktor

On Thu, May 23, 2019 at 6:37 PM Yaodong Yang 
wrote:

> friendly ping...
>
> On Wed, May 15, 2019 at 10:57 AM Yaodong Yang 
> wrote:
>
> > Hello Colin, Rajini and Jun,
> >
> > I have updated the KIP 422, please take another look!
> >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97555704
> >
> > Thanks!
> > Yaodong
> >
> > On Tue, May 7, 2019 at 11:54 AM Yaodong Yang 
> > wrote:
> >
> >> Hello Colin, Rajini and Jun,
> >>
> >> I think it makes sense to have new APIs defined in the AdminClient for
> >> quota management only, as Colin described above. For the request and
> >> response protocol, It seems like we can leverage the existing requests:
> >> *IncrementalAlterConfigsRequest* and *DescribeConfigsRequest*. In this
> >> way, we convert the quota management requests (set quota, describe quota
> >> and delete quotas) to configuration changes for User resource and Client
> >> resource (e.g. update a configuration to user 1 ). And then we send the
> >> configuration change request to the broker side. Therefore, we will not
> >> have any protocol changes for this KIP.
> >>
> >> Please let me know what you think.
> >>
> >> Thanks!
> >> Yaodong
> >>
> >>
> >> On Mon, Apr 15, 2019 at 12:16 PM Colin McCabe 
> wrote:
> >>
> >>> Hi all,
> >>>
> >>> In KIP-133: Describe and Alter Configs Admin APIs, there is "future
> >>> work" section that explains:
> >>>
> >>> > Future Work
> >>> > ...
> >>> >
> >>>  > 2. Support for reading and updating client, user and replication
> >>> quotas. We
> >>>  > initially included that in the KIP, but it subsequently became
> >>> apparent
> >>>  > that a separate protocol and AdminClient API would be more
> >>> appropriate.
> >>>  > The reason is that client/user quotas can be applied on a client id,
> >>> user
> >>>  > or (client id, user) tuple. In the future, the hierarchy may get
> even
> >>> more
> >>>  > complicated. So, it makes sense to keeping the API simple for the
> >>> simple
> >>>  > cases while introducing a more sophisticated API for the more
> complex
> >>> case.
> >>>
> >>> In other words, we deliberately didn't implement quotas through
> >>> AlterConfigs because we felt like it the AlterConfigs API wasn't really
> >>> complex enough to handle quotas well.
> >>>
> >>> I think that the original discussion was correct here -- we should have
> >>> a special API for quotas, rather than trying to shoehorn them into the
> >>> AlterConfigs API.
> >>>
> >>> For example, we could have an API like this:
> >>>
> >>> >
> >>> > SetQuotasResults setQuotas(Map<QuotaTarget, QuotaLimit> quotas,
> >>> > SetQuotasOptions options)
> >>> >
> >>> > interface QuotaTarget {
> >>> > }
> >>> >
> >>> > class ClientQuotaTarget implements QuotaTarget {
> >>> >   String clientId;
> >>> > }
> >>> >
> >>> > class PrincipalQuotaTarget implements QuotaTarget {
> >>> >   String principal;
> >>> > }
> >>> >
> >>> > class ClientAndPrincipalQuotaTarget implements QuotaTarget {
> >>> >   String clientId;
> >>> >   String principal;
> >>> > }
> >>> >
> >>> > class QuotaLimit {
> >>> >long bytesWrittenPerSec;
> >>> >long bytesReadPerSec;
> >>> > }
> >>> >
> >>> > DescribeQuotasResults describeQuotas(QuotaTarget target,
> >>> DescribeQuotasOptions options);
> >>> >
> >>> > ListQuotasResults listQuotas(ListQuotasOptions options);
> >>> >
> >>>
> >>> This would avoid the need to parse text strings.  It would also make it
> >>> really clear how to use and extend the API.
> >>>
> >>> best,
> >>> Colin
> >>>
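The hierarchy that makes a plain key/value config API awkward here is the quota precedence order: resolving the limit for a (user, client-id) pair walks from the most specific entity to increasingly generic defaults. A Python sketch of that resolution (the precedence order follows Kafka's quota documentation; the data layout is illustrative):

```python
def resolve_quota(quotas, user, client_id):
    """quotas maps (user, client) keys to limits, where either component
    may be the sentinel "<default>" (a configured default) or None
    (entity not part of the key). The most specific matching entry wins,
    per Kafka's documented quota precedence order."""
    DEFAULT = "<default>"
    lookup_order = [
        (user, client_id),       # /config/users/<user>/clients/<client-id>
        (user, DEFAULT),         # /config/users/<user>/clients/<default>
        (user, None),            # /config/users/<user>
        (DEFAULT, client_id),    # /config/users/<default>/clients/<client-id>
        (DEFAULT, DEFAULT),      # /config/users/<default>/clients/<default>
        (DEFAULT, None),         # /config/users/<default>
        (None, client_id),       # /config/clients/<client-id>
        (None, DEFAULT),         # /config/clients/<default>
    ]
    for key in lookup_order:
        if key in quotas:
            return quotas[key]
    return None

quotas = {
    ("alice", "app-1"): 10_000_000,
    ("alice", None): 5_000_000,
    (None, "<default>"): 1_000_000,
}
print(resolve_quota(quotas, "alice", "app-1"))  # 10000000
print(resolve_quota(quotas, "alice", "app-2"))  # 5000000
print(resolve_quota(quotas, "bob", "app-1"))    # 1000000
```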
> >>> On Mon, Apr 8, 2019, at 05:29, Rajini Sivaram wrote:
> >>> > Hi Jun, Yaodong,
> >>> >
> >>> > 21. The proposed approach sounds very hacky. User principals can
> >>> contain
> >>> > arbitrary characters. So we can't simply split
> `user1/clients/clientA`
> >>> into
> >>> > tokens using '/' as delimiter.  Internally, we sanitize names before
> >>> > storing in ZK, but the names provided by the user are actual
> >>> principals and
> >>> > client-ids. I think we want to have entity names that explicitly
> >>> specify
> >>> > (type, name) as in the CLI kafka-configs.sh and allow multiple
> >>> entities to
> >>> > be specified together for (user, client-id). That will also enable us
> >>> to
> >>> > configure defaults in a consistent way.
> >>> >
> >>> > 22. Yes, that sounds reasonable.
> >>> >
> >>> > On Fri, Apr 5, 2019 at 11:13 PM Jun Rao  wrote:
> >>> >
> >>> > > Hi, Yaodong,
> >>> > >
> >>> > > Yes, what you proposed makes sense. A couple of more comments.
> >>> > >
> >>> > > 21.  Could you add those examples to the KIP wiki? It would also be
> >>> useful
> >>> > > to know how to set the ConfigEntry value for quotas at
> >>> > > DefaultClientInUser, DefaultClientDefaultUser and DefaultUser
> level.
> >>> > >
> >>> > > 22. To support KIP-257, I guess we can just pass in some special
> >>> string
> >>> > > value in ConfigEntry value through alterConfig and let the
> customized
> >>> >