[jira] [Commented] (KAFKA-9216) Enforce connect internal topic configuration at startup

2020-06-10 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17132887#comment-17132887
 ] 

Randall Hauch commented on KAFKA-9216:
--

Thanks, [~ChrisEgerton]. I think we came to consensus on the PR by improving 
the error message with better instructions.

> Enforce connect internal topic configuration at startup
> ---
>
> Key: KAFKA-9216
> URL: https://issues.apache.org/jira/browse/KAFKA-9216
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Randall Hauch
>Assignee: Evelyn Bayes
>Priority: Major
> Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1
>
>
> Users sometimes create Connect's internal topic for connector configurations 
> with more than one partition. One partition is expected, however, and using more 
> than one leads to unexpected behavior that is sometimes hard to spot.
> Here's one example of a log message:
> {noformat}
> "textPayload": "[2019-11-20 11:12:14,049] INFO [Worker clientId=connect-1, 
> groupId=td-connect-server] Current config state offset 284 does not match 
> group assignment 274. Forcing rebalance. 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:942)\n"
> {noformat}
> Would it be possible to add a check in the KafkaConfigBackingStore and 
> prevent the worker from starting if the Connect config topic's partition count != 1?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9216) Enforce connect internal topic configuration at startup

2020-06-08 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17128492#comment-17128492
 ] 

Chris Egerton commented on KAFKA-9216:
--

Based on [this 
change|https://github.com/apache/kafka/pull/8270/files#diff-1c045c8737aea4c820319a6de65af8a4R275]
 it looks like we've altered workers to fail on startup if the config topic has 
multiple partitions. How should users respond in this case? As far as I know 
there's no out-of-the-box tool to reduce the number of partitions for a topic.

Do we expect them to just delete the topic and then recreate it? If so, could 
we say that explicitly (or even just recommend that they delete the topic and 
then allow the framework to create it automatically, ACLs permitting)? If not, 
can we outline a recommended flow here?

I love the idea here and it should save users plenty of headaches in the 
future, but I think we might want to consider the impact on users who may see 
this during an upgrade and try to outline clear steps for them to take, either 
directly in the error message or on this ticket (which can be linked to in the 
error message).
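As a concrete illustration of "clear steps" in the error message, the startup failure could name the topic, the observed partition count, and the delete-and-recreate flow proposed above. A minimal sketch only: the helper class and method names are hypothetical (not part of Kafka Connect), and the wording is illustrative; `config.storage.topic` is the worker property that names the config topic.

```java
// Hypothetical helper (not part of Kafka Connect): builds a startup error
// message that tells the operator what went wrong and how to recover.
final class InternalTopicErrors {
    private InternalTopicErrors() {}

    static String configTopicPartitionMessage(String topic, int partitionCount) {
        return String.format(
            "Topic '%s' supplied via the 'config.storage.topic' property must have "
                + "exactly one partition, but found %d partitions. To recover: stop all "
                + "workers, delete the topic or choose a new topic name, then restart the "
                + "workers so the topic is recreated with a single partition (ACLs permitting). "
                + "Deleting the topic discards the stored connector configurations, "
                + "so they must be resubmitted afterwards.",
            topic, partitionCount);
    }
}
```

The message could then be passed to whatever exception the worker throws at startup, so the recovery steps appear directly in the worker log.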

> Enforce connect internal topic configuration at startup
> ---
>
> Key: KAFKA-9216
> URL: https://issues.apache.org/jira/browse/KAFKA-9216
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Randall Hauch
>Assignee: Evelyn Bayes
>Priority: Major
> Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1
>
>
> Users sometimes configure Connect's internal topic for configurations with 
> more than one partition. One partition is expected, however, and using more 
> than one leads to weird behavior that is sometimes not easy to spot.
> Here's one example of a log message:
> {noformat}
> "textPayload": "[2019-11-20 11:12:14,049] INFO [Worker clientId=connect-1, 
> groupId=td-connect-server] Current config state offset 284 does not match 
> group assignment 274. Forcing rebalance. 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:942)\n"
> {noformat}
> Would it be possible to add a check in the KafkaConfigBackingStore and 
> prevent the worker from starting if connect config partition count !=1 ?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9216) Enforce connect internal topic configuration at startup

2020-06-07 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17127789#comment-17127789
 ] 

Randall Hauch commented on KAFKA-9216:
--

[~kkonstantine], thanks for reviewing and merging [~EeveeB]'s PR to check the 
partition count of the connector configs topic. I've created a followup PR 
(https://github.com/apache/kafka/pull/8828) that verifies that each internal 
topic has `cleanup.policy=compact`; the worker fails to start if the 
cleanup policy is any other value (e.g., `delete`, `delete,compact` or 
`compact,delete`). The check is skipped if the worker created the 
topic, since the worker always creates it with `cleanup.policy=compact`.

Note that I chose to avoid checking retention, since that only applies if the 
`delete` cleanup policy is used. Therefore, the check described in the 
preceding paragraph should be sufficient. 
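The rule above (accept only `compact`; reject `delete`, `delete,compact` and `compact,delete`) reduces to parsing the comma-separated `cleanup.policy` value. A dependency-free sketch; the class and method names are hypothetical, and in a real worker the value would come from the broker (for example via the Admin client's `describeConfigs`) rather than a string argument.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

// Hypothetical helper: true only when the policy is exactly "compact".
final class CleanupPolicyCheck {
    private CleanupPolicyCheck() {}

    static boolean isCompactOnly(String cleanupPolicy) {
        if (cleanupPolicy == null) {
            return false;
        }
        // Split "compact,delete" or "delete, compact" into individual tokens.
        Set<String> policies = Arrays.stream(cleanupPolicy.split(","))
            .map(String::trim)
            .filter(s -> !s.isEmpty())
            .collect(Collectors.toCollection(TreeSet::new));
        // Any combination that includes "delete" fails this comparison.
        return policies.equals(Set.of("compact"));
    }
}
```

Since the check is skipped for topics the worker created itself, a caller would only invoke this for topics that already existed at startup.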

> Enforce connect internal topic configuration at startup
> ---
>
> Key: KAFKA-9216
> URL: https://issues.apache.org/jira/browse/KAFKA-9216
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Randall Hauch
>Assignee: Evelyn Bayes
>Priority: Major
> Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1
>
>
> Users sometimes configure Connect's internal topic for configurations with 
> more than one partition. One partition is expected, however, and using more 
> than one leads to weird behavior that is sometimes not easy to spot.
> Here's one example of a log message:
> {noformat}
> "textPayload": "[2019-11-20 11:12:14,049] INFO [Worker clientId=connect-1, 
> groupId=td-connect-server] Current config state offset 284 does not match 
> group assignment 274. Forcing rebalance. 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:942)\n"
> {noformat}
> Would it be possible to add a check in the KafkaConfigBackingStore and 
> prevent the worker from starting if connect config partition count !=1 ?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9216) Enforce connect internal topic configuration at startup

2020-05-19 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17111394#comment-17111394
 ] 

Greg Harris commented on KAFKA-9216:


I believe that KAFKA-5087 has an intent similar to this issue, but was never 
implemented, and erroneously closed as a duplicate.

It does call out the need to enforce that the topics are log-compacted rather 
than time- or size-deleted in order to prevent loss of state, which I think 
would be a valuable addition to the validation suggested here.

One additional aspect that isn't covered yet is that the retention period of 
the topics should also be infinite, since otherwise the state of long-running 
connectors can be lost.

I think that any one of these issues (config partition count != 1, cleanup 
policy != compact, or retention != infinite) should generate an exception 
during startup, to prevent the worker from operating on top of a lossy 
state-store.
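The three conditions could be evaluated together so that a single startup error lists every problem at once. A sketch under stated assumptions: the partition count and the `cleanup.policy` and `retention.ms` values are assumed to be fetched already, `retention.ms=-1` is used as the "infinite" value, the single-partition rule is applied only when asked (it concerns the config topic), and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical validator for a Connect internal topic, following the three
// conditions proposed in this comment. Inputs are assumed to be pre-fetched.
final class InternalTopicValidator {
    private InternalTopicValidator() {}

    static List<String> violations(String topic, int partitions, boolean singlePartition,
                                   String cleanupPolicy, long retentionMs) {
        List<String> problems = new ArrayList<>();
        if (singlePartition && partitions != 1) {
            problems.add(topic + ": expected 1 partition but found " + partitions);
        }
        String policy = cleanupPolicy == null ? "" : cleanupPolicy.trim();
        if (!policy.equals("compact")) {
            problems.add(topic + ": expected cleanup.policy=compact but found " + cleanupPolicy);
        }
        // retention.ms = -1 means no time-based limit.
        if (retentionMs != -1L) {
            problems.add(topic + ": expected retention.ms=-1 but found " + retentionMs);
        }
        return problems;
    }

    // Fails startup if any condition is violated, reporting all of them.
    static void requireValid(String topic, int partitions, boolean singlePartition,
                             String cleanupPolicy, long retentionMs) {
        List<String> problems = violations(topic, partitions, singlePartition, cleanupPolicy, retentionMs);
        if (!problems.isEmpty()) {
            throw new IllegalStateException(String.join("; ", problems));
        }
    }
}
```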

> Enforce connect internal topic configuration at startup
> ---
>
> Key: KAFKA-9216
> URL: https://issues.apache.org/jira/browse/KAFKA-9216
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Randall Hauch
>Assignee: Evelyn Bayes
>Priority: Major
>
> Users sometimes configure Connect's internal topic for configurations with 
> more than one partition. One partition is expected, however, and using more 
> than one leads to weird behavior that is sometimes not easy to spot.
> Here's one example of a log message:
> {noformat}
> "textPayload": "[2019-11-20 11:12:14,049] INFO [Worker clientId=connect-1, 
> groupId=td-connect-server] Current config state offset 284 does not match 
> group assignment 274. Forcing rebalance. 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:942)\n"
> {noformat}
> Would it be possible to add a check in the KafkaConfigBackingStore and 
> prevent the worker from starting if connect config partition count !=1 ?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9216) Enforce connect internal topic configuration at startup

2020-03-10 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17056572#comment-17056572
 ] 

ASF GitHub Bot commented on KAFKA-9216:
---

Evelyn-Bayes commented on pull request #8270: KAFKA-9216: Enforce connect 
internal topic configuration at startup
URL: https://github.com/apache/kafka/pull/8270
 
 
   Currently, Kafka Connect creates its config backing topic with a fire-and-forget 
approach.
   This is fine unless someone has manually created that topic already with the 
wrong partition count.
   
   In such a case Kafka Connect "may" run for some time, especially in standalone 
mode, but once switched to distributed mode it will almost certainly fail.
   
   To counter this I've added a check when the KafkaConfigBackingStore is 
starting.
   This check will throw a ConfigException if there is more than one partition 
in the backing store.
   
   This exception is then caught upstream and logged by either:
   - class: DistributedHerder, method: run
   - class: ConnectStandalone, method: main
   
   After a review I don't believe it impacts any other upstream code.
   
   Finally, to support this new functionality I've added a public method to 
KafkaBasedLog that returns the partition count, plus a field to store it.
   
   And, I've created a unit test in KafkaConfigBackingStoreTest to verify the 
behaviour.
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Enforce connect internal topic configuration at startup
> ---
>
> Key: KAFKA-9216
> URL: https://issues.apache.org/jira/browse/KAFKA-9216
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Randall Hauch
>Assignee: Evelyn Bayes
>Priority: Major
>
> Users sometimes configure Connect's internal topic for configurations with 
> more than one partition. One partition is expected, however, and using more 
> than one leads to weird behavior that is sometimes not easy to spot.
> Here's one example of a log message:
> {noformat}
> "textPayload": "[2019-11-20 11:12:14,049] INFO [Worker clientId=connect-1, 
> groupId=td-connect-server] Current config state offset 284 does not match 
> group assignment 274. Forcing rebalance. 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:942)\n"
> {noformat}
> Would it be possible to add a check in the KafkaConfigBackingStore and 
> prevent the worker from starting if connect config partition count !=1 ?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9216) Enforce connect internal topic configuration at startup

2020-03-04 Thread Evelyn Bayes (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17051083#comment-17051083
 ] 

Evelyn Bayes commented on KAFKA-9216:
-

[~rhauch] I think your interpretation is a little off.

When Kafka Connect starts, it sends a create-topic command (with 1 partition) in 
a fire-and-forget fashion.

It then operates under the assumption the topic is there.
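To illustrate why a pre-existing topic slips through: if the creation step treats "already exists" as success and never inspects the existing topic, nothing checks its partition count. A self-contained sketch with an in-memory stand-in for the cluster; `FakeCluster` and `FireAndForgetDemo` are hypothetical and only model the behaviour described above, not the real `TopicAdmin`.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory stand-in for a cluster: topic name -> partition count.
final class FakeCluster {
    private final Map<String, Integer> topics = new HashMap<>();

    // Returns false (rather than failing) when the topic already exists.
    boolean createTopic(String name, int partitions) {
        return topics.putIfAbsent(name, partitions) == null;
    }

    int partitionCount(String name) {
        return topics.getOrDefault(name, 0);
    }
}

final class FireAndForgetDemo {
    private FireAndForgetDemo() {}

    // Worker-side startup: request a 1-partition topic and move on.
    static int startWorker(FakeCluster cluster, String configTopic) {
        cluster.createTopic(configTopic, 1); // result ignored: fire and forget
        // The worker assumes one partition, but the real count is whatever
        // the pre-existing topic had.
        return cluster.partitionCount(configTopic);
    }
}
```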

As for your choices, the third option was my favourite too.

> Enforce connect internal topic configuration at startup
> ---
>
> Key: KAFKA-9216
> URL: https://issues.apache.org/jira/browse/KAFKA-9216
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Randall Hauch
>Assignee: Evelyn Bayes
>Priority: Major
>
> Users sometimes configure Connect's internal topic for configurations with 
> more than one partition. One partition is expected, however, and using more 
> than one leads to weird behavior that is sometimes not easy to spot.
> Here's one example of a log message:
> {noformat}
> "textPayload": "[2019-11-20 11:12:14,049] INFO [Worker clientId=connect-1, 
> groupId=td-connect-server] Current config state offset 284 does not match 
> group assignment 274. Forcing rebalance. 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:942)\n"
> {noformat}
> Would it be possible to add a check in the KafkaConfigBackingStore and 
> prevent the worker from starting if connect config partition count !=1 ?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9216) Enforce connect internal topic configuration at startup

2020-02-26 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17045982#comment-17045982
 ] 

Randall Hauch commented on KAFKA-9216:
--

For the record, this does change the behavior but I don't think this will 
require a KIP. Anyone in this situation will almost certainly run into problems 
if they're running a multi-node distributed Connect cluster. They *may* not run 
into this if they're just using a single worker, but even in this situation 
there still is the possibility of a problem, since the configs no longer have a 
total order.

This does not affect standalone mode at all.
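The total-order point above follows from Kafka only guaranteeing ordering within a partition. A toy sketch: keys are assigned to partitions with `Math.floorMod(key.hashCode(), n)` (a simplification; Kafka's default partitioner hashes the serialized key with murmur2), and a reader that drains partitions one after another can see records in a different order than they were written. In Connect terms, a commit record could, for example, be observed before the task configs it refers to.

```java
import java.util.ArrayList;
import java.util.List;

final class PartitionOrderDemo {
    private PartitionOrderDemo() {}

    // Writes keys to n partitions, then reads partition 0, 1, ..., n-1 in turn.
    static List<String> writeThenRead(List<String> keys, int numPartitions) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (String key : keys) {
            // Simplified partitioner; order is preserved only within a partition.
            partitions.get(Math.floorMod(key.hashCode(), numPartitions)).add(key);
        }
        List<String> readOrder = new ArrayList<>();
        for (List<String> partition : partitions) {
            readOrder.addAll(partition);
        }
        return readOrder;
    }
}
```

With one partition the read order always matches the write order. With two partitions, writing `"a"` then `"b"` reads back as `"b"`, `"a"`, because `"a".hashCode()` is 97 (partition 1) and `"b".hashCode()` is 98 (partition 0).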

 

[~EeveeB] one thing we probably also want to do is make sure the exception 
message contains concrete information about what the user should do 
to correct the problem. This may be the hardest part. :) 

> Enforce connect internal topic configuration at startup
> ---
>
> Key: KAFKA-9216
> URL: https://issues.apache.org/jira/browse/KAFKA-9216
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Randall Hauch
>Assignee: Evelyn Bayes
>Priority: Major
>
> Users sometimes configure Connect's internal topic for configurations with 
> more than one partition. One partition is expected, however, and using more 
> than one leads to weird behavior that is sometimes not easy to spot.
> Here's one example of a log message:
> {noformat}
> "textPayload": "[2019-11-20 11:12:14,049] INFO [Worker clientId=connect-1, 
> groupId=td-connect-server] Current config state offset 284 does not match 
> group assignment 274. Forcing rebalance. 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:942)\n"
> {noformat}
> Would it be possible to add a check in the KafkaConfigBackingStore and 
> prevent the worker from starting if connect config partition count !=1 ?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9216) Enforce connect internal topic configuration at startup

2020-02-26 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17045979#comment-17045979
 ] 

Randall Hauch commented on KAFKA-9216:
--

Thanks for volunteering to fix this, [~EeveeB], and for identifying three 
potential approaches to fixing this!

Before I discuss your approaches, I do want to first confirm something. If the 
Connect worker creates the config topic, it does so always with a single 
partition. That means that we're only concerned with scenarios where the topic 
was manually created (or modified) before the worker was started. Is that 
correct?

Okay, now to your approaches. Just to clarify, the call path is basically:
 # `DistributedHerder.run()` calls `startServices()`
 # `DistributedHerder.startServices()` calls `configBackingStore.start()`
 # `KafkaConfigBackingStore.start()` calls `configLog.start()`
 # `KafkaBasedLog.start()` calls `initializer.run()`

We can see that the `KafkaBasedLog.start()` method already has code that throws 
a ConnectException, so we know that already stops the herder from running 
(which is the desired behavior). So, as long as the 
`KafkaConfigBackingStore.start()` method (or anything called within it, 
including `initializer.run()`) throws a ConnectException with the appropriate 
error, the herder will stop.
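The propagation argument can be checked with a stripped-down model of that call chain. The classes below are hypothetical stand-ins that keep only the method names listed above; the real `ConnectException` is replaced by a local `RuntimeException` subclass so the sketch compiles on its own, and the herder merely records that it did not start.

```java
// Local stand-in for org.apache.kafka.connect.errors.ConnectException.
class StandInConnectException extends RuntimeException {
    StandInConnectException(String message) { super(message); }
}

class StandInKafkaBasedLog {
    private final Runnable initializer;
    StandInKafkaBasedLog(Runnable initializer) { this.initializer = initializer; }
    void start() { initializer.run(); } // step 4: any exception escapes start()
}

class StandInConfigBackingStore {
    private final StandInKafkaBasedLog configLog;
    StandInConfigBackingStore(StandInKafkaBasedLog configLog) { this.configLog = configLog; }
    void start() { configLog.start(); } // step 3
}

class StandInHerder {
    private final StandInConfigBackingStore configBackingStore;
    boolean running;

    StandInHerder(StandInConfigBackingStore store) { this.configBackingStore = store; }

    void startServices() { configBackingStore.start(); } // step 2

    // Step 1: if startServices() throws, the herder never reaches its main loop.
    void run() {
        try {
            startServices();
            running = true;
        } catch (StandInConnectException e) {
            running = false;
            System.err.println("Herder stopping: " + e.getMessage());
        }
    }
}
```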
h3. Option 1

This option would work, but it seems to be a fair amount of work compared to 
the others. 
h3. Option 2

IIUC, your second option is to modify the *initializer* function defined in 
`KafkaConfigBackingStore` to also get/check the number of partitions and to 
throw a ConnectException if the topic already exists and has more than one 
partition.

This would require modifying the TopicAdmin to get the metadata for the 
existing topic and return it. While that's probably doable, it's more 
complicated than your next option.
h3. Option 3

This is a good idea, too, especially because the `KafkaBasedLog.start()` method 
is already getting the partition information from the consumer in the form of a 
`List<PartitionInfo>` for the one topic (or a bit later, the 
`List<TopicPartition>` for the topic). If it stored *that* as a field 
and returned an immutable version of that list via a method, the 
`KafkaConfigBackingStore.start()` method could use this method and fail if 
there is more than 1 partition.

The great thing about this approach is that we don't have to modify the 
`TopicAdmin` utility or the initializer. The changes to `KafkaBasedLog` are 
minimal – we just need the getter method to return an immutable list of 
immutable `TopicPartition` objects. (Note that we could return `PartitionInfo`, 
but it's not immutable and we don't know how our new getter method might be 
used. Returning an immutable `List<TopicPartition>` is much safer.)

We do have to modify the `KafkaConfigBackingStore.start()` method to use this 
new method, but that would be super simple logic.

*Personally, I think this is a great approach: it's simple and localizes the 
changes pretty well.*
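A minimal sketch of the shape of Option 3, under simplifying assumptions: `TopicPartitionStub` stands in for Kafka's `TopicPartition`, the partition count is passed to `start()` instead of being fetched from a consumer, and the class and getter names are hypothetical. `start()` records the partitions, a getter hands out an immutable copy, and the config store fails if there is more than one.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for org.apache.kafka.common.TopicPartition (an immutable value).
final class TopicPartitionStub {
    final String topic;
    final int partition;
    TopicPartitionStub(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

class LogWithPartitions {
    private final String topic;
    private List<TopicPartitionStub> partitions = List.of();

    LogWithPartitions(String topic) { this.topic = topic; }

    // The real start() gets partition info from the consumer; here the
    // count is supplied by the caller to keep the sketch self-contained.
    void start(int partitionCount) {
        List<TopicPartitionStub> found = new ArrayList<>();
        for (int p = 0; p < partitionCount; p++) {
            found.add(new TopicPartitionStub(topic, p));
        }
        this.partitions = List.copyOf(found); // immutable snapshot
    }

    // Immutable list of immutable values, safe to expose.
    List<TopicPartitionStub> partitions() { return partitions; }
}

class ConfigStoreSketch {
    private final LogWithPartitions configLog;
    ConfigStoreSketch(LogWithPartitions configLog) { this.configLog = configLog; }

    void start(int partitionCount) {
        configLog.start(partitionCount);
        int count = configLog.partitions().size();
        if (count > 1) {
            throw new IllegalStateException(
                "Config topic has " + count + " partitions; exactly 1 is required");
        }
    }
}
```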
h3. Option 4

A slight variation of Option 3 is to not introduce a new field and getter in 
`KafkaBasedLog` that returns the partition information, but to instead pass a 
"partition validation" function into the `KafkaBasedLog` constructor and then 
to use this in the `start()` method. The benefit is that we don't have to 
expose any new methods on `KafkaBasedLog`, but we have to change the 
constructor.

This really has all the same benefits as option 3, but it's a little harder 
to follow the logic. So I don't like this quite as much as option 3.
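For comparison, a sketch of Option 4, where the check is passed into the log instead of exposed through a getter. The names are hypothetical, and partition numbers (`List<Integer>`) stand in for `TopicPartition` objects. The constructor takes a validator that `start()` invokes once the partitions are known.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ValidatingLog {
    private final Consumer<List<Integer>> partitionValidator;

    ValidatingLog(Consumer<List<Integer>> partitionValidator) {
        // A no-op validator keeps existing callers' behaviour unchanged.
        this.partitionValidator = partitionValidator != null ? partitionValidator : p -> { };
    }

    void start(int partitionCount) {
        List<Integer> partitions = new ArrayList<>();
        for (int i = 0; i < partitionCount; i++) {
            partitions.add(i);
        }
        // Validation runs inside start(), so no new public getter is needed.
        partitionValidator.accept(List.copyOf(partitions));
        // ... continue with consumer assignment and reading to the end of the log
    }
}
```

The config store would construct its log with something like `p -> { if (p.size() > 1) throw ...; }`, which keeps the rule next to the store but moves the point where it fires into `KafkaBasedLog.start()`.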

 

 

> Enforce connect internal topic configuration at startup
> ---
>
> Key: KAFKA-9216
> URL: https://issues.apache.org/jira/browse/KAFKA-9216
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Randall Hauch
>Priority: Major
>
> Users sometimes configure Connect's internal topic for configurations with 
> more than one partition. One partition is expected, however, and using more 
> than one leads to weird behavior that is sometimes not easy to spot.
> Here's one example of a log message:
> {noformat}
> "textPayload": "[2019-11-20 11:12:14,049] INFO [Worker clientId=connect-1, 
> groupId=td-connect-server] Current config state offset 284 does not match 
> group assignment 274. Forcing rebalance. 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:942)\n"
> {noformat}
> Would it be possible to add a check in the KafkaConfigBackingStore and 
> prevent the worker from starting if connect config partition count !=1 ?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9216) Enforce connect internal topic configuration at startup

2020-02-19 Thread Evelyn Bayes (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17040545#comment-17040545
 ] 

Evelyn Bayes commented on KAFKA-9216:
-

I'd be happy to do this.

 

Looking through the code, the worker tries to recreate the log each time it 
starts:

 
{code:java}
public KafkaConfigBackingStore(Converter converter, WorkerConfig config, WorkerConfigTransformer configTransformer) {
    this.lock = new Object();
    this.started = false;
    this.converter = converter;
    this.offset = -1;
    this.topic = config.getString(DistributedConfig.CONFIG_TOPIC_CONFIG);
    if (this.topic == null || this.topic.trim().length() == 0)
        throw new ConfigException("Must specify topic for connector configuration.");
    configLog = setupAndCreateKafkaBasedLog(this.topic, config);  // <- here
    this.configTransformer = configTransformer;
}
{code}
The constructor above only does preparation on the worker side; it isn't until 
start() is called that it actually tries to "create" the topic:

 
{code:java}
public KafkaBasedLog(String topic,
                     Map<String, Object> producerConfigs,
                     Map<String, Object> consumerConfigs,
                     Callback<ConsumerRecord<K, V>> consumedCallback,
                     Time time,
                     Runnable initializer) {
    this.topic = topic;
    this.producerConfigs = producerConfigs;
    this.consumerConfigs = consumerConfigs;
    this.consumedCallback = consumedCallback;
    this.stopRequested = false;
    this.readLogEndOffsetCallbacks = new ArrayDeque<>();
    this.time = time;
    this.initializer = initializer != null ? initializer : new Runnable() {
        @Override
        public void run() {
        }
    };
}

public void start() {  // <- here
    log.info("Starting KafkaBasedLog with topic " + topic);

    initializer.run();  // <- here
    producer = createProducer();
    consumer = createConsumer();
    // ... (rest of start() omitted)
}
{code}
And the worker just fires and forgets:
{code:java}
private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
                                                          Map<String, Object> consumerProps,
                                                          Callback<ConsumerRecord<String, byte[]>> consumedCallback,
                                                          final NewTopic topicDescription, final Map<String, Object> adminProps) {
    Runnable createTopics = new Runnable() {
        @Override
        public void run() {
            log.debug("Creating admin client to manage Connect internal config topic");
            try (TopicAdmin admin = new TopicAdmin(adminProps)) {
                admin.createTopics(topicDescription);  // result is not inspected
            }
        }
    };
    return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, Time.SYSTEM, createTopics);
}
{code}
This means that it doesn't seem to expose the number of partitions anywhere, 
except in the utility class TopicAdmin which is used by the offset backing 
store and the config backing store.

 

I see three options:
 * Create new temporary clients and fetch the metadata to do a check as part of 
the startup process in KafkaConfigBackingStore;
 * Modify TopicAdmin, the utility client used to create the topics for 
KafkaConfigBackingStore and KafkaOffsetBackingStore. It doesn't seem to fetch 
that metadata at the moment, but we could make it return details on 
the topic; or
 * Modify KafkaBasedLog with a new method to expose the partition count and 
create a private variable to retain that information.



For the second and third options, we'd pass the partition information to 
KafkaConfigBackingStore and run the check there.

 

 

 

 

> Enforce connect internal topic configuration at startup
> ---
>
> Key: KAFKA-9216
> URL: https://issues.apache.org/jira/browse/KAFKA-9216
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Randall Hauch
>Priority: Major
>
> Users sometimes configure Connect's internal topic for configurations with 
> more than one partition. One partition is expected, however, and using more 
> than one leads to weird behavior that is sometimes not easy to spot.
> Here's one example of a log message:
> {noformat}
> "textPayload": "[2019-11-20 11:12:14,049] INFO [Worker clientId=connect-1, 
> groupId=td-connect-server] Current config state offset 284 does not match 
> group assignment 274. Forcing rebalance. 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:942)\n"
> {noformat}
> Would it be possible to add a check in the KafkaConfigBackingStore and 
> prevent the worker