[jira] [Created] (KAFKA-8706) Kafka 2.3.0 Unit Test Failures on Oracle Linux 7.2 - Need help debugging framework or issue.

2019-07-23 Thread Chandrasekhar (JIRA)
Chandrasekhar created KAFKA-8706:


 Summary: Kafka 2.3.0 Unit Test Failures on Oracle Linux 7.2 - Need 
help debugging framework or issue.
 Key: KAFKA-8706
 URL: https://issues.apache.org/jira/browse/KAFKA-8706
 Project: Kafka
  Issue Type: Test
  Components: core
Affects Versions: 2.3.0
Reporter: Chandrasekhar
 Attachments: KafkaAUTFailures.txt

Hi

We have just imported the Kafka 2.3.0 source code from the git repo and are compiling it using 
Gradle 4.7 on an Oracle VM with the following info:

[vagrant@localhost kafka-2.3.0]$ uname -a
 Linux localhost 4.1.12-112.14.1.el7uek.x86_64 #2 SMP Fri Dec 8 18:37:23 PST 
2017 x86_64 x86_64 x86_64 GNU/Linux
 [vagrant@localhost kafka-2.3.0]$

 

Upon compiling, there are 6 test failures at the end. The failed tests are 
reported as follows:

DescribeConsumerGroupTest.testDescribeOffsetsOfExistingGroupWithNoMembers
 SaslSslAdminClientIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords
 UserQuotaTest.testQuotaOverrideDelete
 UserQuotaTest.testThrottledProducerConsumer
 MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete
 SocketServerTest.testControlPlaneRequest

Please find the failures attached:

 

[^KafkaAUTFailures.txt]

 

 

 We would like to know if we are missing anything in our build environment or 
if these are known test failures in Kafka 2.3.0.

 

 





[jira] [Created] (KAFKA-8705) NullPointerException was thrown by topology optimization when two MergeNodes have a common KeyChangingNode

2019-07-23 Thread Hiroshi Nakahara (JIRA)
Hiroshi Nakahara created KAFKA-8705:
---

 Summary: NullPointerException was thrown by topology optimization 
when two MergeNodes have a common KeyChangingNode
 Key: KAFKA-8705
 URL: https://issues.apache.org/jira/browse/KAFKA-8705
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.3.0
Reporter: Hiroshi Nakahara


NullPointerException was thrown by topology optimization when two MergeNodes 
have a common KeyChangingNode.

Kafka Stream version: 2.3.0
h3. Code
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class Main {
    public static void main(String[] args) {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        final KStream<Integer, Integer> parentStream = streamsBuilder
                .stream("parentTopic", Consumed.with(Serdes.Integer(), Serdes.Integer()))
                .selectKey(Integer::sum);  // To make parentStream a key-changing point
        final KStream<Integer, Integer> childStream1 = parentStream.mapValues(v -> v + 1);
        final KStream<Integer, Integer> childStream2 = parentStream.mapValues(v -> v + 2);
        final KStream<Integer, Integer> childStream3 = parentStream.mapValues(v -> v + 3);
        childStream1
                .merge(childStream2)
                .merge(childStream3)
                .to("outputTopic");

        final Properties properties = new Properties();
        properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
        streamsBuilder.build(properties);
    }
}
{code}
h3. Expected result

streamsBuilder.build should create the Topology without throwing an exception. The 
expected topology is:
{code:java}
Topologies:
   Sub-topology: 0
Source: KSTREAM-SOURCE-00 (topics: [parentTopic])
  --> KSTREAM-KEY-SELECT-01
Processor: KSTREAM-KEY-SELECT-01 (stores: [])
  --> KSTREAM-MAPVALUES-02, KSTREAM-MAPVALUES-03, KSTREAM-MAPVALUES-04
  <-- KSTREAM-SOURCE-00
Processor: KSTREAM-MAPVALUES-02 (stores: [])
  --> KSTREAM-MERGE-05
  <-- KSTREAM-KEY-SELECT-01
Processor: KSTREAM-MAPVALUES-03 (stores: [])
  --> KSTREAM-MERGE-05
  <-- KSTREAM-KEY-SELECT-01
Processor: KSTREAM-MAPVALUES-04 (stores: [])
  --> KSTREAM-MERGE-06
  <-- KSTREAM-KEY-SELECT-01
Processor: KSTREAM-MERGE-05 (stores: [])
  --> KSTREAM-MERGE-06
  <-- KSTREAM-MAPVALUES-02, KSTREAM-MAPVALUES-03
Processor: KSTREAM-MERGE-06 (stores: [])
  --> KSTREAM-SINK-07
  <-- KSTREAM-MERGE-05, KSTREAM-MAPVALUES-04
Sink: KSTREAM-SINK-07 (topic: outputTopic)
  <-- KSTREAM-MERGE-06
{code}
h3. Actual result

NullPointerException was thrown with the following stacktrace.
{code:java}
Exception in thread "main" java.lang.NullPointerException
at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
at 
org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybeUpdateKeyChangingRepartitionNodeMap(InternalStreamsBuilder.java:397)
at 
org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybeOptimizeRepartitionOperations(InternalStreamsBuilder.java:315)
at 
org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybePerformOptimizations(InternalStreamsBuilder.java:304)
at 
org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:275)
at 
org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:558)
at Main.main(Main.java:24){code}
h3. Cause

This exception occurs in 
InternalStreamsBuilder#maybeUpdateKeyChangingRepartitionNodeMap.
{code:java}
private void maybeUpdateKeyChangingRepartitionNodeMap() {
    final Map<StreamsGraphNode, Set<StreamsGraphNode>> mergeNodesToKeyChangers = new HashMap<>();
    for (final StreamsGraphNode mergeNode : mergeNodes) {
        mergeNodesToKeyChangers.put(mergeNode, new LinkedHashSet<>());
        final Collection<StreamsGraphNode> keys = keyChangingOperationsToOptimizableRepartitionNodes.keySet();
        for (final StreamsGraphNode key : keys) {
            final StreamsGraphNode maybeParentKey = findParentNodeMatching(mergeNode, node -> node.parentNodes().contains(key));
            if (maybeParentKey != null) {
                mergeNodesToKeyChangers.get(mergeNode).add(key);
            }
        }
    }

    for (final Map.Entry<StreamsGraphNode, Set<StreamsGraphNode>> entry : mergeNodesToKeyChangers.entrySet()) {
        final StreamsGraphNode mergeKey = entry.getKey();
        final Collection

[jira] [Created] (KAFKA-8704) Add PartitionAssignor adapter for backwards compatibility

2019-07-23 Thread Sophie Blee-Goldman (JIRA)
Sophie Blee-Goldman created KAFKA-8704:
--

 Summary: Add PartitionAssignor adapter for backwards compatibility
 Key: KAFKA-8704
 URL: https://issues.apache.org/jira/browse/KAFKA-8704
 Project: Kafka
  Issue Type: Sub-task
  Components: clients
Reporter: Sophie Blee-Goldman


As part of KIP-429, we are deprecating the old 
consumer.internal.PartitionAssignor in favor of a [new 
consumer.PartitionAssignor|https://issues.apache.org/jira/browse/KAFKA-8703] 
interface that is part of the public API.

 

Although the old PartitionAssignor was technically part of the internal 
package, some users may have implemented it, and this change will break source 
compatibility for them: they would need to modify their class to implement 
the new interface. The number of users affected may be small, but nonetheless 
we would like to add an adapter to maintain compatibility for these users.
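
For illustration, here is a minimal sketch of the adapter idea. The interfaces and wrapper types below are deliberately simplified, hypothetical stand-ins rather than the actual old/new PartitionAssignor APIs from KIP-429; they only show how an adapter can keep an existing implementation working against a new interface.

{code:java}
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins -- NOT the real Kafka assignor interfaces.
interface LegacyAssignor {
    String name();
    Map<String, List<Integer>> assign(Map<String, List<String>> subscriptionsByMember);
}

interface NewAssignor {
    String name();
    GroupAssignment assign(GroupSubscription groupSubscription);
}

// Hypothetical wrapper types used by the "new" interface in this sketch.
class GroupSubscription {
    final Map<String, List<String>> topicsByMember;
    GroupSubscription(final Map<String, List<String>> topicsByMember) {
        this.topicsByMember = topicsByMember;
    }
}

class GroupAssignment {
    final Map<String, List<Integer>> partitionsByMember;
    GroupAssignment(final Map<String, List<Integer>> partitionsByMember) {
        this.partitionsByMember = partitionsByMember;
    }
}

// The adapter: an existing LegacyAssignor keeps working wherever a NewAssignor
// is expected, without the user having to touch their class.
final class LegacyAssignorAdapter implements NewAssignor {
    private final LegacyAssignor delegate;

    LegacyAssignorAdapter(final LegacyAssignor delegate) {
        this.delegate = delegate;
    }

    @Override
    public String name() {
        return delegate.name();
    }

    @Override
    public GroupAssignment assign(final GroupSubscription groupSubscription) {
        return new GroupAssignment(delegate.assign(groupSubscription.topicsByMember));
    }
}
{code}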





[jira] [Created] (KAFKA-8703) Move PartitionAssignor to public API

2019-07-23 Thread Sophie Blee-Goldman (JIRA)
Sophie Blee-Goldman created KAFKA-8703:
--

 Summary: Move PartitionAssignor to public API
 Key: KAFKA-8703
 URL: https://issues.apache.org/jira/browse/KAFKA-8703
 Project: Kafka
  Issue Type: Sub-task
  Components: clients
Reporter: Sophie Blee-Goldman
Assignee: Sophie Blee-Goldman


Currently the PartitionAssignor, which is meant to be a pluggable interface, 
sits in the internal package. It should be part of the public API, so we are 
deprecating the old consumer.internal.PartitionAssignor in favor of a new 
consumer.PartitionAssignor.

 

We also want to take the opportunity to refactor the interface a bit, so as to 
achieve:
 # Better separation of user/assignor-provided and consumer-provided metadata
 # An API that is easier to evolve

Due to the way assignors are instantiated, moving to a new PartitionAssignor 
interface will be fully compatible for most users, except those who have 
implemented the internal.PartitionAssignor.
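
For context, assignors are instantiated reflectively from the consumer's partition.assignment.strategy config, which is why the interface is effectively pluggable even though it currently sits in the internal package. A minimal sketch of how a custom assignor is wired in today (com.example.MyAssignor is a hypothetical class implementing the assignor interface):

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CustomAssignorExample {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // The assignor class is loaded reflectively from this config;
        // com.example.MyAssignor is hypothetical.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                "com.example.MyAssignor");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            // ... poll as usual ...
        }
    }
}
{code}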





Re: [DISCUSS] KIP-466: Add support for List serialization and deserialization

2019-07-23 Thread Matthias J. Sax
>> Just to make sure I understand the problem you're highlighting:
>> I guess the difference is that the serializer and deserializer that are
>> nested inside the serde also need to be configured? So, by default I'd have
>> to specify all six configs when I'm using Streams?

That is not the problem. And you actually describe the solution for it
yourself:

>> I guess in the Serde, it could make use of a package-protected constructor
>> for the serializer and deserializer that fixes the list type and inner type
>> to the serde-configured ones. Then, when you're configuring Streams, you
>> only need to specify the StreamsConfigs.




The problem is that `ListSerde` is in the `clients` package and thus
`ListSerde` cannot access `StreamsConfig`, and hence cannot use
`StreamsConfig#DEFAULT_LIST_KEY_SERDE_TYPE` (and others). Therefore, we
either need to hard-code string literals for the config names (which does
not sound right) or add `CommonClientConfigs#DEFAULT_LIST_KEY_SERDE_TYPE`
(and others).

In StreamsConfig we would just redefine them for convenience:

> public static final String DEFAULT_LIST_KEY_SERDE_TYPE =
> CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE;


Note that `TimeWindowedSerde` is contained in the `streams` package and thus
it can access `StreamsConfig` and
`StreamsConfig#DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS`.




Btw: I just realized that we actually don't need the `ProducerConfig` entry

> list.key/value.serializer.type

because the list type is irrelevant on write. We only need the `inner` config.



-Matthias
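
To make the config plumbing concrete, here is a minimal sketch (not the KIP-466 implementation) of a list serializer that resolves its inner serializer inside configure(). The config key names follow the proposals in this thread and are illustrative only; the actual wire format is out of scope here.

{code:java}
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;

public class SketchListSerializer<T> implements Serializer<List<T>> {

    private Serializer<T> inner;

    @SuppressWarnings("unchecked")
    @Override
    public void configure(final Map<String, ?> configs, final boolean isKey) {
        // Key names as proposed in this thread -- illustrative, not final.
        final String innerConfig = isKey
            ? "list.key.serializer.inner"
            : "list.value.serializer.inner";
        final Object innerClass = configs.get(innerConfig);
        try {
            final Class<?> clazz = innerClass instanceof Class
                ? (Class<?>) innerClass
                : Class.forName((String) innerClass);
            inner = (Serializer<T>) clazz.getDeclaredConstructor().newInstance();
            inner.configure(configs, isKey);
        } catch (final Exception e) {
            throw new RuntimeException("Could not instantiate inner serializer " + innerClass, e);
        }
    }

    @Override
    public byte[] serialize(final String topic, final List<T> data) {
        // The actual list encoding is not the point of this sketch.
        throw new UnsupportedOperationException("serialization format omitted in this sketch");
    }

    @Override
    public void close() {
        if (inner != null) {
            inner.close();
        }
    }
}
{code}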


On 7/23/19 1:30 PM, John Roesler wrote:
> Hmm, that's a tricky situation.
> 
> I think Daniyar was on the right track... Producer only cares about
> serializer configs, and Consumer only cares about deserializer configs.
> 
> I didn't see the problem with your proposal:
> 
> ProducerConfig:
>> list.key/value.serializer.type
>> list.key/value.serializer.inner
>> ConsumerConfig:
>> list.key/value.deserializer.type
>> list.key/value.deserializer.inner
>> StreamsConfig:
>> default.list.key/value.serde.type
>> default.list.key/value.serde.inner
> 
> 
> It seems like the key/value serde configs are a better analogy than the
> windowed serde.
> ProducerConfig: key.serializer
> ConsumerConfig: key.deserializer
> StreamsConfig: default.key.serde
> 
> Just to make sure I understand the problem you're highlighting:
> I guess the difference is that the serializer and deserializer that are
> nested inside the serde also need to be configured? So, by default I'd have
> to specify all six configs when I'm using Streams?
> 
> I guess in the Serde, it could make use of a package-protected constructor
> for the serializer and deserializer that fixes the list type and inner type
> to the serde-configured ones. Then, when you're configuring Streams, you
> only need to specify the StreamsConfigs.
> 
> Does that work?
> -John
> 
> 
> On Tue, Jul 23, 2019 at 11:39 AM Development  wrote:
> 
>> Bump
>>
>>> On Jul 22, 2019, at 11:26 AM, Development  wrote:
>>>
>>> Hey Matthias,
>>>
>>> It looks a little confusing, but I don’t have enough expertise to judge
>> on the configuration placement.
>>>
>>> If you think, it is fine I’ll go ahead with this approach.
>>>
>>> Best,
>>> Daniyar Yeralin
>>>
 On Jul 19, 2019, at 5:49 PM, Matthias J. Sax 
>> wrote:

 Good point.

 I guess the simplest solution is, to actually add

>> default.list.key/value.serde.type
>> default.list.key/value.serde.inner

 to both `CommonClientConfigs` and `StreamsConfig`.

 It's not super clean, but I think it's the best we can do. Thoughts?


 -Matthias

 On 7/19/19 1:23 PM, Development wrote:
> Hi Matthias,
>
> I agree, ConsumerConfig did not seem like a right place for these
>> configurations.
> I’ll put them in ProducerConfig, ConsumerConfig, and StreamsConfig.
>
> However, I have a question. What should I do in "configure(Map<String, ?> configs, boolean isKey)” methods? Which configurations should I try to
>> locate? I was comparing my (de)serializer implementations with
>> SessionWindows(De)serializer classes, and they use StreamsConfig class to
>> get  either StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS :
>> StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
>
> In my case, as I mentioned earlier, StreamsConfig class is not
>> accessible from org.apache.kafka.common.serialization package. So, I can’t
>> utilize it. Any suggestions here?
>
> Best,
> Daniyar Yeralin
>
>
>> On Jul 18, 2019, at 8:46 PM, Matthias J. Sax 
>> wrote:
>>
>> Thanks!
>>
>> One minor question about the configs. The KIP adds three classes, a
>> Serializer, a Deserializer, and a Serde.
>>
>> Hence, would it make sense to add the corresponding configs to
>> `ConsumerConfig`, `ProducerConfig`, and `StreamsConfig` using slightly
>> different names each time?
>>
>>
>> Something like this:
>>
>> 

Jenkins build is back to normal : kafka-trunk-jdk11 #711

2019-07-23 Thread Apache Jenkins Server
See 




Re: KIP-382 + Kafka Streams Question

2019-07-23 Thread Ryanne Dolan
Adam, I think we are converging :)

> "userEntity"...where I only want the latest emailAddress (basic
materialization) to send an email on account password update.

Yes, you want all "userEntity" data on both clusters. Each cluster will
have "userEntity" and the remote counterpart
"secondary/primary.userEntity", as in my example (1). The send-email part
can run on either cluster (but not both, to avoid duplicate emails),
subscribing to both "userEntity" and "secondary/primary.userEntity". For
DR, you can migrate this app between clusters via offset translation and
the kafka-streams-application-reset tool.

Then, you want a materialize-email-table app running in _both_ clusters, so
that the latest emails are readily available in RocksDB from either
cluster. This also subscribes to both "userEntity" and
"secondary/primary.userEntity" s.t. records originating from either cluster
are processed.

(Equivalently, send-email and materialize-email-table could be parts of the
same Streams app, just configured differently, e.g. with send-email
short-circuited in all but one cluster.)

Under normal operation, your userEntity events are sent to the primary
cluster (topic: userEntity), processed there via materialize-email-table
and send-email, and replicated to the secondary cluster (topic:
primary.userEntity) via MM2. When primary goes down, your producers
(whatever is sending userEntity events) can failover to the secondary
cluster (topic: userEntity). This can happen in real-time, i.e. as soon as
the producer detects an outage or via a load balancer with healthchecks
etc. So under normal operation, you have all userEntity events in both
clusters, and both clusters are available for producing to.

N.B. this is not dual-ingest, which would require you always produce
directly to both clusters. It's active/active, b/c you can produce to
either cluster at any point in time, and the effect is the same.

> Q1) Where does the producer write its data to if the primary cluster is
dead?

With active/active like this, you can send to either cluster.

> Q2) How does a Kafka Streams application materialize state from two
topics?

A Streams app can subscribe to multiple topics. A single "stream" can come
from multiple input topics (see:
https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/StreamsBuilder.html#stream-java.util.Collection-
)

Likewise, a KTable can be materialized from multiple source topics -- in
this case, userEntity, primary.userEntity and/or secondary.userEntity. You
can think of these as parts of a "virtual topic", as you described.
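
For illustration, a minimal sketch of the materialize-email-table idea, assuming String keys/values and the topic names used in this thread; it reads the local topic and its MM2 remote counterpart as one logical stream and keeps the latest value per key in a local store.

{code:java}
import java.util.Arrays;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class EmailTableTopology {

    public static Topology build() {
        final StreamsBuilder builder = new StreamsBuilder();

        // One logical stream fed by the local topic and its remote counterpart.
        final KStream<String, String> userEntity = builder.stream(
                Arrays.asList("userEntity", "primary.userEntity"),
                Consumed.with(Serdes.String(), Serdes.String()));

        // Keep the latest email address per userId, materialized into a local store.
        final KTable<String, String> latestEmail = userEntity
                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                .reduce(
                        (oldValue, newValue) -> newValue,
                        Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("latest-email-store"));

        return builder.build();
    }
}
{code}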

> (loaded question, I know)

There is one caveat I can think of: there is no ordering guarantee across
different topics in the same stream, so materialization could be
inconsistent between the two clusters if, say, the same users's email was
changed to different values at the same millisecond in both clusters. This
may or may not be a problem.

> Q3) ... recommendations on how to handle replication/producing of
entity-data (ie: userEntity) across multiple clusters...

Lemme know if I haven't answered this clearly.

Ryanne

On Tue, Jul 23, 2019 at 1:03 PM Adam Bellemare 
wrote:

> Hi Ryanne
>
> Thanks for the clarifications! Here is one of my own, as I think it's the
> biggest stumbling block in my description:
>
> *> What is "table" exactly? I am interpreting this as a KTable changelog
> topic*
> "table" is not a KTable changelog topic, but simply entity data that is to
> be materialized into a table - for example, relational data captured from
> Kafka Connect. I should have named this "stateful-data" or something less
> ambiguous and provided an explicit definition. Note that non-KStreams
> applications will also regularly use this entity data to materialize their
> own tables, but it in itself is not a KTable internal changelog.
>
> Per your example 1, let's name this topic "userEntity". It could be a
> (key,value) pair of (userId, emailAddress), where I only want the latest
> emailAddress (basic materialization) to send an email on account password
> update. I only want to run the application against one Kafka cluster, and
> because I don't want to use dual-ingest, I am running that application only
> on the cluster where the data is being sent (Primary Cluster). In a
> scenario where all replication is working correctly I could also run this
> off the Secondary cluster's replica, "primary.userEntity"
>
>
>
> *> Yes, that's something like "dual ingest", which I would not recommend.*
> Agreed. I do not want to use dual ingest.
>
> *> Secondary cluster:*
> *> Topics: events, primary.events, table-changelog*
> *> App subscription: events, primary.events*
> *> App output: table-changelog*
>
> Is the "events" topic dual ingest, since it exists in the Primary cluster
> with the exact same name?
>
> The whole scenario can be boiled down into the following:
> 1) Entity data is in a userEntity topic, ie: (userId, emailAddress)
> 2) I want to publish it into an Active-Active 

[jira] [Resolved] (KAFKA-8675) "Main" consumers are not unsubscribed on KafkaStreams.close()

2019-07-23 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-8675.
--
Resolution: Not A Problem

> "Main" consumers are not unsubsribed on KafkaStreams.close()
> 
>
> Key: KAFKA-8675
> URL: https://issues.apache.org/jira/browse/KAFKA-8675
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.2.1
>Reporter: Modestas Vainius
>Priority: Major
>
> Hi!
> It seems that {{KafkaStreams.close()}} never unsubscribes "main" kafka 
> consumers. As far as I can tell, 
> {{org.apache.kafka.streams.processor.internals.TaskManager#shutdown}} does 
> unsubscribe only {{restoreConsumer}}. This results in the Kafka group 
> coordinator having to throw away the consumer from the consumer group in a 
> non-clean way. {{KafkaStreams.close()}} does {{close()}} those consumers, but 
> it seems that is not enough for a clean exit.
> Kafka Streams connects to Kafka:
> {code:java}
> kafka| [2019-07-17 08:02:35,707] INFO [GroupCoordinator 1]: Preparing 
> to rebalance group 1-streams-test in state PreparingRebalance with old 
> generation 0 (__consumer_offsets-44) (reason: Adding new member 
> 1-streams-test-70da298b-6c8e-4ef0-8c2a-e7cba079ec9d-StreamThread-1-consumer-9af23416-d14e-49b8-b2ae-70837f2df0db)
>  (kafka.coordinator.group.GroupCoordinator)
> kafka| [2019-07-17 08:02:35,717] INFO [GroupCoordinator 1]: 
> Stabilized group 1-streams-test generation 1 (__consumer_offsets-44) 
> (kafka.coordinator.group.GroupCoordinator)
> kafka| [2019-07-17 08:02:35,730] INFO [GroupCoordinator 1]: 
> Assignment received from leader for group 1-streams-test for generation 1 
> (kafka.coordinator.group.GroupCoordinator)
> {code}
> Processing finishes in 2 secs but after 10 seconds I see this in Kafka logs:
> {code:java}
> kafka| [2019-07-17 08:02:45,749] INFO [GroupCoordinator 1]: Member 
> 1-streams-test-70da298b-6c8e-4ef0-8c2a-e7cba079ec9d-StreamThread-1-consumer-9af23416-d14e-49b8-b2ae-70837f2df0db
>  in group 1-streams-test has failed, removing it from the group 
> (kafka.coordinator.group.GroupCoordinator)
> kafka| [2019-07-17 08:02:45,749] INFO [GroupCoordinator 1]: Preparing 
> to rebalance group 1-streams-test in state PreparingRebalance with old 
> generation 1 (__consumer_offsets-44) (reason: removing member 
> 1-streams-test-70da298b-6c8e-4ef0-8c2a-e7cba079ec9d-StreamThread-1-consumer-9af23416-d14e-49b8-b2ae-70837f2df0db
>  on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
> kafka| [2019-07-17 08:02:45,749] INFO [GroupCoordinator 1]: Group 
> 1-streams-test with generation 2 is now empty (__consumer_offsets-44) 
> (kafka.coordinator.group.GroupCoordinator)
> {code}
> Topology is kind of similar to [kafka testing 
> example|https://kafka.apache.org/22/documentation/streams/developer-guide/testing.html]
>  but I tried on real kafka instance (one node):
> {code:java}
> new Topology().with {
> it.addSource("sourceProcessor", "input-topic")
> it.addProcessor("aggregator", new 
> CustomMaxAggregatorSupplier(), "sourceProcessor")
> it.addStateStore(
> Stores.keyValueStoreBuilder(
> Stores.inMemoryKeyValueStore("aggStore"),
> Serdes.String(),
> Serdes.Long()).withLoggingDisabled(), // need to 
> disable logging to allow aggregatorStore pre-populating
> "aggregator")
> it.addSink(
> "sinkProcessor",
> "result-topic",
> "aggregator"
> )
> it
> }
> {code}





Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2019-07-23 Thread Guozhang Wang
Hi John,

I left another question regarding Transformer in the DISCUSS thread. Other
than that I think this KIP is ready. Thanks!


Guozhang
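
For readers following along, a rough sketch of what the parameterization buys in practice. The interfaces below are simplified, hypothetical stand-ins for the proposed org.apache.kafka.streams.processor.api.Processor and its context, not the actual API under vote here.

{code:java}
// Hypothetical stand-ins -- names and method shapes are illustrative only.
interface TypedProcessorContext<KOut, VOut> {
    void forward(KOut key, VOut value);
}

interface TypedProcessor<KIn, VIn, KOut, VOut> {
    void init(TypedProcessorContext<KOut, VOut> context);
    void process(KIn key, VIn value);
    default void close() { }
}

// With output types on the context, forwarding a wrongly typed record becomes a
// compile-time error instead of a ClassCastException somewhere downstream.
class WordLengthProcessor implements TypedProcessor<String, String, String, Integer> {

    private TypedProcessorContext<String, Integer> context;

    @Override
    public void init(final TypedProcessorContext<String, Integer> context) {
        this.context = context;
    }

    @Override
    public void process(final String key, final String value) {
        context.forward(key, value == null ? 0 : value.length());
        // context.forward(key, value);  // would not compile: String is not Integer
    }
}
{code}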


On Tue, Jul 16, 2019 at 9:01 AM John Roesler  wrote:

> Hi Dev,
>
> After a good discussion, I'd like to start the vote for KIP-478
> (https://cwiki.apache.org/confluence/x/2SkLBw).
>
> The proposal is to deprecate the existing interface
> org.apache.kafka.streams.processor.Processor in favor of a
> new one, org.apache.kafka.streams.processor.api.Processor<KIn, VIn, KOut, VOut> that parameterizes both the input and output types.
>
> This change enables both the Streams DSL internal code and external
> Processor API code to improve their type safety and protect themselves
> from type-level bugs.
>
> Thanks,
> -John
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-478 Strongly Typed Processor API

2019-07-23 Thread Guozhang Wang
Hi John,

Just a wild thought about Transformer: now with the new Processor#init(ProcessorContext<KOut, VOut>), do we still need a
Transformer (and even ValueTransformer / ValueTransformerWithKey)?

What if:

* We just make KStream#transform take a ProcessorSupplier as well, and
inside `process()` we check that at most one `context.forward()` is called,
and then take it as the return value.
* We would still use ValueTransformer for KStream#transformValues, or we can
also use a `ProcessorSupplier` where we allow at most one
`context.forward()` AND we ignore whatever is passed in as key but just use
the original key.


Guozhang


On Tue, Jul 16, 2019 at 9:03 AM John Roesler  wrote:

> Hi again, all,
>
> I have started the voting thread. Please cast your votes (or voice
> your objections)! The vote will remain open at least 72 hours. Once it
> closes, I can send the PR pretty quickly.
>
> Thanks for all you help ironing out the details on this feature.
> -John
>
> On Mon, Jul 15, 2019 at 5:09 PM John Roesler  wrote:
> >
> > Hey all,
> >
> > It sounds like there's general agreement now on this KIP, so I updated
> > the KIP to fit in with Guozhang's overall proposed package structure.
> > Specifically, the proposed name for the new Processor interface is
> > "org.apache.kafka.streams.processor.api.Processor".
> >
> > If there are no objections, then I plan to start the vote tomorrow!
> >
> > Thanks, all, for your contributions.
> > -John
> >
> > On Thu, Jul 11, 2019 at 1:50 PM Matthias J. Sax 
> wrote:
> > >
> > > Side remark:
> > >
> > > > Now that "flat transform" is a specific
> > > >> part of the API it seems okay to steer folks in that direction (to
> never
> > > >> use context.process in a transformer), but it should be called out
> > > >> explicitly in javadocs.  Currently Transformer (which is used for
> both
> > > >> transform() and flatTransform() ) doesn't really call out the
> ambiguity:
> > >
> > > Would you want to do a PR for address this? We are always eager to
> > > improve the JavaDocs!
> > >
> > >
> > > -Matthias
> > >
> > > On 7/7/19 11:26 AM, Paul Whalen wrote:
> > > > First of all, +1 on the whole idea, my team has run into (admittedly
> minor,
> > > > but definitely annoying) issues because of the weaker typing.  We're
> heavy
> > > > users of the PAPI and have Processors that, while not hundreds of
> lines
> > > > long, are certainly quite hefty and call context.forward() in many
> places.
> > > >
> > > > After reading the KIP and discussion a few times, I've convinced
> myself
> > > > that any initial concerns I had aren't really concerns at all (state
> store
> > > > types, for one).  One thing I will mention:  changing *Transformer*
> to have
> > > > ProcessorContext gave me pause, because I have code that
> does
> > > > context.forward in transformers.  Now that "flat transform" is a
> specific
> > > > part of the API it seems okay to steer folks in that direction (to
> never
> > > > use context.process in a transformer), but it should be called out
> > > > explicitly in javadocs.  Currently Transformer (which is used for
> both
> > > > transform() and flatTransform() ) doesn't really call out the
> ambiguity:
> > > >
> https://github.com/apache/kafka/blob/ca641b3e2e48c14ff308181c775775408f5f35f7/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java#L75-L77
> ,
> > > > and for migrating users (from before flatTransform) it could be
> confusing.
> > > >
> > > > Side note, I'd like to plug KIP-401 (there is a discussion thread
> and a
> > > > voting thread) which also relates to using the PAPI.  It seems like
> there
> > > > is some interest and it is in a votable state with the majority of
> > > > implementation complete.
> > > >
> > > > Paul
> > > >
> > > > On Fri, Jun 28, 2019 at 2:02 PM Bill Bejeck 
> wrote:
> > > >
> > > >> Sorry for coming late to the party.
> > > >>
> > > >> As for the naming I'm in favor of RecordProcessor as well.
> > > >>
> > > >> I agree that we should not take on doing all of the package
> movements as
> > > >> part of this KIP, especially as John has pointed out, it will be an
> > > >> opportunity to discuss some clean-up on individual classes which I
> envision
> > > >> becoming another somewhat involved process.
> > > >>
> > > >> For the end goal, if possible, here's what I propose.
> > > >>
> > > >>1. We keep the scope of the KIP the same, *but we only
> implement* *it in
> > > >>phases*
> > > >>2. Phase one could include what Guozhang had proposed earlier
> namely
> > > >>1. > 1.a) modifying ProcessorContext only with the output types
> on
> > > >>   forward.
> > > >>   > 1.b) modifying Transformer signature to have generics of
> > > >>   ProcessorContext,
> > > >>   > and then lift the restricting of not using punctuate: if
> user did
> > > >>   not
> > > >>   > follow the enforced typing and just code without generics,
> they
> > > >>   will get
> > > >>   > warning at compile time and get run-time 

Build failed in Jenkins: kafka-trunk-jdk8 #3808

2019-07-23 Thread Apache Jenkins Server
See 


Changes:

[jason] KAFKA-8526; Fallback to other log dirs after getOrCreateLog failure

--
[...truncated 2.57 MB...]
org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForSmallerValue[Eos enabled = true] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCreateStateDirectoryForStatefulTopology[Eos enabled = true] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCreateStateDirectoryForStatefulTopology[Eos enabled = true] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfWallClockTimeAdvances[Eos enabled = true] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfWallClockTimeAdvances[Eos enabled = true] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldCloseProcessor[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldCloseProcessor[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFeedStoreFromGlobalKTable[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFeedStoreFromGlobalKTable[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCleanUpPersistentStateStoresOnClose[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCleanUpPersistentStateStoresOnClose[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowPatternNotValidForTopicNameException[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowPatternNotValidForTopicNameException[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfEvenTimeAdvances[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfEvenTimeAdvances[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldInitProcessor[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldInitProcessor[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopic[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopic[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnStreamsTime[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnStreamsTime[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfInMemoryBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfInMemoryBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourcesThatMatchMultiplePattern[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourcesThatMatchMultiplePattern[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldPopulateGlobalStore[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldPopulateGlobalStore[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldAllowPrePopulatingStatesStoresWithCachingEnabled[Eos enabled = false] 
STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldAllowPrePopulatingStatesStoresWithCachingEnabled[Eos enabled = false] 
PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldSendRecordViaCorrectSourceTopic[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldSendRecordViaCorrectSourceTopic[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotCreateStateDirectoryForStatelessTopology[Eos enabled 

Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

2019-07-23 Thread Levani Kokhreidze
Hello,

Thanks all for your feedback.
I started voting procedure for this KIP. If there’re any other concerns about 
this KIP, please let me know.

Regards,
Levani
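
For context, a rough sketch of the kind of config object discussed in the quoted thread below. It is a hypothetical stand-in, not the actual Repartitioned class proposed in KIP-221; it only illustrates one object carrying both producer-level (serdes, partitioner) and topic-level (partition count) settings for the connecting repartition topic.

{code:java}
import java.util.Optional;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.processor.StreamPartitioner;

// Hypothetical stand-in for the proposed Repartitioned config object.
public final class RepartitionedSketch<K, V> {

    private final Serde<K> keySerde;
    private final Serde<V> valueSerde;
    private final Optional<Integer> numberOfPartitions;
    private final Optional<StreamPartitioner<K, V>> partitioner;

    private RepartitionedSketch(final Serde<K> keySerde,
                                final Serde<V> valueSerde,
                                final Optional<Integer> numberOfPartitions,
                                final Optional<StreamPartitioner<K, V>> partitioner) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
        this.numberOfPartitions = numberOfPartitions;
        this.partitioner = partitioner;
    }

    public static <K, V> RepartitionedSketch<K, V> with(final Serde<K> keySerde,
                                                        final Serde<V> valueSerde) {
        return new RepartitionedSketch<>(keySerde, valueSerde, Optional.empty(), Optional.empty());
    }

    public RepartitionedSketch<K, V> withNumberOfPartitions(final int numberOfPartitions) {
        return new RepartitionedSketch<>(keySerde, valueSerde,
                Optional.of(numberOfPartitions), partitioner);
    }

    public RepartitionedSketch<K, V> withStreamPartitioner(final StreamPartitioner<K, V> partitioner) {
        return new RepartitionedSketch<>(keySerde, valueSerde,
                numberOfPartitions, Optional.of(partitioner));
    }
}
{code}

A call site would then look roughly like stream.repartition(Repartitioned.with(keySerde, valueSerde).withNumberOfPartitions(12)), mirroring the fluent style of Produced and Grouped.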

> On Jul 20, 2019, at 8:39 PM, Levani Kokhreidze  wrote:
> 
> Hi Matthias,
> 
> Thanks for the suggestion, makes sense.
> I’ve updated KIP 
> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>  
> ).
> 
> Regards,
> Levani
> 
> 
>> On Jul 20, 2019, at 3:53 AM, Matthias J. Sax > > wrote:
>> 
>> Thanks for driving the KIP.
>> 
>> I agree that users need to be able to specify a partitioning strategy.
>> 
>> Sophie raises a fair point about topic configs and producer configs. My
>> take is, that consider `Repartitioned` as an "extension" to `Produced`,
>> that adds topic configuration, is a good way to think about it and helps
>> to keep the API "clean".
>> 
>> 
>> With regard to method names. I would prefer to avoid abbreviations. Can
>> we rename:
>> 
>> `withNumOfPartitions` -> `withNumberOfPartitions`
>> 
>> Furthermore, it might be good to add some more `static` methods:
>> 
>> - Repartitioned.with(Serde, Serde)
>> - Repartitioned.withNumberOfPartitions(int)
>> - Repartitioned.streamPartitioner(StreamPartitioner)
>> 
>> 
>> -Matthias
>> 
>> On 7/19/19 3:33 PM, Levani Kokhreidze wrote:
>>> Totally agree. I think in the KStream interface it makes sense to have some 
>>> duplicate configurations between operators in order to keep the API simple and 
>>> usable.
>>> Also, the more surface area the API has, the harder it is to maintain proper backward 
>>> compatibility.
>>> While the initial idea of keeping topic-level configs separate was exciting, 
>>> having the Repartitioned class encapsulate some producer-level configs makes 
>>> the API more readable.
>>> 
>>> Regards,
>>> Levani
>>> 
 On Jul 20, 2019, at 1:15 AM, Sophie Blee-Goldman >>> > wrote:
 
 I think that is a good point about trying to keep producer level
 configurations and (repartition) topic level considerations separate.
 Number of partitions is definitely purely a topic level configuration. But
 on some level, serdes and partitioners are just as much a topic
 configuration as a producer one. You could have two producers configured
 with different serdes and/or partitioners, but if they are writing to the
 same topic the result would be very difficult to part. So in a sense, these
 are configurations of topics in Streams, not just producers.
 
 Another way to think of it: while the Streams API is not always true to
 this, ideally all the relevant configs for an operator are wrapped into a
 single object (in this case, Repartitioned). We could instead split out the
 fields in common with Produced into a separate parameter to keep topic and
 producer level configurations separate, but this increases the API surface
 area by a lot. It's much more straightforward to just say "this is
 everything that this particular operator needs" without worrying about what
 exactly you're specifying.
 
 I suppose you could alternatively make Produced a field of Repartitioned,
 but I don't think we do this kind of composition elsewhere in Streams at
 the moment
 
 On Fri, Jul 19, 2019 at 1:45 PM Levani Kokhreidze >>> >
 wrote:
 
> Hi Bill,
> 
> Thanks a lot for the feedback.
> Yes, that makes sense. I’ve updated KIP with `Repartitioned#partitioner`
> configuration.
> In the beginning, I wanted to introduce a class for topic level
> configuration and keep topic level and producer level configurations (such
> as Produced) separately (see my second email in this thread).
> But while looking at the semantics of KStream interface, I couldn’t really
> figure out good operation name for Topic level configuration class and 
> just
> introducing `Topic` config class was kinda breaking the semantics.
> So I think having Repartitioned class which encapsulates topic and
> producer level configurations for internal topics is viable thing to do.
> 
> Regards,
> Levani
> 
>> On Jul 19, 2019, at 7:47 PM, Bill Bejeck > > wrote:
>> 
>> Hi Lavani,
>> 
>> Thanks for resurrecting this KIP.
>> 
>> I'm also a +1 for adding a partition option.  In addition to the reason
>> provided by John, my reasoning is:
>> 
>> 1. Users may want to use something other than hash-based partitioning
>> 2. Users may wish to partition on something different than the key
>> without having to change the key.  For example:
>>1. A combination of fields in the value in conjunction with the key
>>2. Something other than 

[VOTE] KIP-221: Enhance KStream with Connecting Topic Creation and Repartition Hint

2019-07-23 Thread Levani Kokhreidze
Hello,

I’d like to initialize voting on KIP-221: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
 

If there’re any more concerns about the KIP, happy to discuss further.

Regards,
Levani

Re: [DISCUSS] KIP-466: Add support for List serialization and deserialization

2019-07-23 Thread John Roesler
Hmm, that's a tricky situation.

I think Daniyar was on the right track... Producer only cares about
serializer configs, and Consumer only cares about deserializer configs.

I didn't see the problem with your proposal:

ProducerConfig:
> list.key/value.serializer.type
> list.key/value.serializer.inner
> ConsumerConfig:
> list.key/value.deserializer.type
> list.key/value.deserializer.inner
> StreamsConfig:
> default.list.key/value.serde.type
> default.list.key/value.serde.inner


It seems like the key/value serde configs are a better analogy than the
windowed serde.
ProducerConfig: key.serializer
ConsumerConfig: key.deserializer
StreamsConfig: default.key.serde

Just to make sure I understand the problem you're highlighting:
I guess the difference is that the serializer and deserializer that are
nested inside the serde also need to be configured? So, by default I'd have
to specify all six configs when I'm using Streams?

I guess in the Serde, it could make use of a package-protected constructor
for the serializer and deserializer that fixes the list type and inner type
to the serde-configured ones. Then, when you're configuring Streams, you
only need to specify the StreamsConfigs.

Does that work?
-John
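
To sketch the package-protected-constructor idea with hypothetical names (not the KIP-466 classes): the Serde fixes its nested serializer and deserializer up front, so when Streams configures the Serde from its defaults nothing further needs to be resolved from configs.

{code:java}
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

// Minimal sketch: a Serde whose nested serializer/deserializer are already
// bound to the list/inner type at construction time (e.g. via package-protected
// constructors), so users only specify the StreamsConfig defaults once.
public class PreWiredSerde<T> implements Serde<T> {

    private final Serializer<T> serializer;
    private final Deserializer<T> deserializer;

    PreWiredSerde(final Serializer<T> serializer, final Deserializer<T> deserializer) {
        this.serializer = serializer;      // already bound to the inner type
        this.deserializer = deserializer;  // already bound to the inner type
    }

    @Override
    public void configure(final Map<String, ?> configs, final boolean isKey) {
        // Nothing to resolve from configs: the inner type was fixed by the constructor.
        serializer.configure(configs, isKey);
        deserializer.configure(configs, isKey);
    }

    @Override
    public Serializer<T> serializer() {
        return serializer;
    }

    @Override
    public Deserializer<T> deserializer() {
        return deserializer;
    }
}
{code}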


On Tue, Jul 23, 2019 at 11:39 AM Development  wrote:

> Bump
>
> > On Jul 22, 2019, at 11:26 AM, Development  wrote:
> >
> > Hey Matthias,
> >
> > It looks a little confusing, but I don’t have enough expertise to judge
> on the configuration placement.
> >
> > If you think, it is fine I’ll go ahead with this approach.
> >
> > Best,
> > Daniyar Yeralin
> >
> >> On Jul 19, 2019, at 5:49 PM, Matthias J. Sax 
> wrote:
> >>
> >> Good point.
> >>
> >> I guess the simplest solution is, to actually add
> >>
>  default.list.key/value.serde.type
>  default.list.key/value.serde.inner
> >>
> >> to both `CommonClientConfigs` and `StreamsConfig`.
> >>
> >> It's not super clean, but I think it's the best we can do. Thoughts?
> >>
> >>
> >> -Matthias
> >>
> >> On 7/19/19 1:23 PM, Development wrote:
> >>> Hi Matthias,
> >>>
> >>> I agree, ConsumerConfig did not seem like a right place for these
> configurations.
> >>> I’ll put them in ProducerConfig, ConsumerConfig, and StreamsConfig.
> >>>
> >>> However, I have a question. What should I do in "configure(Map<String, ?> configs, boolean isKey)” methods? Which configurations should I try to
> locate? I was comparing my (de)serializer implementations with
> SessionWindows(De)serializer classes, and they use StreamsConfig class to
> get  either StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS :
> StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
> >>>
> >>> In my case, as I mentioned earlier, StreamsConfig class is not
> accessible from org.apache.kafka.common.serialization package. So, I can’t
> utilize it. Any suggestions here?
> >>>
> >>> Best,
> >>> Daniyar Yeralin
> >>>
> >>>
>  On Jul 18, 2019, at 8:46 PM, Matthias J. Sax 
> wrote:
> 
>  Thanks!
> 
>  One minor question about the configs. The KIP adds three classes, a
>  Serializer, a Deserializer, and a Serde.
> 
>  Hence, would it make sense to add the corresponding configs to
>  `ConsumerConfig`, `ProducerConfig`, and `StreamsConfig` using slightly
>  different names each time?
> 
> 
>  Something like this:
> 
>  ProducerConfig:
> 
>  list.key/value.serializer.type
>  list.key/value.serializer.inner
> 
>  ConsumerConfig:
> 
>  list.key/value.deserializer.type
>  list.key/value.deserializer.inner
> 
>  StreamsConfig:
> 
>  default.list.key/value.serde.type
>  default.list.key/value.serde.inner
> 
> 
>  Adding `d.l.k/v.serde.t/i` to `CommonClientConfigs does not sound
> right
>  to me. Also note, that it seems better to avoid the `default.` prefix
>  for consumers and producers because there is only one Serializer or
>  Deserializer anyway. Only for Streams, there are multiple and
>  StreamsConfig specifies the default one of an operator does not
>  overwrite it.
> 
>  Thoughts?
> 
> 
>  Also, the KIP should explicitly mention to what classed certain
> configs
>  are added. Atm, the KIP only list parameter names, but does not state
>  where those are added.
> 
> 
>  -Matthias
> 
> 
> 
> 
> 
>  On 7/16/19 1:11 PM, Development wrote:
> > Hi,
> >
> > Yes, totally forgot about the statement. KIP-466 is updated.
> >
> > Thank you so much John Roesler, Matthias J. Sax, Sophie Blee-Goldman
> for your valuable input!
> >
> > I hope I did not cause too much trouble :)
> >
> > I’ll start the vote now.
> >
> > Best,
> > Daniyar Yeralin
> >
> >> On Jul 16, 2019, at 3:17 PM, John Roesler 
> wrote:
> >>
> >> Hi Daniyar,
> >>
> >> Thanks for that update. I took a look, and I think this is in good
> shape.
> >>
> >> One note, the statement "New 

[jira] [Created] (KAFKA-8702) Kafka leader election doesn't happen when leader broker port is partitioned off the network

2019-07-23 Thread Andrey Falko (JIRA)
Andrey Falko created KAFKA-8702:
---

 Summary: Kafka leader election doesn't happen when leader broker 
port is partitioned off the network
 Key: KAFKA-8702
 URL: https://issues.apache.org/jira/browse/KAFKA-8702
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.1.0
Reporter: Andrey Falko


We first started seeing this with 2.1.1 version of Kafka. We are currently on 
2.3.0. 

We were able to actively reproduce this today on one of our staging 
environments. The reproduction steps are as follows: 
1) Push some traffic to a topic that looks like this: 
$ bin/kafka-topics.sh --describe --zookeeper $(grep zookeeper.connect= 
/kafka/config/server.properties | awk -F= '{print $2}') --topic test 
Topic:test  PartitionCount:6    ReplicationFactor:3 
Configs:cleanup.policy=delete,retention.ms=8640
   Topic: test Partition: 0    Leader: 0   Replicas: 2,0,1 Isr: 1,0 
   Topic: test Partition: 1    Leader: 0   Replicas: 0,1,2 Isr: 1,0 
   Topic: test Partition: 2    Leader: 1   Replicas: 1,2,0 Isr: 1,0 
   Topic: test Partition: 3    Leader: 1   Replicas: 2,1,0 Isr: 1,0 
   Topic: test Partition: 4    Leader: 0   Replicas: 0,2,1 Isr: 1,0 
   Topic: test Partition: 5    Leader: 1   Replicas: 1,0,2 Isr: 1,0

2) We proceed to run the following on broker 0:
iptables -D INPUT -j DROP -p tcp --destination-port 9093 && iptables -D OUTPUT 
-j DROP -p tcp --destination-port 9093
Note: our replication and traffic from clients comes in on TLS protected port 
9093 only. 

3) Leadership doesn't change b/c Zookeeper connection is unaffected. However, 
we start seeing URP. 

4) We reboot broker 0. We see offline partitions. Leadership never changes and 
the cluster only recovers when broker 0 comes back online.

My colleague Kailash was helping me reproduce this today and I have added him 
to the CC list. Should we post this behavior on the public Kafka channel and 
see if it is worthy of filing a bug? We don't mind the URP state 
behavior, but as soon as broker 0 gets killed, leader election would ideally 
occur to avoid the offline state.

Best regards,
Andrey Falko





Re: [DISCUSS] KIP-444: Augment Metrics for Kafka Streams

2019-07-23 Thread Matthias J. Sax
Thanks for the KIP Guozhang.

I just re-read the wiki page and the DISCUSS thread. Overall LGTM.

The only nit is the naming of the new config values. With AK 2.3 being
released, the version numbers need to be updated.

Additionally, I actually think that "2.2-" and "2.3" are not the best
names: the `-` suffix is very subtle IMHO and actually looks more like a
typo, and it might be better to be more elaborate. Maybe something like
"up-to-2.2" ?

For "2.3", this config value would be weird for future releases (ie,
2.4, 2.5, 2.6). Hence, we might want to rename it to "newest" /
"current" or something like this?

Another alternative may be to rename it to "since-2.3" (or similar) --
however, this may require to rename the config if we change metrics in a
future release (hence, it's not my preferred option).

Thoughts?


-Matthias
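
For reference, a minimal sketch of how the config would be set from an application. The config name comes from this KIP discussion, and the value literal is a placeholder while the naming is still being debated.

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class MetricsVersionConfigExample {
    public static Properties config() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Proposed in KIP-444: choose which built-in metrics hierarchy to expose.
        // The value names ("up-to-2.2", "current", ...) are still under discussion,
        // so treat this literal as a placeholder.
        props.put("built.in.metrics.version", "current");
        return props;
    }
}
{code}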

On 7/22/19 6:33 PM, Guozhang Wang wrote:
> Thanks everyone for your inputs, I've updated the wiki page accordingly.
> 
> @Bruno: please let me know if you have any further thoughts per my replies
> above.
> 
> 
> Guozhang
> 
> 
> On Mon, Jul 22, 2019 at 6:30 PM Guozhang Wang  wrote:
> 
>> Thanks Boyang,
>>
>> I've thought about exposing time via metrics in Streams. The tricky part
>> though is which layer of time we should expose: right now we have
>> task-level and partition-level stream time (what you suggested), and also
>> some processor internally maintain their own observed time. Today we are
>> still trying to get a clear and simple way of exposing a single time
>> concept for users to reason about their application's progress. So before
>> we come up with a good solution I'd postpone adding it in a future KIP.
>>
>>
>> Guozhang
>>
>>
>> On Thu, Jul 18, 2019 at 1:21 PM Boyang Chen 
>> wrote:
>>
>>> I mean the partition time.
>>>
>>> On Thu, Jul 18, 2019 at 11:29 AM Guozhang Wang 
>>> wrote:
>>>
 Hi Boyang,

 What do you mean by `per partition latency`?

 Guozhang

 On Mon, Jul 1, 2019 at 9:28 AM Boyang Chen 
 wrote:

> Hey Guozhang,
>
> do we plan to add per partition latency in this KIP?
>
> On Mon, Jul 1, 2019 at 7:08 AM Bruno Cadonna 
>>> wrote:
>
>> Hi Guozhang,
>>
>> Thank you for the KIP.
>>
>> 1) As far as I understand, the StreamsMetrics interface is there for
>> user-defined processors. Would it make sense to also add a method to
>> the interface to specify a sensor that records skipped records?
>>
>> 2) What are the semantics of active-task-process and
 standby-task-process
>>
>> 3) How do dropped-late-records and expired-window-record-drop relate
>> to each other? I guess the former is for records that fall outside
>>> the
>> grace period and the latter is for records that are processed after
>> the retention period of the window. Is this correct?
>>
>> 4) Is there an actual difference between skipped and dropped
>>> records?
>> If not, shall we unify the terminology?
>>
>> 5) What happens with removed metrics when the user sets the version
>>> of
>> "built.in.metrics.version" to 2.2-
>>
>> Best,
>> Bruno
>>
>> On Thu, Jun 27, 2019 at 6:11 PM Guozhang Wang 
> wrote:
>>>
>>> Hello folks,
>>>
>>> As 2.3 is released now, I'd like to bump up this KIP discussion
>>> again
> for
>>> your reviews.
>>>
>>>
>>> Guozhang
>>>
>>>
>>> On Thu, May 23, 2019 at 4:44 PM Guozhang Wang >>>
>> wrote:
>>>
 Hello Patrik,

 Since we are rolling out 2.3 and everyone is busy with the
>>> release
> now
 this KIP does not have much discussion involved yet and will
>>> slip
> into
>> the
 next release cadence.

 This KIP itself contains several parts itself: 1. refactoring
>>> the
>> existing
 metrics hierarchy to cleanup some redundancy and also get more
>> clarity; 2.
 add instance-level metrics like rebalance and state metrics, as
 well
> as
 other static metrics.


 Guozhang



 On Thu, May 23, 2019 at 5:34 AM Patrik Kleindl <
>>> pklei...@gmail.com
>
>> wrote:

> Hi Guozhang
> Thanks for the KIP, this looks very helpful.
> Could you please provide more detail on the metrics planned for
 the
>> state?
> We were just considering how to implement this ourselves
>>> because
 we
>> need
> to
> track the history of stage changes.
> The idea was to have an accumulated "seconds in state x" metric
 for
>> every
> state.
> The new rebalance metric might solve part of our use case, but
>>> it
 is
> interesting what you have planned for the state metric.
> best regards
> Patrik
>
> On Fri, 29 Mar 2019 at 18:56, Guozhang Wang <
>>> wangg...@gmail.com>
>> 

Re: Monitoring streams

2019-07-23 Thread Guozhang Wang
Hello Brian,

I think your main question is to distinguish 1) broker is alive but there's
no new data coming into the source topics to process, and 2) broker is not
alive and hence nothing is readable, in your monitoring system. I agree
that currently process-rate / last-record-timestamp cannot successfully
distinguish the two.

I've put a bit more thoughts on
https://issues.apache.org/jira/browse/KAFKA-6520 and updated the
description: originally we want to add a new State into Streams but later
on we realized that 1) it is a bit overkill to complicate the FSM for this
transient state and 2) it is actually a general issue that should be
tackled on the lower level rather than on Streams only.


Guozhang
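
In the meantime, one possible workaround (not an official recommendation) is to watch Streams state transitions and probe broker reachability independently with an AdminClient, so "brokers unreachable" can be told apart from "no new input". A minimal sketch:

{code:java}
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.streams.KafkaStreams;

public class StreamsHealthMonitor {

    public static void monitor(final KafkaStreams streams, final Properties adminProps) {
        // Register before streams.start(): log (or alert on) state transitions.
        streams.setStateListener((newState, oldState) ->
                System.out.println("Streams state changed: " + oldState + " -> " + newState));

        // Independently verify that brokers are reachable at all.
        final AdminClient admin = AdminClient.create(adminProps);
        final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            try {
                // Throws if no broker responds within the timeout.
                admin.describeCluster().nodes().get(10, TimeUnit.SECONDS);
            } catch (final Exception e) {
                System.err.println("Broker connectivity check failed: " + e);
                // e.g. alert here, or restart the service
            }
        }, 0, 30, TimeUnit.SECONDS);
    }
}
{code}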


On Mon, Jul 8, 2019 at 5:15 PM Brian Putt  wrote:

> Hello,
>
> We have multiple stream services that we're looking to monitor when they've
> been disconnected from the broker so that we can restart the services.
>
> I've looked at https://issues.apache.org/jira/browse/KAFKA-6520 and am
> wondering if anyone has suggestions on what we can do today to help ensure
> our services don't go idle.
>
> As an example, we'll have our streaming services running and we'll
> stop/start the kafka brokers. The services will remain running, but they're
> not actually pulling any data.
>
> We could look at time since last record received, but that's not a
> guarantee as there's always a possibility that data was legitimately turned
> off upstream.
>
> Thanks,
>
> Brian
>


-- 
-- Guozhang


[jira] [Resolved] (KAFKA-6708) Review Exception messages with regards to Serde Usage

2019-07-23 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6708.

Resolution: Duplicate

> Review Exception messages with regards to Serde Usage
> --
>
> Key: KAFKA-6708
> URL: https://issues.apache.org/jira/browse/KAFKA-6708
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bill Bejeck
>Priority: Major
>  Labels: newbie
>
> Error messages thrown when required Serdes (other than the provided 
> default ones) are not included should be more specific about what 
> needs to be done and possible causes, rather than just a {{ClassCastException}}.





Re: KIP-382 + Kafka Streams Question

2019-07-23 Thread Adam Bellemare
Hi Ryanne

Thanks for the clarifications! Here is one of my own, as I think it's the
biggest stumbling block in my description:

*> What is "table" exactly? I am interpreting this as a KTable changelog
topic*
"table" is not a KTable changelog topic, but simply entity data that is to
be materialized into a table - for example, relational data captured from
Kafka Connect. I should have named this "stateful-data" or something less
ambiguous and provided an explicit definition. Note that non-KStreams
applications will also regularly use this entity data to materialize their
own tables, but it in itself is not a KTable internal changelog.

Per your example 1, let's name this topic "userEntity". It could be a
(key,value) pair of (userId, emailAddress), where I only want the latest
emailAddress (basic materialization) to send an email on account password
update. I only want to run the application against one Kafka cluster, and
because I don't want to use dual-ingest, I am running that application only
on the cluster where the data is being sent (Primary Cluster). In a
scenario where all replication is working correctly I could also run this
off the Secondary cluster's replica, "primary.userEntity"



*> Yes, that's something like "dual ingest", which I would not recommend.*
Agreed. I do not want to use dual ingest.

*> Secondary cluster:*
*> Topics: events, primary.events, table-changelog*
*> App subscription: events, primary.events*
*> App output: table-changelog*

Is the "events" topic dual ingest, since it exists in the Primary cluster
with the exact same name?

The whole scenario can be boiled down into the following:
1) Entity data is in a userEntity topic, ie: (userId, emailAddress)
2) I want to publish it into an Active-Active cluster setup without using
dual-ingest
3) I want to materialize the data into a single table for an application
consuming from a single cluster (Kafka Streams or not)
4) I want to be able to fail over and rebuild the materialized state using
the data I have replicated.
- If all of the entity data is produced to each cluster (dual-ingest) than
it is trivial to fail over and rebuild the materialized table.
- If the data is only produced to Primary and only replicated to Secondary,
at a failover I would need to consume from the replicated topic.
*Q1) Where does the producer write its data to if the primary cluster
is dead?*
It seems to me that it must then write its data to the only
remaining cluster. This would then put the entity data in two topics as I
had originally outlined, as below:
*Secondary Cluster: (Live)   (renamed table to userEntity)*
  Topic: "primary.userEntity" (contains data from T = 0 to T = n)
  Topic: "userEntity" (contains data from T = n+1 to now, the
failed-over producer)


*Q2) How does a Kafka Streams application materialize state from two
topics? (loaded question, I know)*
  Since I know this isn't built in, is there some sort of technique
or system that you use to allow for a single virtual topic made up of many
logical topics?

*Q3) Do you have any recommendations on how to handle replication/producing
of entity-data (ie: userEntity) across multiple clusters, such that an
application may correctly (or even near-correctly) materialize state after
a failover like the one I described above?*
This is really the golden question. We're currently developing our
Active-Passive approach, but we want to be prepared for scenarios where we
have multiple clusters with entity-replication between clusters.


Thanks Ryanne!


On Tue, Jul 23, 2019 at 12:39 PM Ryanne Dolan  wrote:

> Adam,
>
> > I think we have inconsistent definitions of Active-Active
>
> Yes, this terminology gets thrown around a lot. IMO "active" means both
> producers and consumers are using a cluster under normal operation -- not
> just during outages, and not just by something like MM2. (Obviously, MM2
> has producers and consumers, but they don't count here.) Conversely,
> "standby" or "backup" means that data is being written by a producer, but
> it isn't being consumed under normal operation. I qualify this definition
> with IMO, as I don't think there is strong consensus here.
>
> I'll also add a caveat about "under normal operation". An active/active
> architecture does not necessarily mean that you use both clusters in the
> same way all the time -- only that you _could_. You could load-balance
> 50/50 of your traffic between two clusters, or you could direct 100% to one
> and 0% to the other, e.g. if one is farther away or has less hw resources.
> But the architecture remains the same (and certainly, MM2 doesn't care
> about this detail).
>
> > The producer is only producing to one cluster (primary) and one topic
> (topic "table"), and the other cluster (secondary) contains only a
> replication of the data via MM2 ("primary.table").
>
> That, by definition, is not active/active.
>
> >What you seemed to be proposing is that the producer's "table" 

[DISCUSS] KIP-496: Administrative API to delete consumer offsets

2019-07-23 Thread Jason Gustafson
Hi All,

I have a short KIP to add an api for consumer offset deletion:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-496%3A+Administrative+API+to+delete+consumer+offsets.
Please take a look and let me know what you think.

Thanks,
Jason


[jira] [Created] (KAFKA-8701) Flaky Test SaslSslAdminClientIntegrationTest#testDescribeConfigsForTopic

2019-07-23 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-8701:
--

 Summary: Flaky Test 
SaslSslAdminClientIntegrationTest#testDescribeConfigsForTopic
 Key: KAFKA-8701
 URL: https://issues.apache.org/jira/browse/KAFKA-8701
 Project: Kafka
  Issue Type: Bug
  Components: unit tests
Affects Versions: 2.4.0
Reporter: Matthias J. Sax
 Fix For: 2.4.0


[https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/477/testReport/junit/kafka.api/SaslSslAdminClientIntegrationTest/testDescribeConfigsForTopic/]
{quote}org.scalatest.exceptions.TestFailedException: Partition [topic,0] 
metadata not propagated after 15000 ms at 
org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530) at 
org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529) at 
org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389) at 
org.scalatest.Assertions.fail(Assertions.scala:1091) at 
org.scalatest.Assertions.fail$(Assertions.scala:1087) at 
org.scalatest.Assertions$.fail(Assertions.scala:1389) at 
kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:822) at 
kafka.utils.TestUtils$.waitUntilMetadataIsPropagated(TestUtils.scala:911) at 
kafka.utils.TestUtils$.$anonfun$createTopic$3(TestUtils.scala:337) at 
kafka.utils.TestUtils$.$anonfun$createTopic$3$adapted(TestUtils.scala:336) at 
scala.collection.immutable.Range.map(Range.scala:59) at 
kafka.utils.TestUtils$.createTopic(TestUtils.scala:336) at 
kafka.integration.KafkaServerTestHarness.createTopic(KafkaServerTestHarness.scala:126)
 at 
kafka.api.AdminClientIntegrationTest.testDescribeConfigsForTopic(AdminClientIntegrationTest.scala:1008){quote}
 



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


Re: KIP-382 + Kafka Streams Question

2019-07-23 Thread Ryanne Dolan
Adam,

> I think we have inconsistent definitions of Active-Active

Yes, this terminology gets thrown around a lot. IMO "active" means both
producers and consumers are using a cluster under normal operation -- not
just during outages, and not just by something like MM2. (Obviously, MM2
has producers and consumers, but they don't count here.) Conversely,
"standby" or "backup" means that data is being written by a producer, but
it isn't being consumed under normal operation. I qualify this definition
with IMO, as I don't think there is strong consensus here.

I'll also add a caveat about "under normal operation". An active/active
architecture does not necessarily mean that you use both clusters in the
same way all the time -- only that you _could_. You could load-balance
50/50 of your traffic between two clusters, or you could direct 100% to one
and 0% to the other, e.g. if one is farther away or has less hw resources.
But the architecture remains the same (and certainly, MM2 doesn't care
about this detail).

> The producer is only producing to one cluster (primary) and one topic
(topic "table"), and the other cluster (secondary) contains only a
replication of the data via MM2 ("primary.table").

That, by definition, is not active/active.

>What you seemed to be proposing is that the producer's "table" data is
sent fully to each cluster, such that the state can be materialized as a
KTable in each application running on each cluster.

Correct.

> This wouldn't require MM2 at all, so I'm not sure if this is what you
advocated.

You could use a dual-ingest method and send all your data to both clusters,
which would not require MM2. There are many issues with this approach,
primarily wrt to consistency and efficiency.

> The trivial solution seems to be to make your producers produce all
stateful data (topic "table") to each cluster, which makes MM2 unnecessary,
but can also lead to data inconsistencies so it's not exactly foolproof.

Yes, that's something like "dual ingest", which I would not recommend.

> SteamsAppPrimary is consuming from ("table")

What is "table" exactly? I am interpreting this as a KTable changelog
topic, in which case "table" is an output topic of some streams app, i.e.
the app producing the change events. _This_ is the app I mean to suggest
you run on both clusters. Then, "table" will appear on both clusters (no
"primary.table").

The app that is creating the "table" changelog would be processing events
from some other topic, say "events". Then, this is what I recommend:

Primary cluster:
Topics: events, secondary.events, table-changelog
App subscription: events, secondary.events
App output: table-changelog

Secondary cluster:
Topics: events, primary.events, table-changelog
App subscription: events, primary.events
App output: table-changelog

With this arrangement, the app on either cluster will have built up state
in RocksDB based on events from both clusters.
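
To make that concrete, here's a minimal sketch (illustrative only -- I'm
assuming the topic names above, the MM2 "primary."/"secondary." prefixes, and
a simple count standing in for whatever your real aggregation is):

{code:java}
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public class TableBuilderSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        // The same app runs against both clusters. The pattern picks up the
        // local "events" topic plus whichever remote copy MM2 has created
        // ("primary.events" or "secondary.events"), so local state reflects
        // input from either source cluster.
        final KTable<String, Long> table = builder
                .stream(Pattern.compile("((primary|secondary)\\.)?events"),
                        Consumed.with(Serdes.String(), Serdes.String()))
                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                .count(Materialized.as("table-store"));

        // Emit the materialized state as a changelog topic on the local cluster.
        table.toStream().to("table-changelog",
                Produced.with(Serdes.String(), Serdes.Long()));

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "table-builder");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // local cluster
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        new KafkaStreams(builder.build(), props).start();
    }
}
{code}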

Now, it seems you also want a second app to process this changelog. I can
see a few scenarios:

1) you want to take some external action based on records in the table
changelog, e.g. to send an email every time a password is updated. In this
case, you don't want this app running in both clusters, as you'd get two
emails. So you could run it in one cluster and use offset translation to
migrate during failover. The send-email app is stateless, so you just need
to translate and reset offsets (there is no internal state to rebuild) -- see
the sketch after this list.

2) you want to use the table changelog in a stateful but non-effecting way,
e.g. by keeping a running count of records. This app, like the first, can
be run in both clusters.

3) you want some combination of state and external actions in one big app.
In this case, I'd consider splitting your app in two so that you can build
state in both clusters while effecting external actions in only one cluster
at a time.
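
Re: (1), a minimal sketch of what "translate and reset offsets" could look
like for the stateless app (the translated-offsets map itself would come from
MM2's checkpoints, however you obtain it; the broker address and names here
are placeholders):

{code:java}
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class FailoverResetSketch {

    // Commits the translated positions for the group on the secondary
    // cluster, so the (stateless) send-email app simply resumes from there.
    public static void applyTranslatedOffsets(
            final String groupId,
            final Map<TopicPartition, OffsetAndMetadata> translatedOffsets) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "secondary-broker:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(translatedOffsets.keySet());
            consumer.commitSync(translatedOffsets);
        }
    }
}
{code}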

Lemme know if that makes sense.

Ryanne

On Tue, Jul 23, 2019 at 10:19 AM Adam Bellemare 
wrote:

> Hi Ryanne
>
> I think we have inconsistent definitions of Active-Active. The producer is
> only producing to one cluster (primary) and one topic (topic "table"), and
> the other cluster (secondary) contains only a replication of the data via
> MM2 ("primary.table"). What you seemed to be proposing is that the
> producer's "table" data is sent fully to each cluster, such that the state
> can be materialized as a KTable in each application running on each
> cluster. This wouldn't require MM2 at all, so I'm not sure if this is what
> you advocated.
>
> You also state that "As with normal consumers, the Streams app should 
> *subscribe
> to any remote topics*, e.g. with a regex, s.t. the application state will
> reflect input from either source cluster.". Wouldn't this mean that the
> stateful "table" topic that we wish to materialize would be replicated by
> MM2 from Primary, such that we end up with the following:
>
> *Replicated Entity/Stateful Data:*
> *Primary Cluster: (Live)*
> Topic: "table" 

Re: [DISCUSS] KIP-466: Add support for List serialization and deserialization

2019-07-23 Thread Development
Bump

> On Jul 22, 2019, at 11:26 AM, Development  wrote:
> 
> Hey Matthias,
> 
> It looks a little confusing, but I don’t have enough expertise to judge on 
> the configuration placement.
> 
> If you think it is fine, I’ll go ahead with this approach.
> 
> Best,
> Daniyar Yeralin
> 
>> On Jul 19, 2019, at 5:49 PM, Matthias J. Sax  wrote:
>> 
>> Good point.
>> 
>> I guess the simplest solution is, to actually add
>> 
 default.list.key/value.serde.type
 default.list.key/value.serde.inner
>> 
>> to both `CommonClientConfigs` and `StreamsConfig`.
>> 
>> It's not super clean, but I think it's the best we can do. Thoughts?
>> 
>> 
>> -Matthias
>> 
>> On 7/19/19 1:23 PM, Development wrote:
>>> Hi Matthias,
>>> 
>>> I agree, ConsumerConfig did not seem like a right place for these 
>>> configurations.
>>> I’ll put them in ProducerConfig, ConsumerConfig, and StreamsConfig.
>>> 
>>> However, I have a question. What should I do in "configure(Map<String, ?> 
>>> configs, boolean isKey)” methods? Which configurations should I try to 
>>> locate? I was comparing my (de)serializer implementations with 
>>> SessionWindows(De)serializer classes, and they use StreamsConfig class to 
>>> get either StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS or 
>>> StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
>>> 
>>> In my case, as I mentioned earlier, StreamsConfig class is not accessible 
>>> from org.apache.kafka.common.serialization package. So, I can’t utilize it. 
>>> Any suggestions here?
>>> 
>>> Best,
>>> Daniyar Yeralin
>>> 
>>> 
 On Jul 18, 2019, at 8:46 PM, Matthias J. Sax  wrote:
 
 Thanks!
 
 One minor question about the configs. The KIP adds three classes, a
 Serializer, a Deserializer, and a Serde.
 
 Hence, would it make sense to add the corresponding configs to
 `ConsumerConfig`, `ProducerConfig`, and `StreamsConfig` using slightly
 different names each time?
 
 
 Somethin like this:
 
 ProducerConfig:
 
 list.key/value.serializer.type
 list.key/value.serializer.inner
 
 ConsumerConfig:
 
 list.key/value.deserializer.type
 list.key/value.deserializer.inner
 
 StreamsConfig:
 
 default.list.key/value.serde.type
 default.list.key/value.serde.inner
 
 
 Adding `d.l.k/v.serde.t/i` to `CommonClientConfigs does not sound right
 to me. Also note, that it seems better to avoid the `default.` prefix
 for consumers and producers because there is only one Serializer or
 Deserializer anyway. Only for Streams, there are multiple and
 StreamsConfig specifies the default one if an operator does not
 overwrite it.
 
 Thoughts?
 
 
 Also, the KIP should explicitly mention to what classes certain configs
 are added. Atm, the KIP only lists parameter names, but does not state
 where those are added.
 
 
 -Matthias
 
 
 
 
 
 On 7/16/19 1:11 PM, Development wrote:
> Hi,
> 
> Yes, totally forgot about the statement. KIP-466 is updated.
> 
> Thank you so much John Roesler, Matthias J. Sax, Sophie Blee-Goldman for 
> your valuable input!
> 
> I hope I did not cause too much trouble :)
> 
> I’ll start the vote now.
> 
> Best,
> Daniyar Yeralin
> 
>> On Jul 16, 2019, at 3:17 PM, John Roesler  wrote:
>> 
>> Hi Daniyar,
>> 
>> Thanks for that update. I took a look, and I think this is in good shape.
>> 
>> One note, the statement "New method public static <Inner> Serde<List<Inner>>
>> ListSerde() in org.apache.kafka.common.serialization.Serdes class
>> (infers list implementation and inner serde from config file)" is
>> still present in the KIP, although I do see it was removed from the PR.
>> 
>> Once you remove that statement from the KIP, then I think this KIP is
>> ready to go up for a vote! Then, we can really review the PR in
>> earnest and get this thing merged.
>> 
>> Thanks,
>> -john
>> 
>> On Tue, Jul 16, 2019 at 2:05 PM Development  wrote:
>>> 
>>> Hi,
>>> 
>>> Pushed new changes under my PR: 
>>> https://github.com/apache/kafka/pull/6592 
>>> 
>>> 
>>> Feel free to put any comments in there.
>>> 
>>> Best,
>>> Daniyar Yeralin
>>> 
 On Jul 15, 2019, at 1:06 PM, Development  wrote:
 
 Hi John,
 
 I knew I was missing something. Yes, that makes sense now, I removed 
 all `listSerde()` methods, and left empty constructors instead.
 
 As per `CommonClientConfigs` I looked at the class, it doesn’t have 
 any properties related to serdes, and that bothers me a little.
 
 All properties like `default.key.serde` `default.windowed.key.serde.*` 
 are located in StreamsConfig. I don’t want to create a confusion.
 What also 

Re: JIRA and KIP contributor permissions

2019-07-23 Thread Matthias J. Sax
Done.

On 7/23/19 4:56 AM, Alexandre Dupriez wrote:
> Hello Matthias,
> 
> Thanks for the quick reply, I can confirm I am able to auto-assign JIRA
> tickets.
> 
> Please find here my Confluence username: alexandre.dupriez
> 
> Many thanks,
> Alexandre
> 
> Le mar. 23 juil. 2019 à 04:38, Matthias J. Sax  a
> écrit :
> 
>> Hi Alexandre,
>>
>> I added you to the list of contributors in JIRA, so you can self-assign
>> tickets. However, I did not find any corresponding wiki. Note that both
>> are independent accounts and you might need to create a wiki account
>> first (and share your ID so we can grant write permission).
>>
>>
>> -Matthias
>>
>> On 7/22/19 1:16 PM, Alexandre Dupriez wrote:
>>> Hello Community,
>>>
>>> In order to start contributing to Apache Kafka project, could I please
>>> request contributor access to JIRA and be granted write permissions to
>> the
>>> Kafka wiki?
>>>
>>> JIRA username: adupriez
>>> Committer email: alexandre.dupr...@amazon.com 
>>>
>>> Thank you in advance,
>>> Alexandre
>>>
>>
>>
> 





[jira] [Created] (KAFKA-8700) Flaky Test QueryableStateIntegrationTest#queryOnRebalance

2019-07-23 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-8700:
--

 Summary: Flaky Test QueryableStateIntegrationTest#queryOnRebalance
 Key: KAFKA-8700
 URL: https://issues.apache.org/jira/browse/KAFKA-8700
 Project: Kafka
  Issue Type: Bug
  Components: streams, unit tests
Affects Versions: 2.4.0
Reporter: Matthias J. Sax
 Fix For: 2.4.0


[https://builds.apache.org/blue/organizations/jenkins/kafka-trunk-jdk8/detail/kafka-trunk-jdk8/3807/tests]
{quote}java.lang.AssertionError: Condition not met within timeout 12. 
waiting for metadata, store and value to be non null
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:376)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:353)
at 
org.apache.kafka.streams.integration.QueryableStateIntegrationTest.verifyAllKVKeys(QueryableStateIntegrationTest.java:292)
at 
org.apache.kafka.streams.integration.QueryableStateIntegrationTest.queryOnRebalance(QueryableStateIntegrationTest.java:382){quote}
 



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Resolved] (KAFKA-8526) Broker may select a failed dir for new replica even in the presence of other live dirs

2019-07-23 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-8526.

   Resolution: Fixed
Fix Version/s: 2.4.0

> Broker may select a failed dir for new replica even in the presence of other 
> live dirs
> --
>
> Key: KAFKA-8526
> URL: https://issues.apache.org/jira/browse/KAFKA-8526
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.1, 2.0.1, 2.1.1, 2.3.0, 2.2.1
>Reporter: Anna Povzner
>Assignee: Igor Soarez
>Priority: Major
> Fix For: 2.4.0
>
>
> Suppose a broker is configured with multiple log dirs. One of the log dirs 
> fails, but there is no load on that dir, so the broker does not know about 
> the failure yet, _i.e._, the failed dir is still in LogManager#_liveLogDirs. 
> Suppose a new topic gets created, and the controller chooses the broker with 
> failed log dir to host one of the replicas. The broker gets LeaderAndIsr 
> request with isNew flag set. LogManager#getOrCreateLog() selects a log dir 
> for the new replica from _liveLogDirs, then one of two things can happen:
> 1) getAbsolutePath can fail, in which case getOrCreateLog will throw an 
> IOException
> 2) Creating the directory for the new replica log may fail (_e.g._, if the directory 
> becomes read-only, so getAbsolutePath worked). 
> In both cases, the selected dir will be marked offline (which is correct). 
> However, LeaderAndIsr will return an error and replica will be marked 
> offline, even though the broker may have other live dirs. 
> *Proposed solution*: Broker should retry selecting a dir for the new replica, 
> if initially selected dir threw an IOException when trying to create a 
> directory for the new replica. We should be able to do that in 
> LogManager#getOrCreateLog() method, but keep in mind that 
> logDirFailureChannel.maybeAddOfflineLogDir does not synchronously remove the 
> dir from _liveLogDirs. So, it makes sense to select initial dir by calling 
> LogManager#nextLogDir (current implementation), but if we fail to create log 
> on that dir, one approach is to select next dir from _liveLogDirs in 
> round-robin fashion (until we get to initial log dir – the case where all 
> dirs failed).
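
A rough sketch of the round-robin retry idea described above (illustrative
only -- the real broker code is Scala and these helper names are placeholders,
not the actual LogManager API):

{code:java}
import java.io.File;
import java.io.IOException;
import java.util.List;

public class LogDirSelectionSketch {

    // Try the initially selected dir first, then the remaining live dirs in
    // round-robin order, until one works or all live dirs have failed.
    static File createLogWithRetry(final List<File> liveLogDirs, final File preferredDir)
            throws IOException {
        if (liveLogDirs.isEmpty()) {
            throw new IOException("No live log dirs available");
        }
        final int start = Math.max(0, liveLogDirs.indexOf(preferredDir));
        IOException lastFailure = null;
        for (int i = 0; i < liveLogDirs.size(); i++) {
            final File candidate = liveLogDirs.get((start + i) % liveLogDirs.size());
            try {
                return createLogInDir(candidate); // placeholder for the real creation step
            } catch (final IOException e) {
                lastFailure = e; // the real code would also mark this dir offline
            }
        }
        throw lastFailure; // all live dirs failed
    }

    private static File createLogInDir(final File dir) throws IOException {
        final File logDir = new File(dir, "topic-0"); // placeholder partition dir name
        if (!logDir.mkdirs() && !logDir.isDirectory()) {
            throw new IOException("Failed to create " + logDir);
        }
        return logDir;
    }
}
{code}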



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


Re: KIP-382 + Kafka Streams Question

2019-07-23 Thread Adam Bellemare
Hi Ryanne

I think we have inconsistent definitions of Active-Active. The producer is
only producing to one cluster (primary) and one topic (topic "table"), and
the other cluster (secondary) contains only a replication of the data via
MM2 ("primary.table"). What you seemed to be proposing is that the
producer's "table" data is sent fully to each cluster, such that the state
can be materialized as a KTable in each application running on each
cluster. This wouldn't require MM2 at all, so I'm not sure if this is what
you advocated.

You also state that "As with normal consumers, the Streams app should
*subscribe
to any remote topics*, e.g. with a regex, s.t. the application state will
reflect input from either source cluster.". Wouldn't this mean that the
stateful "table" topic that we wish to materialize would be replicated by
MM2 from Primary, such that we end up with the following:

*Replicated Entity/Stateful Data:*
*Primary Cluster: (Live)*
Topic: "table" (contains data from T = 0 to T = n)
SteamsAppPrimary is consuming from ("table")

*Secondary Cluster: (Live)*
Topic: "primary.table" (contains data from T = 0 to T = n)
SteamsAppSecondary is consuming from ("primary.table")

What does StreamsAppSecondary do when "primary.table" is no longer
replicated because Primary has died? Additionally, where should the
producer of topic "table" now write its data to, assuming that Primary
Cluster is irrevocably lost?

I hope this better outlines my scenario. The trivial solution seems to be
to make your producers produce all stateful data (topic "table") to each
cluster, which makes MM2 unnecessary, but can also lead to data
inconsistencies so it's not exactly foolproof.

Thanks

On Mon, Jul 22, 2019 at 6:32 PM Ryanne Dolan  wrote:

> Hello Adam, thanks for the questions. Yes my organization uses Streams,
> and yes you can use Streams with MM2/KIP-382, though perhaps not in the way
> you are describing.
>
> The architecture you mention is more "active/standby" than "active/active"
> IMO. The "secondary" cluster is not being used until a failure, at which
> point you migrate your app and expect the data to already be there. This
> works for normal consumers where you can seek() and --reset-offsets.
> Streams apps can be reset with the kafka-streams-application-reset tool,
> but as you point out, that doesn't help with rebuilding an app's internal
> state, which would be missing on the secondary cluster. (Granted, that may
> be okay depending on your particular application.)
>
> A true "active/active" solution IMO would be to run your same Streams app
> in _both_ clusters (primary, secondary), s.t. the entire application state
> is available and continuously updated in both clusters. As with normal
> consumers, the Streams app should subscribe to any remote topics, e.g. with
> a regex, s.t. the application state will reflect input from either source
> cluster.
>
> This is essentially what Streams' "standby replicas" are -- extra copies
> of application state to support quicker failover. Without these replicas,
> Streams would need to start back at offset 0 and re-process everything in
> order to rebuild state (which you don't want to do during a disaster,
> especially!). The same logic applies to using Streams with MM2. You _could_
> failover by resetting the app and rebuilding all the missing state, or you
> could have a copy of everything sitting there ready when you need it. The
> easiest way to do the latter is to run your app in both clusters.
>
> Hope that helps.
>
> Ryanne
>
> On Mon, Jul 22, 2019 at 3:11 PM Adam Bellemare 
> wrote:
>
>> Hi Ryanne
>>
>> I have a quick question for you about Active+Active replication and Kafka
>> Streams. First, does your org /do you use Kafka Streams? If not then I
>> think this conversation can end here. ;)
>>
>> Secondly, and for the broader Kafka Dev group - what happens if I want to
>> use Active+Active replication with my Kafka Streams app, say, to
>> materialize a simple KTable? Based on my understanding, I topic "table" on
>> the primary cluster will be replicated to the secondary cluster as
>> "primary.table". In the case of a full cluster failure for primary, the
>> producer to topic "table" on the primary switches over to the secondary
>> cluster, creates its own "table" topic and continues to write to there. So
>> now, assuming we have had no data loss, we end up with:
>>
>>
>> *Primary Cluster: (Dead)*
>>
>>
>> *Secondary Cluster: (Live)*
>> Topic: "primary.table" (contains data from T = 0 to T = n)
>> Topic: "table" (contains data from T = n+1 to now)
>>
>> If I want to materialize state from using Kafka Streams, obviously I am
>> now in a bit of a pickle since I need to consume "primary.table" before I
>> consume "table". Have you encountered rebuilding state in Kafka Streams
>> using Active-Active? For non-Kafka Streams I can see using a single
>> consumer for "primary.table" and one for "table", interleaving the
>> timestamps and performing basic event 

Kafka consumer is not reading some partition

2019-07-23 Thread Sergey Fedorov
Hello. I was using Kafka 2.1.1 and facing a problem where our consumers
sometimes intermittently stop consuming from one or two of the partitions. My
config


[jira] [Created] (KAFKA-8699) rack aware replica, found rack with two replicas

2019-07-23 Thread abdessamad (JIRA)
abdessamad created KAFKA-8699:
-

 Summary: rack aware replica, found rack with two replicas
 Key: KAFKA-8699
 URL: https://issues.apache.org/jira/browse/KAFKA-8699
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.10.0.0
Reporter: abdessamad


Hi everyone,

Currently we run KAFKA in production with 6 racks. Topics are created with a 
replication factor of 3, and the rack-aware replica assignment is set properly, but we 
encounter an issue when we check which racks the replicas land on:

rack1 -> broker(1,2)

rack2 -> broker(3)

rack3 -> broker(4)

rack4 -> broker(5)

we have some topics with :

topicA -> partition 0  -> broker (1,2,3) not expected

               partition 1  -> broker (3,5,5)

               partition 2  -> broker (5,3,2)

               partition 3  -> broker (5,2,1) not expected

Is this placement correct? If not, do you have any idea why this issue happens and how we 
can fix it?

 

Many thanks, any help would be greatly appreciated.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (KAFKA-8698) ListOffsets Response protocol documentation

2019-07-23 Thread JIRA
Fábio Silva created KAFKA-8698:
--

 Summary: ListOffsets Response protocol documentation
 Key: KAFKA-8698
 URL: https://issues.apache.org/jira/browse/KAFKA-8698
 Project: Kafka
  Issue Type: Bug
  Components: documentation
Reporter: Fábio Silva


The documentation of ListOffsets Response (Version: 0) appears to have an error 
in the offsets field name, which is suffixed with `'`.
{code:java}
[offsets']{code}



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


Re: JIRA and KIP contributor permissions

2019-07-23 Thread Alexandre Dupriez
Hello Matthias,

Thanks for the quick reply, I can confirm I am able to auto-assign JIRA
tickets.

Please find here my Confluence username: alexandre.dupriez

Many thanks,
Alexandre

Le mar. 23 juil. 2019 à 04:38, Matthias J. Sax  a
écrit :

> Hi Alexandre,
>
> I added you to the list of contributors in JIRA, so you can self-assign
> tickets. However, I did not find any corresponding wiki. Note that both
> are independent accounts and you might need to create a wiki account
> first (and share your ID so we can grant write permission).
>
>
> -Matthias
>
> On 7/22/19 1:16 PM, Alexandre Dupriez wrote:
> > Hello Community,
> >
> > In order to start contributing to Apache Kafka project, could I please
> > request contributor access to JIRA and be granted write permissions to
> the
> > Kafka wiki?
> >
> > JIRA username: adupriez
> > Committer email: alexandre.dupr...@amazon.com 
> >
> > Thank you in advance,
> > Alexandre
> >
>
>


Re: [DISCUSS] KIP-490: log when consumer groups lose a message because offset has been deleted

2019-07-23 Thread Kamal Chandraprakash
Jose,

How do you differentiate the compacted topics from the time-retention
topics? Deleting a message due to the compaction policy is a valid case,
and users won't be interested in monitoring/reading those deleted messages.
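
(For what it's worth, the distinction itself is easy to read off the topic
config -- a rough sketch only, with a placeholder bootstrap address:)

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;

public class CleanupPolicySketch {

    // Returns true if the topic's cleanup.policy includes "compact".
    public static boolean isCompacted(final String topic) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            final ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
            final Config config = admin.describeConfigs(Collections.singleton(resource))
                    .values().get(resource).get();
            return config.get(TopicConfig.CLEANUP_POLICY_CONFIG).value()
                    .contains(TopicConfig.CLEANUP_POLICY_COMPACT);
        }
    }
}
{code}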

Thanks,
Kamal

On Tue, Jul 23, 2019 at 4:00 AM Jose M  wrote:

> Hi Colin,
>
> Thanks a lot for your feedback. Please note that I only propose to log when
> a message is lost, and only for a given set of consumer groups, not as default
> behaviour for all consumer groups.
> But in fact, I agree with you that logging a line per expired message can be
> quite a lot, and that is not the best way to do it. I can propose to add a
> dedicated JMX metric of type counter "expired messages" per consumer group.
> What do you think ?
>
> About monitoring the lag to ensure that messages are not lost, I know that
> is what clients do, to set up alerting when the lag is above a threshold.
> But even if the alert is triggered, we don't know if messages have been lost
> or not. With this KIP implemented, clients would know if something has been
> missed or not.
>
>
> Thanks,
>
>
> Jose
>
> On Mon, Jul 22, 2019 at 5:51 PM Colin McCabe  wrote:
>
> > Hi Jose,
> >
> > One issue that I see here is that the number of log messages could be
> > huge.  I've seen people create tens of thousands of consumer groups.
> > People can also have settings that create pretty small log files.  A
> > message per log file per group could be quite a lot of messages.
> >
> > A log message on the broker is also not that useful for detecting bad
> > client behavior.  People generally only look at the server logs after
> they
> > become aware that something is wrong through some other means.
> >
> > Perhaps the clients should just monitor their lag?  There is a JMX metric
> > for this, which means it can be hooked into traditional metrics /
> reporting
> > systems.
> >
> > best,
> > Colin
> >
> >
> > On Mon, Jul 22, 2019, at 03:12, Jose M wrote:
> > > Hello,
> > >
> > > I didn't get any feedback on this small KIP-490
> > > <
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-490%3A+log+when+consumer+groups+lose+a+message+because+offset+has+been+deleted
> > >.
> > > In summary, I propose a way to be noticed when messages are being
> > > removed
> > > due to retention policy, without being consumed by a given consumer
> > > group.
> > > It will be useful to realize that some important messages have been
> > > lost.
> > >
> > > As Im new to the codebase, I have technical questions about how to
> > achieve
> > > this, but before going deeper, I would like your feedback on the
> feature.
> > >
> > > Thanks a lot,
> > >
> > >
> > > Jose Morales
> > >
> > > On Sun, Jul 14, 2019 at 12:51 AM Jose M  wrote:
> > >
> > > > Hello,
> > > >
> > > > I would like to know what do you think on KIP-490:
> > > >
> > > >
> > > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-490%3A+log+when+consumer+groups+lose+a+message+because+offset+has+been+deleted
> > > > <
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-490%3A+log+when+consumer+groups+lose+a+message+because+offset+has+expired
> > >
> > > >
> > > >
> > > > Thanks a lot !
> > > > --
> > > > Jose M
> > > >
> > >
> > >
> > > --
> > > J
> > >
> >
>
>
> --
> J
>


KAFKA-1194: The Kafka Broker can't delete old log files on Windows

2019-07-23 Thread Omar Al-Safi
Hello community,

As I can see, this issue has been around for a long time:
https://issues.apache.org/jira/browse/KAFKA-1194, which is about the Kafka
log files on Windows that can't be deleted. Also, I see there is an ongoing PR
for it: https://github.com/apache/kafka/pull/6329, but not much has happened
there after Colin's comment. Therefore, do we have a plan to fix this issue
in a future release, or is it on the roadmap somehow? I'd like to know
in case we can help from our side to get this issue resolved.

Thanks,
Omar Al-Safi


[jira] [Created] (KAFKA-8697) Kafka consumer group auto removal

2019-07-23 Thread Pavel Rogovoy (JIRA)
Pavel Rogovoy created KAFKA-8697:


 Summary: Kafka consumer group auto removal
 Key: KAFKA-8697
 URL: https://issues.apache.org/jira/browse/KAFKA-8697
 Project: Kafka
  Issue Type: Improvement
Reporter: Pavel Rogovoy


Hello everyone,

I'm new to Kafka so please be gentle with me :)

Current issue:

Let's say I have a consumer that consumes messages as part of a consumer group named 
'ABC' and then decides to terminate. Consumer group 'ABC' will now stay there 
hanging with zero consumers. This situation will cause monitoring tools like 
Burrow to alert on lag for this consumer group, even though my application has 
finished its job, doesn't want to do anything more, and thus is not really lagging. 

 

I think it would be useful if we added an option to create a consumer group 
that is automatically removed when the last consumer has terminated 
properly and did not crash.
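
(For comparison, as I understand it the cleanup currently has to be done 
explicitly once the last consumer has left -- roughly something like this, 
with a placeholder bootstrap address and the group name from the example above:)
{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class DeleteGroupSketch {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Explicitly delete the now-empty consumer group 'ABC'.
            admin.deleteConsumerGroups(Collections.singleton("ABC")).all().get();
        }
    }
}
{code}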

 

Please tell me what you think.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Resolved] (KAFKA-8380) We can not create a topic, immediately write to it and then read.

2019-07-23 Thread Narendra Kumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Narendra Kumar resolved KAFKA-8380.
---
Resolution: Not A Problem

> We can not create a topic, immediately write to it and then read.
> -
>
> Key: KAFKA-8380
> URL: https://issues.apache.org/jira/browse/KAFKA-8380
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.2.0
>Reporter: Darya Merkureva
>Priority: Blocker
>
> We are trying to create a topic, immediately write to it and read. 
> For some reason, we read nothing in spite of the fact that we are waiting for 
> the completion of KafkaFuture. 
> {code:java}
> public class main {
>   private static final String TOPIC_NAME = "topic";
>   private static final String KEY_NAME = "key";
>   public static void main(String[] args) {
>   final Properties prodProps = new Properties();
>   prodProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> "localhost:9092");
>   prodProps.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
>   prodProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
>   prodProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
>   prodProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
>   final Producer<String, String> prod = new 
> KafkaProducer<>(prodProps);
>   final Properties admProps = new Properties();
>   admProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
> "localhost:9092");
>   final AdminClient adm = KafkaAdminClient.create(admProps);
>   final Properties consProps = new Properties();
>   consProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> "localhost:9092");
>   consProps.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
>   consProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
>   consProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 
> "1000");
>   consProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
> "3");
>   consProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
> "org.apache.kafka.common.serialization.StringDeserializer");
>   consProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
> "org.apache.kafka.common.serialization.StringDeserializer");
>   final Consumer<String, String> cons = new 
> KafkaConsumer<>(consProps);
>   
>   try {
>   final NewTopic newTopic = new NewTopic(TOPIC_NAME, 1, 
> (short)1);
>   val createTopicsResult = 
> adm.createTopics(Collections.singleton(newTopic));
>   createTopicsResult.values().get(TOPIC_NAME).get();
>   } catch (InterruptedException | ExecutionException e) {
>   if (!(e.getCause() instanceof TopicExistsException)) {
>   throw new RuntimeException(e.getMessage(), e);
>   }
>   }
>   
>   final ProducerRecord<String, String> producerRecord =
>   new ProducerRecord<>(TOPIC_NAME, KEY_NAME, 
> "data");
>   prod.send(producerRecord);
>   prod.send(producerRecord);
>   prod.send(producerRecord);
>   prod.send(producerRecord);
>   cons.subscribe(Arrays.asList(TOPIC_NAME));
>   val records  = cons.poll(Duration.ofSeconds(10));
>   for(var record: records){
>   System.out.println(record.value());
>   }
>   }
> }
> {code}
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


Build failed in Jenkins: kafka-2.3-jdk8 #77

2019-07-23 Thread Apache Jenkins Server
See 


Changes:

[jason] KAFKA-8678; Fix leave group protocol bug in throttling and error

--
[...truncated 2.73 MB...]

kafka.controller.ReplicaStateMachineTest > 
testInvalidNonexistentReplicaToOnlineReplicaTransition STARTED

kafka.controller.ReplicaStateMachineTest > 
testInvalidNonexistentReplicaToOnlineReplicaTransition PASSED

kafka.controller.ReplicaStateMachineTest > 
testInvalidOnlineReplicaToNewReplicaTransition STARTED

kafka.controller.ReplicaStateMachineTest > 
testInvalidOnlineReplicaToNewReplicaTransition PASSED

kafka.controller.ReplicaStateMachineTest > 
testReplicaDeletionStartedToReplicaDeletionIneligibleTransition STARTED

kafka.controller.ReplicaStateMachineTest > 
testReplicaDeletionStartedToReplicaDeletionIneligibleTransition PASSED

kafka.controller.ReplicaStateMachineTest > 
testInvalidReplicaDeletionSuccessfulToOfflineReplicaTransition STARTED

kafka.controller.ReplicaStateMachineTest > 
testInvalidReplicaDeletionSuccessfulToOfflineReplicaTransition PASSED

kafka.controller.ReplicaStateMachineTest > 
testInvalidReplicaDeletionIneligibleToNewReplicaTransition STARTED

kafka.controller.ReplicaStateMachineTest > 
testInvalidReplicaDeletionIneligibleToNewReplicaTransition PASSED

kafka.controller.ReplicaStateMachineTest > 
testReplicaDeletionIneligibleToOnlineReplicaTransition STARTED

kafka.controller.ReplicaStateMachineTest > 
testReplicaDeletionIneligibleToOnlineReplicaTransition PASSED

kafka.controller.ReplicaStateMachineTest > 
testInvalidReplicaDeletionStartedToNewReplicaTransition STARTED

kafka.controller.ReplicaStateMachineTest > 
testInvalidReplicaDeletionStartedToNewReplicaTransition PASSED

kafka.controller.ReplicaStateMachineTest > 
testInvalidNonexistentReplicaToReplicaDeletionSuccessfulTransition STARTED

kafka.controller.ReplicaStateMachineTest > 
testInvalidNonexistentReplicaToReplicaDeletionSuccessfulTransition PASSED

kafka.controller.ReplicaStateMachineTest > 
testNewReplicaToOnlineReplicaTransition STARTED

kafka.controller.ReplicaStateMachineTest > 
testNewReplicaToOnlineReplicaTransition PASSED

kafka.tools.MirrorMakerTest > 
testDefaultMirrorMakerMessageHandlerWithNoTimestampInSourceMessage STARTED

kafka.tools.MirrorMakerTest > 
testDefaultMirrorMakerMessageHandlerWithNoTimestampInSourceMessage PASSED

kafka.tools.MirrorMakerTest > testDefaultMirrorMakerMessageHandler STARTED

kafka.tools.MirrorMakerTest > testDefaultMirrorMakerMessageHandler PASSED

kafka.tools.MirrorMakerTest > testDefaultMirrorMakerMessageHandlerWithHeaders 
STARTED

kafka.tools.MirrorMakerTest > testDefaultMirrorMakerMessageHandlerWithHeaders 
PASSED

kafka.tools.MirrorMakerIntegrationTest > testCommaSeparatedRegex STARTED

kafka.tools.MirrorMakerIntegrationTest > testCommaSeparatedRegex PASSED

kafka.tools.MirrorMakerIntegrationTest > 
testCommitOffsetsRemoveNonExistentTopics STARTED

kafka.tools.MirrorMakerIntegrationTest > 
testCommitOffsetsRemoveNonExistentTopics PASSED

kafka.tools.MirrorMakerIntegrationTest > testCommitOffsetsThrowTimeoutException 
STARTED

kafka.tools.MirrorMakerIntegrationTest > testCommitOffsetsThrowTimeoutException 
PASSED

kafka.tools.ConsoleConsumerTest > shouldLimitReadsToMaxMessageLimit STARTED

kafka.tools.ConsoleConsumerTest > shouldLimitReadsToMaxMessageLimit PASSED

kafka.tools.ConsoleConsumerTest > shouldParseGroupIdFromBeginningGivenTogether 
STARTED

kafka.tools.ConsoleConsumerTest > shouldParseGroupIdFromBeginningGivenTogether 
PASSED

kafka.tools.ConsoleConsumerTest > shouldExitOnOffsetWithoutPartition STARTED

kafka.tools.ConsoleConsumerTest > shouldExitOnOffsetWithoutPartition PASSED

kafka.tools.ConsoleConsumerTest > shouldStopWhenOutputCheckErrorFails STARTED

kafka.tools.ConsoleConsumerTest > shouldStopWhenOutputCheckErrorFails PASSED

kafka.tools.ConsoleConsumerTest > 
shouldExitOnInvalidConfigWithAutoOffsetResetAndConflictingFromBeginning STARTED

kafka.tools.ConsoleConsumerTest > 
shouldExitOnInvalidConfigWithAutoOffsetResetAndConflictingFromBeginning PASSED

kafka.tools.ConsoleConsumerTest > shouldResetUnConsumedOffsetsBeforeExit STARTED

kafka.tools.ConsoleConsumerTest > shouldResetUnConsumedOffsetsBeforeExit PASSED

kafka.tools.ConsoleConsumerTest > shouldParseConfigsFromFile STARTED

kafka.tools.ConsoleConsumerTest > shouldParseConfigsFromFile PASSED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidConsumerConfigWithAutoOffsetResetLatest STARTED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidConsumerConfigWithAutoOffsetResetLatest PASSED

kafka.tools.ConsoleConsumerTest > groupIdsProvidedInDifferentPlacesMustMatch 
STARTED

kafka.tools.ConsoleConsumerTest > groupIdsProvidedInDifferentPlacesMustMatch 
PASSED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidConsumerConfigWithAutoOffsetResetAndMatchingFromBeginning 
STARTED

kafka.tools.ConsoleConsumerTest > 

[DISCUSS] KIP-495: Dynamically Adjust Log Levels in Connect

2019-07-23 Thread Arjun Satish
Hi everyone.

I'd like to propose the following KIP to implement changing log levels on
the fly in Connect workers:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-495%3A+Dynamically+Adjust+Log+Levels+in+Connect

Would like to hear your thoughts on this.

Thanks very much,
Arjun