Re: add as contributor

2019-03-30 Thread Vahid Hashemian
Hi Brad,

I just added you to the list of contributors.

Thanks!
--Vahid

On Sun, Mar 31, 2019, 00:49 Brad Ellis  wrote:

> Hey,
> Just checking in here. I've got some time this weekend and would love to
> start contributing.  But, I don't want to go too far before assigning a
> Jira to myself and end up duplicating work.
>
> Can you add me as a contributor so that I can assign a Jira to myself (or
> assign this one to me: KAFKA-8157
> )?
>
> Name: Travis Brad Ellis
> username: Brad Ellis
> https://issues.apache.org/jira/secure/ViewProfile.jspa
> https://github.com/tbradellis
>
>
> On Thu, Mar 28, 2019 at 4:41 PM Brad Ellis  wrote:
>
> > Hi,
> >
> > I'd like to contribute to the Kafka project.  Can you add me as a
> > contributor?
> > In particular, I'm planning on picking up this JIRA as a first issue:
> >
> > https://issues.apache.org/jira/browse/KAFKA-8157
> >
> > Best regards,
> >
> > Brad Ellis
> > https://github.com/tbradellis
> >
>


Re: add as contributor

2019-03-30 Thread Brad Ellis
Hey,
Just checking in here. I've got some time this weekend and would love to
start contributing.  But, I don't want to go too far before assigning a
Jira to myself and end up duplicating work.

Can you add me as a contributor so that I can assign a Jira to myself (or
assign this one to me: KAFKA-8157
)?

Name: Travis Brad Ellis
username: Brad Ellis
https://issues.apache.org/jira/secure/ViewProfile.jspa
https://github.com/tbradellis


On Thu, Mar 28, 2019 at 4:41 PM Brad Ellis  wrote:

> Hi,
>
> I'd like to contribute to the Kafka project.  Can you add me as a
> contributor?
> In particular, I'm planning on picking up this JIRA as a first issue:
>
> https://issues.apache.org/jira/browse/KAFKA-8157
>
> Best regards,
>
> Brad Ellis
> https://github.com/tbradellis
>


[jira] [Created] (KAFKA-8177) Allow for separate connect instances to have sink connectors with the same name

2019-03-30 Thread Paul Whalen (JIRA)
Paul Whalen created KAFKA-8177:
--

 Summary: Allow for separate connect instances to have sink 
connectors with the same name
 Key: KAFKA-8177
 URL: https://issues.apache.org/jira/browse/KAFKA-8177
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Paul Whalen


If you have multiple Connect instances (either a single standalone or 
distributed group of workers) running against the same Kafka cluster, the 
connect instances cannot each have a sink connector with the same name and 
still operate independently. This is because the consumer group ID used 
internally for reading from the source topic(s) is entirely derived from the 
connector's name: 
[https://github.com/apache/kafka/blob/d0e436c471ba4122ddcc0f7a1624546f97c4a517/connect/runtime/src/main/java/org/apache/kafka/connect/util/SinkUtils.java#L24]
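For reference, the derivation at that line is essentially a fixed prefix plus the connector name. A minimal sketch (the class and method names mirror SinkUtils, but treat the exact prefix as an assumption taken from the linked source):

```java
// Sketch of the sink-task consumer group id derivation (assumption: the
// "connect-" prefix, per the SinkUtils line linked above).
public class SinkUtilsSketch {
    public static String consumerGroupId(String connectorName) {
        return "connect-" + connectorName;
    }

    public static void main(String[] args) {
        // Two Connect instances that each run a sink connector named
        // "jdbc-sink" end up sharing one consumer group:
        System.out.println(consumerGroupId("jdbc-sink")); // connect-jdbc-sink
    }
}
```

Because nothing instance-specific enters the derivation, identically named sink connectors on different instances always land in the same group.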

The documentation of Connect implies to me that it supports "multi-tenancy," 
that is, as long as...
 * In standalone mode, the {{offset.storage.file.filename}} is not shared 
between instances
 * In distributed mode, {{group.id}} and {{config.storage.topic}}, 
{{offset.storage.topic}}, and {{status.storage.topic}} are not the same between 
instances

... then the connect instances can operate completely independently without 
fear of conflict.  But the sink connector consumer group naming policy makes 
this untrue. Independence can obviously still be achieved by uniquely naming 
connectors across instances, but in some environments that could be a bit of a 
nuisance, or a challenging policy to enforce. For instance, imagine a large 
group of developers or data analysts all running their own standalone Connect 
to load into a SQL database for their own analysis, or mirroring to their own 
local cluster for testing.
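For illustration, the distributed-mode settings above for two independent worker groups might look like this (all values hypothetical):

```
# Connect worker group A
group.id=connect-cluster-a
config.storage.topic=connect-configs-a
offset.storage.topic=connect-offsets-a
status.storage.topic=connect-status-a

# Connect worker group B
group.id=connect-cluster-b
config.storage.topic=connect-configs-b
offset.storage.topic=connect-offsets-b
status.storage.topic=connect-status-b
```

Even with all of these kept distinct, a sink connector named, say, `jdbc-sink` in both groups would still collide on the derived consumer group.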

The obvious solution is to allow supplying config that gives a Connect instance 
some notion of identity, and to use that when creating the sink task consumer 
group. Distributed mode obviously already has this ({{group.id}}), but it would 
need to be added for standalone mode. Maybe {{instance.id}}? Given that 
solution, it seems like this would need a small KIP.

I could also imagine solving this problem through better documentation 
("ensure your connector names are unique!"), but relying on that subtlety 
doesn't seem worth it to me. (Optionally) assigning identity to every Connect 
instance seems strictly clearer, without any downside.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8176) Revisit BaseConsumerTest inheritance

2019-03-30 Thread Stanislav Kozlovski (JIRA)
Stanislav Kozlovski created KAFKA-8176:
--

 Summary: Revisit BaseConsumerTest inheritance
 Key: KAFKA-8176
 URL: https://issues.apache.org/jira/browse/KAFKA-8176
 Project: Kafka
  Issue Type: Improvement
Reporter: Stanislav Kozlovski


As discussed here 
([https://github.com/apache/kafka/pull/6238#discussion_r256542009]), we have 
redundant test cases being run, where `ConsumerBounceTest` inherits 
`BaseConsumerTest`.

We should revisit this, potentially by creating an intermediate class that is 
inherited, or by splitting the utility logic into a separate utility class.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2019-03-30 Thread Ivan Ponomarev

Hi all!

I was about to write another KIP, but found out that KIP-401 addresses 
exactly the problem I faced. So let me jump into your discussion and ask 
you to assess another idea.


I fully agree with the KIP-401's motivation part. E.g., in my project I 
had to invent a wrapper class that hides the details of KeyValueStore 
management from the business logic. Of course, this should be handled better 
in the Kafka Streams API.


But I was about to look at this problem from another angle and propose a 
simple alternative in the high-level DSL, one that will not fit all cases, 
but will fit most of them. Hence my idea does not exclude Paul's proposal.


What if we restrict ourselves to *only one* KeyValueStore and propose a 
method that resembles `aggregate` and `reduce` methods, like this:


stream
   .map(...)
   .filter(...)
   .transform((k, v, s) -> {...}, Transformed.with(...))

where
* k, v -- input key & value
* s -- a KeyValueStore provided as an argument
* return value of the lambda should be KeyValue.pair(...)
* Transformed.with... is a builder, used to define the Transformer and 
KeyValueStore building parameters. Some of these parameters should be:

** store's KeySerde,
** store's ValueSerde,
** whether the store is persistent or in-memory,
** store's name -- an optional parameter; the system should be able to 
generate the store's name transparently for the user, unless we want to 
name it ourselves or share the store between processors,

** scheduled punctuation.

Imagine we have a KStream<String, Integer>, and we need to calculate a 
`derivative` stream, that is, a stream of 'deltas' of the provided 
integer values.


This could be achieved as simply as:

stream.transform((key, value, stateStore) -> {
        int previousValue =
            Optional.ofNullable(stateStore.get(key)).orElse(0);
        stateStore.put(key, value);
        return KeyValue.pair(key, value - previousValue);
    },
    //we do not need to bother with store name, punctuation etc.
    //maybe even the Serde part can be omitted, since we can inherit
    //the serdes from the stream by default
    Transformed.with(Serdes.String(), Serdes.Integer()));
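As a sanity check of the delta logic above, here is a self-contained sketch with a plain HashMap standing in for the KeyValueStore (the proposed API itself does not exist yet, so this only exercises the arithmetic):

```java
import java.util.HashMap;
import java.util.Map;

// Self-contained check of the "derivative" logic, with a HashMap standing
// in for the proposed per-transformer KeyValueStore.
public class DerivativeSketch {
    private final Map<String, Integer> store = new HashMap<>();

    public int delta(String key, int value) {
        // Equivalent to Optional.ofNullable(store.get(key)).orElse(0)
        int previousValue = store.getOrDefault(key, 0);
        store.put(key, value);
        return value - previousValue;
    }

    public static void main(String[] args) {
        DerivativeSketch d = new DerivativeSketch();
        System.out.println(d.delta("a", 5)); // 5  (first value, previous defaults to 0)
        System.out.println(d.delta("a", 8)); // 3  (8 - 5)
    }
}
```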

The hard part of it is that the new `transform` method definition would have 
to be parameterized by six type parameters:


* input/output/KeyValueStore key type,
* input/output/KeyValueStore value type.

However, it seems that all these types can be inferred from the provided 
lambda and Transformed.with instances.
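To check that inference claim, here is a stripped-down sketch (all names hypothetical; a plain Map stands in for the KeyValueStore and a bare witness class for Transformed) showing that javac can infer all six parameters at the call site:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: can all six type parameters be inferred? A plain Map
// stands in for the KeyValueStore, and a bare witness class for Transformed.
public class InferenceSketch {
    interface StatefulTransform<K, V, SK, SV, KR, VR> {
        Map.Entry<KR, VR> apply(K key, V value, Map<SK, SV> store);
    }

    static class Transformed<SK, SV> {
        static <SK, SV> Transformed<SK, SV> with() { return new Transformed<>(); }
    }

    static <K, V, SK, SV, KR, VR> Map.Entry<KR, VR> applyOnce(
            K key, V value, Map<SK, SV> store,
            StatefulTransform<K, V, SK, SV, KR, VR> transform,
            Transformed<SK, SV> transformed) {
        return transform.apply(key, value, store);
    }

    public static void main(String[] args) {
        Map<String, Integer> store = new HashMap<>();
        // No explicit type arguments anywhere: K, V, SK, SV, KR and VR are
        // all inferred from the arguments and the lambda's body.
        Map.Entry<String, Integer> result = applyOnce("a", 5, store,
                (k, v, s) -> {
                    int prev = s.getOrDefault(k, 0);
                    s.put(k, v);
                    return Map.entry(k, v - prev);
                },
                Transformed.with());
        System.out.println(result); // a=5
    }
}
```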


What do you think about this?

Regards,

Ivan


27.03.2019 20:45, Guozhang Wang пишет:

Hello Paul,

Thanks for the uploaded PR and the detailed description! I've made a pass
on it and left some comments.

Overall I think I agree with you that passing in the StoreBuilder directly 
rather than the store name is more convenient, as it does not require another 
`addStore` call, but we just need to spend some more documentation effort on 
educating users about the two ways of connecting their stores. I'm slightly 
concerned about this education curve, but I can be convinced if most people 
feel it is worthwhile.


Guozhang

On Sat, Mar 23, 2019 at 5:15 PM Paul Whalen  wrote:


I'd like to resurrect this discussion with a cursory, proof-of-concept
implementation of the KIP which combines many of our ideas:
https://github.com/apache/kafka/pull/6496.  I tried to keep the diff as
small as possible for now, just using it to convey the main ideas.  But
I'll separately address some of our earlier discussion:

- Will there be a new, separate interface for users to implement for the
new functionality? No, to hopefully keep things simple, all of the
Processor/TransformerSupplier interfaces will just extend
StateStoresSupplier, allowing users to opt in to this functionality by
overriding the default implementation that gives an empty list.
- Will the interface allow users to specify the store name, or the
entire StoreBuilder? The entire StoreBuilder, so the
Processor/TransformerSupplier can completely encapsulate name and
implementation of a state store if desired.
- Will the old way of specifying store names alongside the supplier when 
calling stream.process/transform() be deprecated? No, this is still a 
legitimate way to wire up Processors/Transformers and their stores. But I 
would recommend not allowing stream.process/transform() calls that use both 
store declaration mechanisms (this restriction is not in the proof of 
concept).
- How will we handle adding the same state store to the topology multiple 
times because different Processor/TransformerSuppliers declare it? 
topology.addStateStore() will be slightly relaxed for convenience, and will 
allow adding the same StoreBuilder multiple times as long as the exact same 
StoreBuilder instance is being added for the same store name.  This seems to 
prevent in practice the issue of accidentally merging two state stores into 
one by adding them with the same name.  For additional safety, if we wanted 
to (not in the