Kafka Connect Worker Provisioning/Optimization Considerations
I have 100+ sink connectors running with 100+ topics, each with roughly 3 partitions per topic. How would you configure resources (memory and CPU) to optimally handle this level of load? What would your considerations be? Also, when considering this load, should I be thinking about it as an aggregated value (e.g., 1 node at 1 CPU and 4 GB = 10 CPUs and 40 GB if I have 10 workers)? Thank you kindly, -BW
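The aggregate view in the question is plain multiplication, but each Connect task runs on a single thread on a single worker, so what any one worker has to absorb is its own share of the tasks and partitions, not the cluster total. A minimal back-of-the-envelope sketch of both numbers, assuming 100 topics in total with 3 partitions each and a perfectly even spread across workers (the real distribution depends on task counts and rebalancing); the class and method names are hypothetical:

```java
// Hypothetical back-of-the-envelope helper; not part of Kafka Connect.
public class ConnectCapacitySketch {

    // Cluster-wide total when every worker gets the same allocation.
    static double aggregate(double perWorker, int workers) {
        return perWorker * workers;
    }

    // Partitions each worker serves if they are spread evenly (rounded up).
    static int partitionsPerWorker(int topics, int partitionsPerTopic, int workers) {
        int totalPartitions = topics * partitionsPerTopic;
        return (totalPartitions + workers - 1) / workers;
    }

    public static void main(String[] args) {
        int workers = 10;
        // Per-worker figures from the question: 1 CPU and 4 GB.
        System.out.println("CPUs: " + aggregate(1.0, workers));      // CPUs: 10.0
        System.out.println("Memory GB: " + aggregate(4.0, workers)); // Memory GB: 40.0
        // Assumption: 100 topics in total, 3 partitions each.
        System.out.println("Partitions per worker: "
                + partitionsPerWorker(100, 3, workers));             // Partitions per worker: 30
    }
}
```

The per-worker figure is the one a single JVM's heap and CPU limit must cover: memory left idle in one pod cannot be lent to a task running in another.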
Re: Kafka Connect Limits
Thank you for having a look at this. I agree that the only way to really gauge load is to look at lag. But the connector tasks should not crash and die because of load. I will raise this with SF. On Wed, Jul 3, 2024 at 7:14 PM Greg Harris wrote: > The exception you included does not indicate that your connectors are overloaded. The primary way of diagnosing an overloaded connector is the consumer lag metric, and if you're seeing acceptable lag, that should indicate that your connectors are capable of handling the load.
Re: Kafka Connect Limits
Hey Burton, Thanks for your question and bug report. The exception you included does not indicate that your connectors are overloaded. The primary way of diagnosing an overloaded connector is the consumer lag metric, and if you're seeing acceptable lag, that should indicate that your connectors are capable of handling the load. I would say that your workload is _unhealthy_ though, because it should be able to operate without throwing this particular exception. I found one previous report of this exception [1] but no action was taken. Instead, the recommendation was to change the task implementation to perform offset loading only during open(). It looks like this depends on the task implementation: if the consumer rebalances and the task loses its assignment, and the task later calls SinkTaskContext#offset() with the now-revoked partition, it would cause this exception. I'm not familiar with the Snowflake task, but upon a cursory inspection, it looks like it buffers records [2] across poll() calls, and may call SinkTaskContext#offset() in a later poll [3]. They appear to have a close() method that could be used to prevent SinkTaskContext#offset() from being called, but I'm not sure why it isn't effective. You should contact the Snowflake Connector maintainers and let them know that they are exposed to this exception. I'll re-open this issue on the framework side to see if we can find a solution to fix this for other connectors.
Thanks, Greg
[1] https://issues.apache.org/jira/browse/KAFKA-10370
[2] https://github.com/snowflakedb/snowflake-kafka-connector/blob/b08ee569de6b943f8f624cd4421f7a6c7a10d532/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java#L380-L383
[3] https://github.com/snowflakedb/snowflake-kafka-connector/blob/b08ee569de6b943f8f624cd4421f7a6c7a10d532/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java#L908
[4] https://github.com/snowflakedb/snowflake-kafka-connector/blob/b08ee569de6b943f8f624cd4421f7a6c7a10d532/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java#L442
On Wed, Jul 3, 2024 at 12:25 PM Burton Williams wrote: > Tasks are failing with the exception: > > State: FAILED > > Worker ID: 10.136.83.73:8080 > > Trace: java.lang.IllegalStateException: No current assignment for partition bigpicture.bulk_change-0
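Greg's suggestion (load offsets only in open(), stop touching a partition once close() has revoked it) can be illustrated with a framework-free model. In a real sink connector the hooks would be SinkTask.open(Collection<TopicPartition>), SinkTask.close(Collection<TopicPartition>) and SinkTaskContext.offset(Map<TopicPartition, Long>); here partitions are plain strings, the offsets passed to open() are assumed to come from the task's own lookup (for example, the destination system), and the class name is hypothetical. It is a sketch of the pattern, not the Snowflake connector's code. The point is that a rewind for a revoked partition is dropped inside the task instead of reaching the consumer, where the reported stack trace shows WorkerSinkTask.rewind calling KafkaConsumer.seek and failing:

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of a sink task's offset bookkeeping. Partition names are
// strings such as "bigpicture.bulk_change-0" instead of TopicPartition objects.
public class AssignmentAwareRewinder {
    private final Set<String> assigned = new HashSet<>();
    // Offsets the task wants handed to SinkTaskContext.offset(...) next.
    private final Map<String, Long> pendingRewinds = new HashMap<>();

    // Mirrors SinkTask.open(): the only place starting offsets are queued.
    // committedOffsets: offsets the task looked up for the newly assigned partitions.
    public void open(Map<String, Long> committedOffsets) {
        assigned.addAll(committedOffsets.keySet());
        pendingRewinds.putAll(committedOffsets);
    }

    // Mirrors SinkTask.close(): forget revoked partitions and any rewind
    // that was still queued for them.
    public void close(Collection<String> revoked) {
        assigned.removeAll(revoked);
        pendingRewinds.keySet().removeAll(revoked);
    }

    // Called later, e.g. from put(); requests for revoked partitions are refused.
    public boolean requestRewind(String partition, long offset) {
        if (!assigned.contains(partition)) {
            return false;
        }
        pendingRewinds.put(partition, offset);
        return true;
    }

    // Returns what would be passed to SinkTaskContext.offset(...) and clears the queue.
    public Map<String, Long> drainRewinds() {
        Map<String, Long> out = new HashMap<>(pendingRewinds);
        pendingRewinds.clear();
        return out;
    }
}
```

Keeping the assigned set inside the task means the guard does not depend on when the framework happens to rebalance relative to buffered work.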
Kafka Connect Limits
Hi, I have 100+ sink connectors running with 100+ topics, each with roughly 3 partitions per topic. They are running on K8s on 10 pods with 6 CPUs and 32 GB of memory. The connector in question is Snowflake's sink connector v2.2.0. This worked in the mini-batch SNOWPIPE mode, but once I switched over to SNOWPIPE_STREAMING, it no longer works. Tasks are failing with the exception:
State: FAILED
Worker ID: 10.136.83.73:8080
Trace: java.lang.IllegalStateException: No current assignment for partition bigpicture.bulk_change-0
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:369)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:386)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1637)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:642)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:327)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:238)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:207)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:229)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:284)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
My questions are:
1. Are these connectors overloaded? Can Kafka Connect handle this level of load?
2. Say it can, which I've seen it do: could this be caused by underlying rebalancing? If so, what would you recommend I do to mitigate it?
Thanks -BW
Re: Regarding Kafka connect task to partition relationship for both source and sink connectors
For sink connectors, I believe you can scale up the tasks to match the partitions on the topic. But I don't believe this is the case for source connectors; the number of partitions on the topic you're producing to has nothing to do with the number of connector tasks. It really depends on the individual source connector and whether the source data type could benefit from multiple tasks. For example, the JDBC source connector (a very popular connector) only supports 1 task - even if you're querying multiple tables. Bottom line: you'll need to check the documentation for the connector in question to see if it supports multiple tasks. Alex On Thu, May 30, 2024 at 7:51 AM Sébastien Rebecchi wrote: > Confirmed. Partition is the minimal granularity level, so having more consumers than the number of partitions of a topic for a same consumer group is useless, having P partitions means maximum parallelism is reached using P consumers.
Re: Regarding Kafka connect task to partition relationship for both source and sink connectors
Hello, Confirmed. The partition is the minimal unit of granularity, so having more consumers in the same consumer group than the topic has partitions is useless: with P partitions, maximum parallelism is reached with P consumers. Regards, Sébastien. On Thu, May 30, 2024 at 14:43, Yeikel Santana wrote: > From my understanding, if a topic has n partitions, we can create up to n tasks for both the source and sink connectors to achieve the maximum parallelism.
Re: [EXTERNAL] Regarding Kafka connect task to partition relationship for both source and sink connectors
The docs say: “Each task is assigned to a thread. Each task is capable of handling multiple Kafka partitions, but a single partition must be handled by only one task.” From what I understand, additional tasks would sit idle. From: Yeikel Santana Date: Thursday, May 30, 2024 at 7:43 AM To: users@kafka.apache.org Subject: [EXTERNAL] Regarding Kafka connect task to partition relationship for both source and sink connectors Hi everyone, From my understanding, if a topic has n partitions, we can create up to n tasks for both the source and sink connectors to achieve the maximum parallelism. Adding more tasks would not be beneficial, as they would remain idle and be limited to the number of partitions of the topic.
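The documented rule (a task can own many partitions, but a partition belongs to exactly one task) caps a sink connector's useful parallelism at the total number of partitions across all of its topics, not per topic. A minimal model, assuming a simple round-robin spread; the real placement is decided by the consumer group's partition assignor, so exact assignments will differ, and the class name is hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: spread sink partitions over tasks round-robin and count
// tasks left with nothing. Not Kafka's actual partition assignor.
public class TaskAssignmentSketch {

    static List<List<String>> assign(List<String> partitions, int tasksMax) {
        List<List<String>> tasks = new ArrayList<>();
        for (int i = 0; i < tasksMax; i++) {
            tasks.add(new ArrayList<>());
        }
        for (int i = 0; i < partitions.size(); i++) {
            // Each partition goes to exactly one task.
            tasks.get(i % tasksMax).add(partitions.get(i));
        }
        return tasks;
    }

    static long idleTasks(List<List<String>> tasks) {
        return tasks.stream().filter(List::isEmpty).count();
    }

    // Builds names like "topic0-2" for `topics` topics with `partitionsPerTopic` each.
    static List<String> partitions(int topics, int partitionsPerTopic) {
        List<String> out = new ArrayList<>();
        for (int t = 0; t < topics; t++) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                out.add("topic" + t + "-" + p);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // One topic, 3 partitions, tasks.max=5: two tasks sit idle.
        System.out.println(idleTasks(assign(partitions(1, 3), 5)));     // 2
        // 100 topics x 3 partitions: up to 300 tasks can all receive work.
        System.out.println(idleTasks(assign(partitions(100, 3), 300))); // 0
    }
}
```

This model only applies to sink connectors; as noted in the thread, how many tasks a source connector can use depends on the connector itself.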
Regarding Kafka connect task to partition relationship for both source and sink connectors
Hi everyone, From my understanding, if a topic has n partitions, we can create up to n tasks for both the source and sink connectors to achieve the maximum parallelism. Adding more tasks would not be beneficial, as they would remain idle and be limited to the number of partitions of the topic. Could you please confirm if this understanding is correct? If this understanding is incorrect, could you please explain the relationship, if any? Thank you!
kafka connect support time precision
Hi, I'm using Kafka Connect, passing data with an Avro schema. By default I get a schema with millisecond time precision for datetime2 columns. Do you support microsecond time precision as well? Thanks
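For context on what is at stake: Kafka Connect's built-in Timestamp logical type is backed by java.util.Date, which stores milliseconds, so the sub-millisecond digits of a datetime2 value (a SQL Server type, assuming that is the source database) are lost unless the connector maps the column to some other representation. Whether a given connector offers such an option is connector-specific. A small illustration using only java.time, with a hypothetical sample value; the Avro timestamp-micros logical type, by contrast, carries a long count of microseconds since the epoch:

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Date;

// Illustration only: how much of a microsecond timestamp survives java.util.Date.
public class TimestampPrecisionSketch {

    // Round trip through java.util.Date, the Java type behind Connect's Timestamp.
    static Instant throughDate(Instant value) {
        return new Date(value.toEpochMilli()).toInstant();
    }

    // Microseconds since the epoch, the value an Avro timestamp-micros long holds.
    static long epochMicros(Instant value) {
        return ChronoUnit.MICROS.between(Instant.EPOCH, value);
    }

    public static void main(String[] args) {
        // Hypothetical sample value with six fractional digits.
        Instant original = Instant.parse("2024-03-05T10:15:30.123456Z");
        System.out.println(throughDate(original));             // 2024-03-05T10:15:30.123Z
        System.out.println(epochMicros(original) % 1_000_000); // 123456
    }
}
```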
Re: [Kafka Connect] Dead letter queues for source connectors?
Hey Greg, Thinking more, I do like the idea of a source-side equivalent of the ErrantRecordReporter interface! However, I also suspect we may have to reason more carefully about what users could do with this kind of information in a DLQ topic. Yes, it's an option to reset the connector (or a copy of it) to the earliest unprocessed partition/offset in the DLQ topic and start processing data anew from there, but this might lead to a flood of duplicates. IIRC it also wouldn't even be this simple if we were to tolerate duplicates--users would have to reset the connector to the offset just before the one present in the DLQ, since resetting to the actual offset would make it appear to the connector as if the record with that offset were produced and committed successfully. Maybe we could include both the offset for the failed record, and the offset for the last record before that one with the same source partition--basically saying to the user "This is the one that failed, and this is the one that needs to be reset to if you want to try again". I'm starting to wonder if, for now, we could try to design something that's useful for metadata and for manual intervention, but perhaps not for direct usage with a connector (e.g., we wouldn't expect people to take the contents of the DLQ topic and use it to start up a second connector for the sole purpose of slurping up previously-dropped records). Thoughts? Cheers, Chris On Tue, Mar 5, 2024 at 6:19 PM Greg Harris wrote: > Perhaps failures inside of poll() can be handled by an extra mechanism, similar to the ErrantRecordReporter, which allows reporting affected source partition/source offsets when a meaningful key or value cannot be read.
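Chris's idea of reporting both the failed offset and the offset of the previous record from the same source partition can be sketched as a small tracker. Kafka Connect has no such source-side API today; the class names and methods below are hypothetical, and source partitions and offsets are modeled as Map<String, ?>, the shape SourceRecord uses for them:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical report: nothing like this exists in Kafka Connect today.
final class SourceErrorReport {
    final Map<String, ?> sourcePartition;
    final Map<String, ?> failedOffset;
    // Offset of the previous record from the same source partition, i.e. the one
    // to reset to in order to retry the failed record; null if there is none.
    final Map<String, ?> resetToOffset;

    SourceErrorReport(Map<String, ?> sourcePartition, Map<String, ?> failedOffset,
                      Map<String, ?> resetToOffset) {
        this.sourcePartition = sourcePartition;
        this.failedOffset = failedOffset;
        this.resetToOffset = resetToOffset;
    }
}

// Hypothetical tracker remembering the last offset seen per source partition.
public class SourceErrorTracker {
    // Keys are compared by content, so partition maps must not be mutated later.
    private final Map<Map<String, ?>, Map<String, ?>> lastSeen = new HashMap<>();

    // Call for every record that was handed off successfully.
    public void recordProcessed(Map<String, ?> partition, Map<String, ?> offset) {
        lastSeen.put(partition, offset);
    }

    // Call when a record fails; pairs its offset with the previous one.
    public SourceErrorReport reportFailure(Map<String, ?> partition, Map<String, ?> offset) {
        SourceErrorReport report =
                new SourceErrorReport(partition, offset, lastSeen.get(partition));
        lastSeen.put(partition, offset);
        return report;
    }
}
```

The tracker updates its "previous" offset on failures too, matching the "last record before that one" wording; whether a failed predecessor should count is one of the design questions the thread leaves open.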
Re: [Kafka Connect] Dead letter queues for source connectors?
Hey Chris, That's a cool idea! That can certainly be applied for failures other than poll(), and could be useful when combined with the Offsets modification API. Perhaps failures inside of poll() can be handled by an extra mechanism, similar to the ErrantRecordReporter, which allows reporting affected source partition/source offsets when a meaningful key or value cannot be read. Thanks, Greg On Tue, Mar 5, 2024 at 3:03 PM Chris Egerton wrote: > One idea I've toyed with recently is that we could write the source partition and offset for the failed record (assuming, hopefully safely, that these can at least be serialized).
Re: [Kafka Connect] Dead letter queues for source connectors?
Hi Greg, This was my understanding as well--if we can't turn a record into a byte array on the source side, it's difficult to know exactly what to write to a DLQ topic. One idea I've toyed with recently is that we could write the source partition and offset for the failed record (assuming, hopefully safely, that these can at least be serialized). This may not cover all bases, is highly dependent on how user-friendly the offsets published by the connector are, and does come with the risk of data loss (if the upstream system is wiped before skipped records can be recovered), but could be useful in some scenarios. Thoughts? Chris On Tue, Mar 5, 2024 at 5:49 PM Greg Harris wrote: > The reason as I understand it is: Source connectors are responsible for importing data to Kafka. If an error occurs during this process, then writing useful information to a dead letter queue about the failure is at least as difficult as importing the record correctly.
Re: [Kafka Connect] Dead letter queues for source connectors?
Hi Yeikel, Thanks for your question. It certainly isn't clear from the original KIP-298, the attached discussion, or the follow-up KIP-610 as to why the situation is asymmetric. The reason as I understand it is: Source connectors are responsible for importing data to Kafka. If an error occurs during this process, then writing useful information to a dead letter queue about the failure is at least as difficult as importing the record correctly. For some examples: * If an error occurs during poll(), the external data has not yet been transformed into a SourceRecord that the framework can transform or serialize. * If an error occurs during conversion/serialization, the external data cannot be reasonably serialized to be forwarded to the DLQ. * If a record cannot be written to Kafka, such as due to being too large, the same failure is likely to happen with writing to the DLQ as well. For the Sink side, we already know that the data was properly serializable and appeared as a ConsumerRecord. That can be forwarded to the DLQ as-is with a reasonable expectation for success, with the same data formatting as the source topic. If you have a vision for how this can be improved and are interested, please consider opening a KIP! The situation can certainly be made better than it is today. Thanks! Greg On Tue, Mar 5, 2024 at 5:35 AM Yeikel Santana wrote: > What is the reason that we decided to do that?
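For comparison with the sink side Greg describes, the KIP-298 dead letter queue for sink connectors is enabled purely through connector configuration. A sketch of the relevant keys as a Java map (for example, when building the config for a connector-creation request); the topic name is a caller-supplied placeholder, the replication factor of 3 is an arbitrary example, and the errors.deadletterqueue.* settings are honored only by sink connectors:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of sink-connector error-handling settings; values are placeholders.
public class SinkDlqConfigSketch {

    static Map<String, String> dlqSettings(String dlqTopic) {
        Map<String, String> cfg = new LinkedHashMap<>();
        // Skip records that fail in conversion or transformation instead of failing the task.
        cfg.put("errors.tolerance", "all");
        // Sink connectors only: where the skipped records are written.
        cfg.put("errors.deadletterqueue.topic.name", dlqTopic);
        // Example value; pick one that suits the cluster.
        cfg.put("errors.deadletterqueue.topic.replication.factor", "3");
        // Attach headers describing the failure (original topic/partition/offset, exception).
        cfg.put("errors.deadletterqueue.context.headers.enable", "true");
        // Also log each failure.
        cfg.put("errors.log.enable", "true");
        return cfg;
    }
}
```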
[Kafka Connect] Dead letter queues for source connectors?
Hi all, Sink connectors support Dead Letter Queues[1], but Source connectors don't seem to. What is the reason that we decided to do that? In my data pipeline, I'd like to apply some transformations to the messages before they are sunk, but that leaves me vulnerable to failures, as I need to either fail the connector or employ logging to track source failures. It seems that for now, I'll need to apply the transformations as a sink and possibly reinsert them back into Kafka for downstream consumption, but that sounds unnecessary. [1] https://cwiki.apache.org/confluence/plugins/servlet/mobile?contentId=80453065#content/view/80453065
[DISCUSS] Kafka Connect source task interruption semantics
Hi all, I'd like to solicit input from users and maintainers on a problem we've been dealing with for source task cleanup logic. If you'd like to pore over some Jira history, here's the primary link: https://issues.apache.org/jira/browse/KAFKA-15090. To summarize, we accidentally introduced a breaking change for Kafka Connect in https://github.com/apache/kafka/pull/9669. Before that change, the SourceTask::stop method [1] would be invoked on a separate thread from the one that did the actual data processing for the task (polling the task for records, transforming and converting those records, then sending them to Kafka). After that change, we began invoking SourceTask::stop on the same thread that handled data processing for the task. This had the effect that tasks which blocked indefinitely in the SourceTask::poll method [2] with the expectation that they could stop blocking when SourceTask::stop was invoked would no longer be capable of graceful shutdown, and may even hang forever. This breaking change was introduced in the 3.0.0 release, a little over two years ago. Since then, source connectors may have been modified to adapt to the change in behavior by the Connect framework. As a result, we are hesitant to go back to the prior logic of invoking SourceTask::stop on a separate thread (see the linked Jira ticket for more detail on this front). In https://github.com/apache/kafka/pull/14316, I proposed that we begin interrupting the data processing thread for the source task after it had exhausted its graceful shutdown timeout (i.e., when the Kafka Connect runtime decides to cancel [3], [4], [5] the task). I believe this change is fairly non-controversial--once a task has failed to shut down gracefully, the runtime can and should do whatever it wants to force a shutdown, graceful or otherwise.
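The runtime-side proposal (interrupt the task's thread only once the graceful shutdown timeout is exhausted) follows a familiar JDK pattern. A minimal sketch, with a single-thread ExecutorService standing in for the task's thread and an AtomicBoolean standing in for the stop signal; the class and method names are hypothetical, and this is not the Connect runtime's actual code:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of "stop gracefully, then interrupt"; not Connect runtime code.
public class GracefulThenInterruptSketch {

    // Returns true if the task finished within the timeout without an interrupt.
    static boolean stopTask(ExecutorService taskThread, AtomicBoolean stopRequested,
                            long timeoutMs) throws InterruptedException {
        stopRequested.set(true);   // the graceful signal, akin to calling stop()
        taskThread.shutdown();     // accept no new work; running work continues
        if (taskThread.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
            return true;
        }
        taskThread.shutdownNow();  // interrupts the still-running thread ("cancel")
        taskThread.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
        return false;
    }
}
```

A task blocked in an interruptible call such as Thread.sleep or BlockingQueue.poll wakes up when shutdownNow() interrupts it; a task that swallows interrupts and keeps looping would still hang.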
With all that context out of the way, the question I'd like to ask is: do we believe it's also appropriate to interrupt the data processing thread when the task is scheduled for shutdown [6], [7]? This interruption would ideally be followed up by a graceful shutdown of the task, which may require the Kafka Connect runtime to handle a potential InterruptedException from SourceTask::poll. Other exceptions (such as a wrapped InterruptedException) would be impossible to handle gracefully, and may lead to spurious error messages in the logs and failed final offset commits for connectors that do not work well with this new behavior. Finally, one important note: in the official documentation for SourceTask::poll, we do already state that this method should not block for too long: > If no data is currently available, this method should block but return control to the caller regularly (by returning null) in order for the task to transition to the PAUSED state if requested to do so. Looking forward to everyone's thoughts on this tricky issue! 
Cheers,
Chris

[1] - https://kafka.apache.org/36/javadoc/org/apache/kafka/connect/source/SourceTask.html#stop()
[2] - https://kafka.apache.org/36/javadoc/org/apache/kafka/connect/source/SourceTask.html#poll()
[3] - https://github.com/apache/kafka/blob/c5ee82cab447b094ad7491200afa319515d5f467/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1037
[4] - https://github.com/apache/kafka/blob/c5ee82cab447b094ad7491200afa319515d5f467/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L129-L136
[5] - https://github.com/apache/kafka/blob/c5ee82cab447b094ad7491200afa319515d5f467/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L284-L297
[6] - https://github.com/apache/kafka/blob/c5ee82cab447b094ad7491200afa319515d5f467/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1014
[7] - https://github.com/apache/kafka/blob/c5ee82cab447b094ad7491200afa319515d5f467/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L112-L127
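To make the quoted SourceTask::poll guidance concrete, here is a minimal, framework-free Java sketch of a poll method that behaves well both under the current contract and under the proposed interruption: it blocks for a bounded time, returns `null` when no data is available, and treats an `InterruptedException` as "no data" while preserving the thread's interrupt flag. `PollingSourceSketch`, `offer`, and `maxBlockMs` are hypothetical names; a real task would extend `SourceTask` and return `List<SourceRecord>`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical, framework-free stand-in for a source task. It deliberately does not
// extend org.apache.kafka.connect.source.SourceTask so that it compiles on its own.
public class PollingSourceSketch {
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
    private final long maxBlockMs;
    private volatile boolean stopping = false;

    public PollingSourceSketch(long maxBlockMs) {
        this.maxBlockMs = maxBlockMs;
    }

    // Called by whatever (hypothetical) component receives data from the source system.
    public void offer(String value) {
        buffer.add(value);
    }

    // Waits at most maxBlockMs for data, then returns null so the caller regains control.
    // An interrupt ends the wait early and is treated the same way as "no data".
    public List<String> poll() {
        if (stopping) {
            return null;
        }
        try {
            String first = buffer.poll(maxBlockMs, TimeUnit.MILLISECONDS);
            if (first == null) {
                return null;                     // timed out: hand control back
            }
            List<String> batch = new ArrayList<>();
            batch.add(first);
            buffer.drainTo(batch);               // take whatever else is already queued
            return batch;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // keep the interrupt visible to the caller
            return null;
        }
    }

    // May be called from another thread; subsequent poll() calls return immediately.
    public void stop() {
        stopping = true;
    }
}
```

The sketch catches `InterruptedException` directly rather than wrapping it in another exception, which is the case the runtime could handle gracefully under the proposal.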
Do we have any mechanism to control requests per second for a Kafka connect sink?
Hello everyone, Is there any mechanism to make Kafka Connect ingest at a given rate per second, rather than controlling throughput only through the number of tasks? I am operating in a shared environment where the ingestion rate needs to be as low as possible (for example, an upper limit of 5 requests per second), and as far as I can tell, `tasks` are the main unit of work we can tune.

My current understanding is that a task blocks while it processes one batch and moves on to the next batch as soon as the previous request completes. This means that if the target server can process requests at a higher rate, the sink will keep sending at that rate. In my scenario, however, I need to send n requests per second and then sit idle until that second has passed, to avoid overloading the target server. In this specific example, my best attempt to control the throughput was a configuration like this:

```json
{
  "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "tasks.max": "1",
  "max.retries": "10",
  "retry.backoff.ms": "1000",
  "max.buffered.records": "100",
  "batch.size": "100",
  "max.in.flight.requests": "1",
  "flush.synchronously": "true"
}
```

Unfortunately, while that helps, it does not solve the underlying problem. I also understand that this is very specific to the given sink connector, but my question is more about a global override that could be applied, if one exists. As an alternative, I suppose I could add a `Thread.sleep` call in an SMT, or fork ElasticsearchSinkConnector to introduce something similar, but neither sounds like a good solution. Thank you!
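If forking the connector turns out to be acceptable, the "send n requests per second, then sit idle" behaviour can be expressed as a small limiter guarding each outgoing request. The Java sketch below is hypothetical: `FixedWindowRateLimiter` is not part of Kafka Connect or the Elasticsearch sink connector. It allows at most `maxPerWindow` calls to `acquire()` per one-second window and sleeps the calling thread until the next window once that budget is spent.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Kafka Connect or the Elasticsearch sink connector.
// Allows at most maxPerWindow acquire() calls per one-second window; once the
// budget is used up, the caller sleeps until the next window starts.
public class FixedWindowRateLimiter {
    private static final long WINDOW_NANOS = TimeUnit.SECONDS.toNanos(1);

    private final int maxPerWindow;
    private long windowStart = System.nanoTime();
    private int usedInWindow = 0;

    public FixedWindowRateLimiter(int maxPerWindow) {
        if (maxPerWindow <= 0) {
            throw new IllegalArgumentException("maxPerWindow must be positive");
        }
        this.maxPerWindow = maxPerWindow;
    }

    // Call once before each outgoing request (e.g. each bulk request in a forked task).
    // Sleeping while holding the lock also makes any other callers wait, which is intended.
    public synchronized void acquire() throws InterruptedException {
        long now = System.nanoTime();
        if (now - windowStart >= WINDOW_NANOS) {
            windowStart = now;                   // previous window is over: start a new one
            usedInWindow = 0;
        }
        if (usedInWindow >= maxPerWindow) {
            // Budget spent: sit idle for the rest of the current window.
            TimeUnit.NANOSECONDS.sleep(WINDOW_NANOS - (now - windowStart));
            windowStart = System.nanoTime();
            usedInWindow = 0;
        }
        usedInWindow++;
    }

    public static void main(String[] args) throws InterruptedException {
        FixedWindowRateLimiter limiter = new FixedWindowRateLimiter(5);
        long start = System.nanoTime();
        for (int i = 1; i <= 12; i++) {
            limiter.acquire();
            long ms = (System.nanoTime() - start) / 1_000_000;
            System.out.println("request " + i + " at ~" + ms + " ms");
        }
    }
}
```

Because each task would own its own limiter, the effective cap for the connector is roughly `tasks.max` times the per-task limit, so `tasks.max` of 1 keeps the arithmetic simple.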
Re: The Plan To Introduce Virtual Threads To Kafka Connect
Hi Boyee, Thanks for the suggestion. Virtual threads look like they may be helpful for Connect, particularly in Connector plugins. There are currently no plans to use virtual threads in the Connect framework, maybe because we need to maintain compatibility with JDK 8 until 4.0. See https://kafka.apache.org/36/documentation.html#java for supported Java versions. I've opened a tracking ticket here: https://issues.apache.org/jira/browse/KAFKA-15611 but I'll leave it unassigned for anyone interested to spec out the work. I also see a similar ticket for the Producer: https://issues.apache.org/jira/browse/KAFKA-14606. Thanks! Greg Harris On Mon, Oct 16, 2023 at 6:20 AM Boyee wrote: > Kafka Connect, as a thread-intensive program, can benefit a lot from the > usage of virtual threads. > Since JDK 21, released last month, virtual threads are a final feature of the > JDK. > I would like to ask if any plans exist to bring virtual threads into Kafka > Connect. > Thank you.
The Plan To Introduce Virtual Threads To Kafka Connect
Kafka Connect, as a thread-intensive program, can benefit a lot from the usage of virtual threads. Since JDK 21, released last month, virtual threads are a final (non-preview) feature of the JDK. I would like to ask if any plans exist to bring virtual threads into Kafka Connect. Thank you.
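As an illustration of where virtual threads could help inside a connector plugin, here is a minimal Java sketch that requires JDK 21 or later. `VirtualThreadFanOut`, `writeAll`, and `blockingWrite` are hypothetical names, and `blockingWrite` only sleeps to stand in for a blocking call to an external system. Each call runs on its own virtual thread via `Executors.newVirtualThreadPerTaskExecutor()`, so many blocking writes can overlap without sizing a large platform-thread pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Requires JDK 21+. Hypothetical plugin-side code, not part of the Connect framework:
// each blocking call runs on its own virtual thread instead of a pooled platform thread.
public class VirtualThreadFanOut {

    // Stand-in for a blocking call to an external system (e.g. an HTTP write).
    static String blockingWrite(String record) throws InterruptedException {
        Thread.sleep(100);
        return "written:" + record;
    }

    // Submits one virtual thread per record and returns results in input order.
    public static List<String> writeAll(List<String> records) throws Exception {
        // ExecutorService is AutoCloseable since JDK 19; close() waits for submitted tasks.
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Future<String>> futures = new ArrayList<>();
            for (String record : records) {
                futures.add(executor.submit(() -> blockingWrite(record)));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get());
            }
            return results;
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> records = new ArrayList<>();
        for (int i = 0; i < 1_000; i++) {
            records.add("r" + i);
        }
        long start = System.nanoTime();
        List<String> out = writeAll(records);
        long ms = (System.nanoTime() - start) / 1_000_000;
        System.out.println(out.size() + " writes in ~" + ms + " ms");
    }
}
```

Code like this can only live in a plugin built for JDK 21; the framework itself cannot use it while JDK 8 compatibility is required.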
Re: Kafka Connect - Customize REST request headers
Thank you for the explanation, Chris. In case it helps, what I'm looking for is similar to KIP-577 [1]. My specific case involves a hard-coded key/value header that is required for pod-to-pod traffic: I can connect to any worker without that header, but the workers cannot communicate among themselves without it. To clarify, my environment runs behind Istio [2], where egress traffic can be created using the following format: `..svc.cluster.local`. For example, a request between workers should look like: curl -H "Host: ..svc.cluster.local" workerIP:PORT Regarding temporary solutions, I've explored options like using a proxy, but I am running within containers, which complicates that further; the other options are patching, recompiling, or temporarily replacing the connect-runtime jar. I think something like the following might work, but I need to test it:

```java
// Patched RestClient.addHeadersToRequest (untested). In RestClient, HttpHeaders is
// javax.ws.rs.core.HttpHeaders and Request is org.eclipse.jetty.client.api.Request.
private static void addHeadersToRequest(HttpHeaders headers, Request req) {
    // Always add the Host header the mesh requires for worker-to-worker calls
    req.header("Host", "..svc.cluster.local");
    if (headers != null) {
        // Existing behavior: forward only the Authorization header of the original request
        String credentialAuthorization = headers.getHeaderString(HttpHeaders.AUTHORIZATION);
        if (credentialAuthorization != null) {
            req.header(HttpHeaders.AUTHORIZATION, credentialAuthorization);
        }
    }
}
```

This is of course risky, and it would be significantly more convenient if this functionality were integrated into Kafka Connect itself. [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP+577%3A+Allow+HTTP+Response+Headers+to+be+Configured+for+Kafka+Connect [2] https://istio.io/ On Sat, 07 Oct 2023 02:05:14 -0400 Chris Egerton wrote --- Hi Yeikel, Neat question! And thanks for the link to the RestClient code; very helpful. I don't believe there's a way to configure Kafka Connect to add these headers to forwarded requests right now. You may be able to do some kind of out-of-band proxy magic to intercept forwarded requests and insert the proper headers there. 
I don't see a reason for Kafka Connect to only forward authorization headers, even after examining the PR [1] and corresponding Jira ticket [2] that altered the RestClient class to begin including authorization headers in forwarded REST requests. We may be able to tweak the RestClient to include all headers instead of just the authorization header. I know that this doesn't help your immediate situation, but if other committers and contributors agree that the change would be beneficial, we may be able to include it in the next release (which may be 3.7.0, or a patch release for 3.4, 3.5, or 3.6). Alternatively, we may have to gate such a change behind a feature flag (either a coarse-grained boolean that enables/disables forwarding of all non-authorization headers, or more fine-grained logic such as include/exclude lists or even regexes), which would require a KIP and may take longer to release. I've CC'd the dev list to gather their perspective on this potential change, and to solicit their input on possible workarounds that may be useful to you sooner than the next release takes place. [1] - https://github.com/apache/kafka/pull/6791 [2] - https://issues.apache.org/jira/browse/KAFKA-8404 Cheers, Chris On Fri, Oct 6, 2023 at 10:14 PM Yeikel Santana <em...@yeikel.com> wrote: > Hello everyone, > > I'm currently running Kafka Connect behind a firewall that mandates the > inclusion of a specific header. This situation becomes particularly > challenging when forwarding requests among multiple workers, as it appears > that only the Authorization header is included in the request. > > I'm wondering if there's a way to customize the headers of Kafka Connect > before they are forwarded between workers. From my observations, it seems > that this capability may not be available[1], and only the response headers > can be customized. > > I'd appreciate any realistic alternatives or suggestions you may have in > mind. > > Thanks! 
> [1] https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java#L191-L198
Re: Kafka Connect - Customize REST request headers
Hi Yeikel, Neat question! And thanks for the link to the RestClient code; very helpful. I don't believe there's a way to configure Kafka Connect to add these headers to forwarded requests right now. You may be able to do some kind of out-of-band proxy magic to intercept forwarded requests and insert the proper headers there? I don't see a reason for Kafka Connect to only forward authorization headers, even after examining the PR [1] and corresponding Jira ticket [2] that altered the RestClient class to begin including authorization headers in forwarded REST requests. We may be able to tweak the RestClient to include all headers instead of just the authorization header. I know that this doesn't help your immediate situation, but if other committers and contributors agree that the change would be beneficial, we may be able to include it in the next release (which may be 3.7.0, or a patch release for 3.4, 3.5, or 3.6). Alternatively, we may have to gate such a change behind a feature flag (either a coarse-grained boolean that enables/disables forwarding of all non-authorization headers, or more fine-grained logic such as include/exclude lists or even regexes), which would require a KIP and may take longer to release. I've CC'd the dev list to gather their perspective on this potential change, and to solicit their input on possible workarounds that may be useful to you sooner than the next release takes place. [1] - https://github.com/apache/kafka/pull/6791 [2] - https://issues.apache.org/jira/browse/KAFKA-8404 Cheers, Chris On Fri, Oct 6, 2023 at 10:14 PM Yeikel Santana wrote: > Hello everyone, > > I'm currently running Kafka Connect behind a firewall that mandates the > inclusion of a specific header. This situation becomes particularly > challenging when forwarding requests among multiple workers, as it appears > that only the Authorization header is included in the request. 
> > I'm wondering if there's a way to customize the headers of Kafka Connect > before they are forwarded between workers. From my observations, it seems > that this capability may not be available[1], and only the response headers > can be customized. > > I'd appreciate any realistic alternatives or suggestions you may have in > mind. > > Thanks! > > [1] https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java#L191-L198
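The "include/exclude lists or even regexes" option mentioned in the reply could look roughly like the following. This is a hypothetical Java sketch, not an existing Kafka Connect configuration or class: `HeaderForwardingPolicy` decides which inbound request headers a worker would copy onto a request it forwards to another worker. It always keeps `Authorization`, as the current RestClient does, and otherwise forwards headers whose names match an include pattern but not an exclude pattern (both matched case-insensitively against the whole name).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical sketch of regex-based header forwarding; no such setting exists in
// Kafka Connect today. Decides which inbound headers a worker copies onto a
// request it forwards to another worker (e.g. the leader).
public class HeaderForwardingPolicy {
    private final Pattern include;
    private final Pattern exclude;

    public HeaderForwardingPolicy(String includeRegex, String excludeRegex) {
        this.include = Pattern.compile(includeRegex, Pattern.CASE_INSENSITIVE);
        this.exclude = Pattern.compile(excludeRegex, Pattern.CASE_INSENSITIVE);
    }

    public Map<String, String> headersToForward(Map<String, String> inbound) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, String> header : inbound.entrySet()) {
            String name = header.getKey();
            // Authorization is always forwarded, mirroring the existing behavior.
            boolean always = name.equalsIgnoreCase("Authorization");
            boolean selected = include.matcher(name).matches() && !exclude.matcher(name).matches();
            if (always || selected) {
                out.put(name, header.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        HeaderForwardingPolicy policy = new HeaderForwardingPolicy("x-.*", "x-internal-.*");
        Map<String, String> inbound = new LinkedHashMap<>();
        inbound.put("Authorization", "Basic abc");
        inbound.put("X-Tenant", "t1");
        inbound.put("X-Internal-Trace", "zzz");
        inbound.put("Content-Length", "12");
        System.out.println(policy.headersToForward(inbound).keySet()); // [Authorization, X-Tenant]
    }
}
```

Forwarding alone would not cover the hard-coded Host header case described in the follow-up, since that header has to be added by the forwarding worker rather than copied from the client's request.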
Kafka Connect - Customize REST request headers
Hello everyone, I'm currently running Kafka Connect behind a firewall that mandates the inclusion of a specific header. This situation becomes particularly challenging when forwarding requests among multiple workers, as it appears that only the Authorization header is included in the request. I'm wondering if there's a way to customize the headers of Kafka Connect before they are forwarded between workers. From my observations, it seems that this capability may not be available[1], and only the response headers can be customized. I'd appreciate any realistic alternatives or suggestions you may have in mind. Thanks! [1] https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java#L191-L198
Re: Regarding Distributed Kafka-connect cluster
Thank you, Yash. That additional documentation helps further my understanding. In case it helps in any way, I am currently setting the rest.advertised.host.name and listeners properties to a private IP address that is resolvable within each data center. However, the data centers can only communicate with each other through a load balancer. Is there any configuration I can set to help with this setup? For example, if a worker sent its requests to the load balancer of the data center where the leader resides, I believe that it would work network-wise. Thank you again for taking the time to help. On Tue, 26 Sep 2023 07:44:06 -0400 Yash Mayya wrote --- Hi Yeikel, > To clarify, who initiates the step that assigns a > connector to a specific worker? If this process > is controlled by the leader, wouldn't it result in a > failure to assign tasks to workers with whom it > cannot communicate? This happens via the group rebalance process where each Kafka Connect worker communicates with the Kafka broker that has been chosen as the group co-ordinator for the Kafka Connect cluster. The assignment is indeed computed by the leader Connect worker but it is disseminated to the other Connect workers via the group coordinator [1]. > I should not find myself in a situation where a > connector is assigned to a worker who cannot > communicate with the leader This can unfortunately happen, since the assignments aren't done directly through leader -> non-leader Connect worker communication but via the Kafka broker designated as the group co-ordinator for the Connect cluster. [1] - https://medium.com/streamthoughts/apache-kafka-rebalance-protocol-or-the-magic-behind-your-streams-applications-e94baf68e4f2 On Tue, Sep 26, 2023 at 8:25 AM Yeikel Santana <em...@yeikel.com> wrote: > Thank you, Yash. Your explanation makes sense. > > To clarify, who initiates the step that assigns a connector to a specific > worker? 
If this process is controlled by the leader, wouldn't it result in > a failure to assign tasks to workers with whom it cannot communicate? > > Although it is not ideal, it is acceptable for now if some workers remain > inactive as long as the data center where the leader resides remains active > and continues to handle task assignments. I should not find myself in a > situation where a connector is assigned to a worker who cannot communicate > with the leader as that would render it useless as you mentioned. > > Thank you for taking the time > > On Mon, 25 Sep 2023 11:41:18 -0400 Yash Mayya > <yash.ma...@gmail.com> > wrote --- > > Hi Yeikel, > > Heartbeats and group coordination in Kafka Connect do occur through Kafka, > but a Kafka Connect cluster where all workers cannot communicate with > each other won't work very well. You'll be able to create / update / > delete > connectors by making requests to any workers that can communicate with the > leader like you noted. However, certain internal operations require cross > Connect worker network access as well. For instance, after a connector is > started, it needs to spawn tasks that do the actual work. The tasks are > created via a POST request to the leader worker from the worker that is > running the connector. When you issue a create connector request to a > worker, a group rebalance ensues and the connector is assigned to a worker > in the cluster (it could be any worker, not necessarily the one to > which the request was issued). So if the connector that you created lands > on a Connect worker that cannot communicate with the leader worker, it > won't be able to create its tasks which will render the connector > essentially useless. > > Thanks, > Yash > > On Mon, Sep 25, 2023 at 7:51 PM Yeikel Santana > <em...@yeikel.com> > wrote: > > > Thank you, Nikhil. > > I did notice that challenge you're describing with the REST updates when > I > > had more than one worker within the same datacenter. 
> > Luckily, solving that was relatively simple as all my workers can > > communicate within the same data center, and all I need to do is to > ensure > > that the update is initiated from the same datacenter as the leader. > From > > what I tested so far, this seems to work fine. > > My biggest concern was regarding other operations such as heartbeats or > > general coordination. If that happens through Kafka, then I should be > > fine. Thank you for taking the time On Mon, 25 Sep 2023 09:45:43 > -0400 > > nikhilsrivastava4...@gmail.com wrote Hi Yeikel,
Re: Regarding Distributed Kafka-connect cluster
Hi Yeikel, > To clarify, who initiates the step that assigns a > connector to a specific worker? If this process > is controlled by the leader, wouldn't it result in a > failure to assign tasks to workers with whom it > cannot communicate? This happens via the group rebalance process where each Kafka Connect worker communicates with the Kafka broker that has been chosen as the group co-ordinator for the Kafka Connect cluster. The assignment is indeed computed by the leader Connect worker but it is disseminated to the other Connect workers via the group coordinator [1]. > I should not find myself in a situation where a > connector is assigned to a worker who cannot > communicate with the leader This can unfortunately happen, since the assignments aren't done directly through leader -> non-leader Connect worker communication but via the Kafka broker designated as the group co-ordinator for the Connect cluster. [1] - https://medium.com/streamthoughts/apache-kafka-rebalance-protocol-or-the-magic-behind-your-streams-applications-e94baf68e4f2 On Tue, Sep 26, 2023 at 8:25 AM Yeikel Santana wrote: > Thank you, Yash. Your explanation makes sense > > To clarify, who initiates the step that assigns a connector to a specific > worker? If this process is controlled by the leader, wouldn't it result in > a failure to assign tasks to workers with whom it cannot communicate? > > Although it is not ideal, it is acceptable for now if some workers remain > inactive as long as the data center where the leader resides remains active > and continues to handle task assignments. 
I should not find myself in a > situation where a connector is assigned to a worker who cannot communicate > with the leader as that would render it useless as you mentioned. > > Thank you for taking the time > > On Mon, 25 Sep 2023 11:41:18 -0400 Yash Mayya > wrote --- > > Hi Yeikel, > > Heartbeats and group coordination in Kafka Connect do occur through Kafka, > but a Kafka Connect cluster where all workers cannot communicate with > each other won't work very well. You'll be able to create / update / > delete > connectors by making requests to any workers that can communicate with the > leader like you noted. However, certain internal operations require cross > Connect worker network access as well. For instance, after a connector is > started, it needs to spawn tasks that do the actual work. The tasks are > created via a POST request to the leader worker from the worker that is > running the connector. When you issue a create connector request to a > worker, a group rebalance ensues and the connector is assigned to a worker > in the cluster (it could be any worker, not necessarily the one to > which the request was issued). So if the connector that you created lands > on a Connect worker that cannot communicate with the leader worker, it > won't be able to create its tasks which will render the connector > essentially useless. > > Thanks, > Yash > > On Mon, Sep 25, 2023 at 7:51 PM Yeikel Santana <em...@yeikel.com> > wrote: > > > Thank you, Nikhil. > > I did notice that challenge you're describing with the REST updates when > I > > had more than one worker within the same datacenter. > > Luckily, solving that was relatively simple as all my workers can > > communicate within the same data center, and all I need to do is to > ensure > > that the update is initiated from the same datacenter as the leader. > From > > what I tested so far, this seems to work fine. 
> > My biggest concern was regarding other operations such as heartbeats or > > general coordination. If that happens through Kafka, then I should be > > fine. Thank you for taking the time On Mon, 25 Sep 2023 09:45:43 > -0400 > > nikhilsrivastava4...@gmail.com wrote Hi Yeikel, > > Sharing my two cents. Would let others chime in to add to this. > > Based on my understanding, if connect workers (which are all part of the > > same cluster) can communicate with the kafka brokers (which happens to > be > > the Group Coordinator and facilitates Connect Leader Election via Group > > Membership Protocol), then only 1 connect worker will be elected as > leader > > amongst all others in the cluster. Outside of that, I believe a bunch of > > REST calls to connect workers are forwarded to the connect leader (if > the > > REST request lands on a connect worker which isn't a leader). In case of > a > > non-retriable network partition between the non-leader worker and leader > > worker, those REST requests will fail. I'm referring to REST requests > like > > CREATE / UP
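The failure mode discussed in this thread can be illustrated with a toy Java model. It is hypothetical and heavily simplified: `SplitClusterModel` assumes a worker can only reach workers in its own data center and spreads connectors round-robin, whereas Connect's real assignor works differently. It shows the two points made above: the leader's assignment reaches every member through the group coordinator, so a connector can land on a worker in another data center, and that worker then cannot make the REST call to the leader that task creation needs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only; not Kafka Connect's real assignor or REST client.
// Assumptions: a worker can reach only workers in its own data center, and
// connectors are spread round-robin across all members of the group.
public class SplitClusterModel {

    public static final class Worker {
        final String id;
        final String dataCenter;

        public Worker(String id, String dataCenter) {
            this.id = id;
            this.dataCenter = dataCenter;
        }
    }

    // The leader computes this map; the group coordinator (a Kafka broker) then hands
    // each member its share, so the leader never contacts the other workers directly.
    public static Map<String, Worker> assign(List<String> connectors, List<Worker> members) {
        Map<String, Worker> assignment = new LinkedHashMap<>();
        for (int i = 0; i < connectors.size(); i++) {
            assignment.put(connectors.get(i), members.get(i % members.size()));
        }
        return assignment;
    }

    // Task configs go over REST from the connector's worker to the leader, which under
    // this model's assumption only succeeds within the leader's data center.
    static boolean canReachLeader(Worker owner, Worker leader) {
        return owner.dataCenter.equals(leader.dataCenter);
    }

    // Connectors that were assigned successfully but will never get tasks.
    public static List<String> stranded(Map<String, Worker> assignment, Worker leader) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Worker> entry : assignment.entrySet()) {
            if (!canReachLeader(entry.getValue(), leader)) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Worker> members = Arrays.asList(new Worker("w1", "dc-a"), new Worker("w2", "dc-b"));
        Worker leader = members.get(0);
        Map<String, Worker> assignment = assign(Arrays.asList("c1", "c2", "c3", "c4"), members);
        // prints: Connectors unable to create tasks: [c2, c4]
        System.out.println("Connectors unable to create tasks: " + stranded(assignment, leader));
    }
}
```

In this model, every connector assigned to the data center without the leader ends up without tasks, even though the rebalance itself completed normally.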
Re: Regarding Distributed Kafka-connect cluster
Thank you, Yash. Your explanation makes sense. To clarify, who initiates the step that assigns a connector to a specific worker? If this process is controlled by the leader, wouldn't it result in a failure to assign tasks to workers with whom it cannot communicate? Although it is not ideal, it is acceptable for now if some workers remain inactive as long as the data center where the leader resides remains active and continues to handle task assignments. I should not find myself in a situation where a connector is assigned to a worker who cannot communicate with the leader as that would render it useless as you mentioned. Thank you for taking the time. On Mon, 25 Sep 2023 11:41:18 -0400 Yash Mayya wrote --- Hi Yeikel, Heartbeats and group coordination in Kafka Connect do occur through Kafka, but a Kafka Connect cluster where all workers cannot communicate with each other won't work very well. You'll be able to create / update / delete connectors by making requests to any workers that can communicate with the leader like you noted. However, certain internal operations require cross Connect worker network access as well. For instance, after a connector is started, it needs to spawn tasks that do the actual work. The tasks are created via a POST request to the leader worker from the worker that is running the connector. When you issue a create connector request to a worker, a group rebalance ensues and the connector is assigned to a worker in the cluster (it could be any worker, not necessarily the one to which the request was issued). So if the connector that you created lands on a Connect worker that cannot communicate with the leader worker, it won't be able to create its tasks which will render the connector essentially useless. Thanks, Yash On Mon, Sep 25, 2023 at 7:51 PM Yeikel Santana <em...@yeikel.com> wrote: > Thank you, Nikhil. > > I did notice that challenge you're describing with the REST updates when I > had more than one worker within the same datacenter. 
> > Luckily, solving that was relatively simple as all my workers can > communicate within the same data center, and all I need to do is to ensure > that the update is initiated from the same datacenter as the leader. From > what I tested so far, this seems to work fine. > > My biggest concern was regarding other operations such as heartbeats or > general coordination. If that happens through Kafka, then I should be > fine. Thank you for taking the time On Mon, 25 Sep 2023 09:45:43 -0400 > nikhilsrivastava4...@gmail.com wrote Hi Yeikel, > > Sharing my two cents. Would let others chime in to add to this. > > Based on my understanding, if connect workers (which are all part of the > same cluster) can communicate with the kafka brokers (which happens to be > the Group Coordinator and facilitates Connect Leader Election via Group > Membership Protocol), then only 1 connect worker will be elected as leader > amongst all others in the cluster. Outside of that, I believe a bunch of > REST calls to connect workers are forwarded to the connect leader (if the > REST request lands on a connect worker which isn't a leader). In case of a > non-retriable network partition between the non-leader worker and leader > worker, those REST requests will fail. I'm referring to REST requests like > CREATE / UPDATE / DELETE. > > Hope this helps a little. > > Thanks, > -Nikhil > > On Sun, 24 Sept 2023 at 06:36, Yeikel Santana <em...@yeikel.com> > wrote: > > > Hello everyone, I'm currently designing a new Kafka Connect cluster, and > > I'm trying to understand how connectivity functions among workers. In my > > setup, I have a single Kafka Connect cluster connected to the same Kafka > > topics and Kafka cluster. However, the workers are deployed in > > geographically separated data centers, each of which is fully isolated at > > the network level. I suspect that this setup might not work with Kafka Connect > > because my current understanding is that ALL workers need to communicate > > with the leader for task coordination and heartbeats. 
In terms of leader > > election, can this result in multiple leaders and other potential > > issues? Any input and suggestions would be appreciated >
Re: Regarding Distributed Kafka-connect cluster
Hi Yeikel, Heartbeats and group coordination in Kafka Connect do occur through Kafka, but a Kafka Connect cluster where all workers cannot communicate with each other won't work very well. You'll be able to create / update / delete connectors by making requests to any workers that can communicate with the leader like you noted. However, certain internal operations require cross Connect worker network access as well. For instance, after a connector is started, it needs to spawn tasks that do the actual work. The tasks are created via a POST request to the leader worker from the worker that is running the connector. When you issue a create connector request to a worker, a group rebalance ensues and the connector is assigned to a worker in the cluster (it could be any worker, not necessarily the one to which the request was issued). So if the connector that you created lands on a Connect worker that cannot communicate with the leader worker, it won't be able to create its tasks which will render the connector essentially useless. Thanks, Yash On Mon, Sep 25, 2023 at 7:51 PM Yeikel Santana wrote: > Thank you, Nikhil. > > I did notice that challenge you're describing with the REST updates when I > had more than one worker within the same datacenter. > > Luckily, solving that was relatively simple as all my workers can > communicate within the same data center, and all I need to do is to ensure > that the update is initiated from the same datacenter as the leader. From > what I tested so far, this seems to work fine. > > My biggest concern was regarding other operations such as heartbeats or > general coordination. If that happens through Kafka, then I should be > fine. Thank you for taking the time On Mon, 25 Sep 2023 09:45:43 -0400 > nikhilsrivastava4...@gmail.com wrote Hi Yeikel, > > Sharing my two cents. Would let others chime in to add to this. 
> > Based on my understanding, if connect workers (which are all part of the > same cluster) can communicate with the kafka brokers (which happens to be > the Group Coordinator and facilitates Connect Leader Election via Group > Membership Protocol), then only 1 connect worker will be elected as leader > amongst all others in the cluster. Outside of that, I believe a bunch of > REST calls to connect workers are forwarded to the connect leader (if the > REST request lands on a connect worker which isn't a leader). In case of a > non-retriable network partition between the non-leader worker and leader > worker, those REST requests will fail. I'm referring to REST requests like > CREATE / UPDATE / DELETE. > > Hope this helps a little. > > Thanks, > -Nikhil > > On Sun, 24 Sept 2023 at 06:36, Yeikel Santana wrote: > > > Hello everyone, I'm currently designing a new Kafka Connect cluster, and > > I'm trying to understand how connectivity functions among workers. In my > > setup, I have a single Kafka Connect cluster connected to the same Kafka > > topics and Kafka cluster. However, the workers are deployed in > > geographically separated data centers, each of which is fully isolated at > > the network level. I suspect that this setup might not work with Kafka Connect > > because my current understanding is that ALL workers need to communicate > > with the leader for task coordination and heartbeats. In terms of leader > > election, can this result in multiple leaders and other potential > > issues? Any input and suggestions would be appreciated >
Re: Regarding Distributed Kafka-connect cluster
Thank you, Nikhil. I did notice the challenge you're describing with REST updates when I had more than one worker within the same data center. Luckily, solving that was relatively simple: all my workers can communicate within the same data center, so all I need to do is ensure that the update is initiated from the same data center as the leader. From what I've tested so far, this seems to work fine. My biggest concern was about other operations, such as heartbeats or general coordination. If that happens through Kafka, then I should be fine. Thank you for taking the time.
In reply to nikhilsrivastava4...@gmail.com, Mon, 25 Sep 2023 09:45:43 -0400.
Re: Regarding Distributed Kafka-connect cluster
Kudos to Nikhil. Your explanation adds to my knowledge.
In reply to Nikhil Srivastava <nikhilsrivastava4...@gmail.com>, Mon, 25 Sep 2023 at 7:16 PM.
Re: Regarding Distributed Kafka-connect cluster
Hi Yeikel, Sharing my two cents; I'll let others chime in to add to this. Based on my understanding, if the Connect workers (which are all part of the same cluster) can communicate with the Kafka brokers (one of which acts as the group coordinator and facilitates Connect leader election via the group membership protocol), then only one Connect worker will be elected leader among all the workers in the cluster. Beyond that, I believe several REST calls to Connect workers are forwarded to the Connect leader (if the REST request lands on a worker that isn't the leader). If there is a non-retriable network partition between a non-leader worker and the leader worker, those REST requests will fail. I'm referring to REST requests like CREATE / UPDATE / DELETE. Hope this helps a little. Thanks, -Nikhil
In reply to Yeikel Santana, Sun, 24 Sept 2023 at 06:36.
Regarding Distributed Kafka-connect cluster
Hello everyone, I'm currently designing a new Kafka Connect cluster, and I'm trying to understand how connectivity works between workers. In my setup, I have a single Kafka Connect cluster connected to the same Kafka topics and Kafka cluster. However, the workers are deployed in geographically separated data centers, each of which is fully isolated at the network level. I suspect that this setup might not work with Kafka Connect, because my current understanding is that ALL workers need to communicate with the leader for task coordination and heartbeats. In terms of leader election, can this result in multiple leaders or other potential issues? Any input and suggestions would be appreciated.
Re: Kafka connect Graceful stop of task failed
Hello Greg, Thanks a *lot* for your help on this. Indeed, the empty poll is not the issue for us. As mentioned, our setup polls every 24 hours, so `stop()` being stuck behind `poll()` is hitting us hard. I did a trace today in my dev environment and can indeed see this waiting log entry every 100 ms <https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java#L427>. Then I send a DELETE for this connector, and the stop is not processed until the next loop in `poll()`. Your initial diagnosis is 100% correct. You mentioned that other connectors have already changed to support a stop signal from the same thread. Would you have a concrete connector implementation you could point me to? Kind regards, Robson
In reply to Greg Harris, Mon, 21 Aug 2023 at 19:23.
Re: Kafka connect Graceful stop of task failed
Hey Robson, Thanks for opening an issue on the JDBC repo; this is certainly relevant feedback for the connector developers. I commented on the issue about a potential regression I saw, so you can try downgrading your connector to see if the behavior improves. I also know that kafka-connect-jdbc received a patch to improve this behavior when no data is being emitted (https://github.com/confluentinc/kafka-connect-jdbc/pull/947), but I'm not sure whether that is relevant to your situation. Thanks! Greg
In reply to Robson Hermes, Mon, Aug 21, 2023 at 6:53 AM.
Re: Kafka connect Graceful stop of task failed
How can I stop getting these updates?
In reply to Robson Hermes, Mon, Aug 21, 2023 at 9:01 AM.
Re: Kafka connect Graceful stop of task failed
No, it stops them too. The problem is precisely what Greg described: the stop signal now comes from the same thread, so any source task that runs in a blocking way will not process the stop signal until the current poll finishes. So the JDBC source connector would need to be patched.
In reply to sunil chaudhari, Mon, 21 Aug 2023 at 15:48.
Re: Kafka connect Graceful stop of task failed
I think when you delete a connector, it removes the task and the worker continues to run. When you stop, it actually stops the worker. Those are different things. The point to note is that the worker hosts the connector, so the connector should be removed before stopping the worker. Though I am not an expert in this.
In reply to Robson Hermes, Mon, 21 Aug 2023 at 7:10 PM.
Re: Kafka connect Graceful stop of task failed
Hello Sunil, I'm not calling a stop; I'm deleting the connectors directly with DELETE. Stopping the connector is done internally during deletion. Regards, Robson
In reply to sunil chaudhari, Mon, 21 Aug 2023 at 15:36.
Re: Kafka connect Graceful stop of task failed
You have to remove the connectors first using the DELETE API and then stop the connector.
In reply to Robson Hermes, Thu, 17 Aug 2023 at 2:51 AM.
Re: Kafka connect Graceful stop of task failed
Hello Greg (sorry about the duplicate e-mail, I forgot to cc the users mailing list), Thanks a lot for your detailed reply. I'm using the JDBC source connectors from kafka-connect-jdbc <https://github.com/confluentinc/kafka-connect-jdbc>. Indeed, the `poll()` implementation blocks, so it only processes a `stop()` once it returns from the current `poll()` execution. There was a change in the past to fix a similar problem <https://github.com/confluentinc/kafka-connect-jdbc/pull/677>, but it did not involve `stop()` being called from the same thread. I've just raised an issue <https://github.com/confluentinc/kafka-connect-jdbc/issues/1360>. Unfortunately, setting a lower poll interval is not an option for me, as this is a heavy data load that runs only once per day. I'll see if I can come up with a change, although I'm not sure yet. This is my first time using Kafka, Kafka Connect, and kafka-connect-jdbc =D Kind regards, Robson
In reply to Greg Harris, Thu, 17 Aug 2023 at 00:37.
Re: Kafka connect Graceful stop of task failed
Hi Robson, Thank you for the detailed bug report. I believe the behavior you're describing is caused by this flaw, which is still under discussion: https://issues.apache.org/jira/browse/KAFKA-15090. Since that flaw was introduced in 3.0, source connectors need to return from poll() before the graceful shutdown timeout to avoid the error. You may be able to mitigate the error if the connector allows you to reduce its poll timeout/interval to something less than the graceful timeout, but that depends on the specific connector implementation, so check the documentation for your connector. I know some implementations have received patches to compensate for this behavior in the framework, so also consider upgrading or checking the release notes for your connectors. As for the effects of this error: whenever a non-graceful stop occurs, the runtime immediately closes the producer so that the task cannot write any further records. However, it still leaves the task's resources (threads, in-memory records, database connections, etc.) allocated until the task finally returns from poll(). While this is not desirable behavior, most operators seem to treat it as just a nuisance error. I hope this gives some context for the error message you're seeing. Thanks, Greg
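Greg's point is that poll() must return before the graceful shutdown timeout, because the stop is handled on the same thread. Below is a minimal, self-contained model of that behavior in plain Java. It deliberately does not use the real `org.apache.kafka.connect.source.SourceTask` API. `ScheduledTask`, `runTaskLoop`, and `millisToStop` are hypothetical names. The loop is an assumed simplification of the worker: it checks for a stop request only between `poll()` calls and then calls `stop()` on the same thread. The model compares two tasks. One caps each `poll()` wait at a short slice and returns null. The other sleeps until its next nightly run.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, self-contained model. It does NOT use the real
// org.apache.kafka.connect.source.SourceTask API; names are illustrative.
final class BoundedPollSketch {

    // Stand-in for a source task that loads data once per schedule period.
    static final class ScheduledTask {
        private final long nextRunAtMillis; // when the next (e.g. nightly) load is due
        private final long maxWaitMillis;   // upper bound on how long one poll() may block

        ScheduledTask(long nextRunAtMillis, long maxWaitMillis) {
            this.nextRunAtMillis = nextRunAtMillis;
            this.maxWaitMillis = maxWaitMillis;
        }

        // Waits at most maxWaitMillis, then returns null ("no data yet")
        // instead of blocking until the next scheduled run.
        List<String> poll() throws InterruptedException {
            long remaining = nextRunAtMillis - System.currentTimeMillis();
            if (remaining > 0) {
                Thread.sleep(Math.min(remaining, maxWaitMillis));
                return null;
            }
            return Collections.singletonList("record"); // placeholder for the real load
        }

        void stop() {
            // Release connections and other resources here.
        }
    }

    // Simplified worker loop: the stop request is only observed between poll()
    // calls, and stop() then runs on this same thread.
    static void runTaskLoop(ScheduledTask task, AtomicBoolean stopping) throws InterruptedException {
        while (!stopping.get()) {
            List<String> records = task.poll();
            if (records != null) {
                // A real worker would hand these records to its producer.
            }
        }
        task.stop();
    }

    // Requests a stop after stopAfterMillis and returns how long (ms) the loop took
    // to exit, or -1 if it was still blocked after timeoutMillis (a "failed" graceful stop).
    static long millisToStop(long nextRunInMillis, long maxWaitMillis,
                             long stopAfterMillis, long timeoutMillis) throws InterruptedException {
        ScheduledTask task = new ScheduledTask(System.currentTimeMillis() + nextRunInMillis, maxWaitMillis);
        AtomicBoolean stopping = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            try {
                runTaskLoop(task, stopping);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);
        worker.start();
        Thread.sleep(stopAfterMillis);
        long requestedAt = System.nanoTime();
        stopping.set(true);
        worker.join(timeoutMillis);
        if (worker.isAlive()) {
            worker.interrupt(); // give up on the blocked task
            return -1;
        }
        return (System.nanoTime() - requestedAt) / 1_000_000;
    }

    public static void main(String[] args) throws InterruptedException {
        long oneDay = 24L * 60 * 60 * 1000;
        // poll() bounded to 100 ms: the loop exits shortly after the stop request.
        System.out.println("bounded poll: " + millisToStop(oneDay, 100, 300, 5_000) + " ms");
        // poll() that sleeps until the next nightly run: the stop window is missed.
        System.out.println("blocking poll: " + millisToStop(oneDay, oneDay, 300, 1_000));
    }
}
```

With the bounded wait, the loop exits within roughly one wait slice of the stop request. With the blocking wait, it is still stuck when the timeout expires, which corresponds to the "Graceful stop of task ... failed" situation in this thread. Whether a given connector can be changed to return periodically from poll() depends on its implementation, as Greg notes.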
Kafka connect Graceful stop of task failed
Hello, I'm using Kafka Connect 7.4.0 to read data from Postgres views and write it to other Postgres tables, so I'm using the JDBC source and sink connectors. Everything works well, but whenever I stop the source connectors via the REST API: DELETE http://kafka-connect:8083/connectors/connector_name_here the connector stops fine, but its task does not: Graceful stop of connector (connector-name-here) succeeded. Graceful stop of task (task-name-here) failed. It only happens with the *source* connector tasks; the sink connectors and their tasks shut down gracefully. The timeout for task shutdown has been increased, but that didn't help: task.shutdown.graceful.timeout.ms=6 The connectors run once per day (during the night) to load a lot of data, and the error happens when I delete the connectors in the middle of the day. That is, they are not actually executing or loading any data; they have already finished. offset.flush.interval.ms=1 in development and integration environments. offset.flush.interval.ms=6 in production and UAT. The rest of the config is pretty much the default. What could be the issue? The graceful-stop errors for the tasks are triggering our alert system, so I'm trying to get rid of them. Thanks a lot Robson
Re: Kafka Connect Rest Extension Question
Hello Yang Hyung Wook,

In your post I do not see anything obviously wrong, so you may need to do some more debugging.

1. Are you using the same jar for both the classpath and plugin.path tests? If not, do they both contain the service loader manifest file? You can test this with https://docs.oracle.com/javase/tutorial/deployment/jar/view.html
2. Do you see either of these errors https://github.com/apache/kafka/blob/3.5.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java#L267-L269 for filesystem-specific problems?
3. This log line https://github.com/apache/kafka/blob/3.5.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java#L275 should be printed multiple times when you're using plugin.path. Are you seeing a log including a directory containing your jar file, the jar file itself, or only other locations?
4. If there is a dependency-related error, it will appear with this error log: https://github.com/apache/kafka/blob/3.5.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java#L431

If the above doesn't help, can you please provide your redacted worker startup logs after https://github.com/apache/kafka/blob/3.5.0/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java#L120 and before the worker starts printing configurations?

Thanks,
Greg Harris

On Mon, Jul 31, 2023 at 5:38 AM 양형욱 wrote:
> https://stackoverflow.com/questions/76797743/how-can-i-solve-connectrestextension-error
>
> There is an issue with this link where the ConnectRestExtension implementation is not registered. I've done everything the kip official documentation says, but can you tell me why it doesn't work?
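For Greg's first question, a small helper can confirm whether a given jar actually carries the ServiceLoader registration for a REST extension. This is a hedged sketch: it assumes the standard Java ServiceLoader layout (a file under `META-INF/services/` named after the fully-qualified interface, here `org.apache.kafka.connect.rest.ConnectRestExtension`, listing the implementation class), and the class name `RestExtensionJarCheck` is hypothetical. Running `jar tf` as in the linked tutorial gives the same information.

```java
import java.io.IOException;
import java.util.jar.JarFile;

/** Hypothetical helper: does a jar register a ConnectRestExtension via ServiceLoader? */
public class RestExtensionJarCheck {
    // ServiceLoader reads META-INF/services/<fully-qualified interface name>.
    static final String SERVICE_FILE =
            "META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension";

    /** Pure check over entry names, so it can be exercised without a real jar. */
    static boolean containsServiceFile(Iterable<String> entryNames) {
        for (String name : entryNames) {
            if (SERVICE_FILE.equals(name)) {
                return true;
            }
        }
        return false;
    }

    /** Opens the jar and looks the service file up directly. */
    static boolean jarContainsServiceFile(String jarPath) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getEntry(SERVICE_FILE) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Pass the classpath jar and the plugin.path jar to compare them.
        for (String path : args) {
            String verdict = jarContainsServiceFile(path) ? "service file present" : "service file MISSING";
            System.out.println(path + ": " + verdict);
        }
    }
}
```

If the file is present in one jar but not the other, that alone would explain a difference between the classpath and plugin.path tests; the file's content must be the fully-qualified name of the implementation class.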
Kafka Connect Rest Extension Question
https://stackoverflow.com/questions/76797743/how-can-i-solve-connectrestextension-error As described at this link, my ConnectRestExtension implementation is not being registered. I've done everything the official KIP documentation says; can you tell me why it doesn't work? 양형욱 Yang Hyung Wook Global Platform Dev Pangyo Tech One Tower 1, 131 Bundangnaegok-ro, Bundang-gu, Seongnam-si, Gyeonggi-do 13529 Tel Mobile 010-2815-2145 Email hyungwooky...@navercorp.com This email and the information contained in this email are intended solely for the recipient(s) addressed above and may contain information that is confidential and/or privileged or whose disclosure is prohibited by law or other reasons. If you are not the intended recipient of this email, please be advised that any unauthorized storage, duplication, dissemination, distribution or disclosure of all or part of this email is strictly prohibited. If you received this email in error, please immediately contact NAVER Cloud Privacy(dl_ncloud_priv...@navercorp.com) and delete this email and any copies and attachments from your system. Thank you for your cooperation.
Re: Kafka Connect exactly-once semantic and very large transactions
Hi Chris, thanks for your response! Yes, we are also looking at other ways to provide exactly-once semantics for existing data (e.g. incremental snapshots, which capture the data in smaller chunks), but first we would like to fully understand all the implications and consequences for Kafka. If anyone wants to share more insights or recommendations not mentioned in your response, they would still be very welcome. Thanks! Vojta On Thursday, 8 June 2023 15:58:37 CEST Chris Egerton wrote: > Hi Vojta, > > From my limited understanding of the Debezium snapshot process, I believe > that you're correct that producing the entire snapshot in a transaction is > the way to provide exactly-once semantics during that phase. If there's a > way to recover in-progress snapshots and skip over already-produced > records, then that could be a suitable alternative. > > You're correct that a large transaction timeout may be required to > accommodate this case (we even try to call this out in the error message > that users see on transaction timeouts [1]). I'm not very familiar with > broker logic but with my limited understanding, your assessment of the > impact of delayed log compaction also seems valid. > > The only other issue that comes to my mind is that latency will be higher > for downstream consumers since they won't be able to read any records until > the entire transaction is complete, assuming they're using the > read_committed isolation level. But given that this is the snapshotting > phase and you're presumably moving historical data instead of real-time > updates to your database, this should hopefully be acceptable for most > users. > > I'd be interested to hear what someone more familiar with client and broker > internals has to say! Going to be following this thread.
> > [1] - > https://github.com/apache/kafka/blob/513e1c641d63c5e15144f9fcdafa1b56c5e5ba09/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java#L357 > > Cheers, > > Chris > > On Thu, Jun 8, 2023 at 7:53 AM Vojtech Juranek wrote: > > Hi, > > I'm investigating possibilities of exactly-once semantic for Debezium [1] > > Kafka Connect source connectors, which implements change data capture for > > various databases. Debezium has two phases, initial snapshot phase and > > streaming phase. Initial snapshot phase loads existing data from the > > database > > and send it to the Kafka, subsequent streaming phase captures any changes > > to > > the data. > > > > Exactly-once delivery seems to work really well during the streaming > > phase. > > Now, I'm investigating how to ensure exactly-once delivery for initial > > snapshot phase. If the snapshot fails (e.g. due to DB connection drop or > > worker node crash), we force new snapshot after the restart as the data > > may > > change during the restart and the snapshot has to reflect the state of the > > data > > in time when it was executed. However, re-taking the snapshot produces > > duplicate records in the Kafka related topics. > > > > Probably the most easy solution to this issue is to run the whole snapshot > > in > > a single Kafka transaction. This may result into a huge transaction, > > containing millions of records, in some cases even billions of records. As > > these records cannot be consumed until transaction is committed and > > therefore > > logs cannot be compacted, this would potentially result in huge increase > > of > > Kafka logs. Also, as for the large DBs this is time consuming process, it > > would very likely result in transaction timeouts (unless the timeout is > > set to > > very large value). > > > > Is my understanding of the impact of very large transactions correct? Are > > there any other drawbacks I'm missing (e.g.
can it also result in some > > memory > > issue or something similar)? > > > > Thanks in advanced! > > Vojta > > > > [1] https://debezium.io/
Re: Kafka Connect exactly-once semantic and very large transactions
Hi Vojta, From my limited understanding of the Debezium snapshot process, I believe that you're correct that producing the entire snapshot in a transaction is the way to provide exactly-once semantics during that phase. If there's a way to recover in-progress snapshots and skip over already-produced records, then that could be a suitable alternative. You're correct that a large transaction timeout may be required to accommodate this case (we even try to call this out in the error message that users see on transaction timeouts [1]). I'm not very familiar with broker logic but with my limited understanding, your assessment of the impact of delayed log compaction also seems valid. The only other issue that comes to my mind is that latency will be higher for downstream consumers since they won't be able to read any records until the entire transaction is complete, assuming they're using the read_committed isolation level. But given that this is the snapshotting phase and you're presumably moving historical data instead of real-time updates to your database, this should hopefully be acceptable for most users. I'd be interested to hear what someone more familiar with client and broker internals has to say! Going to be following this thread. [1] - https://github.com/apache/kafka/blob/513e1c641d63c5e15144f9fcdafa1b56c5e5ba09/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java#L357 Cheers, Chris On Thu, Jun 8, 2023 at 7:53 AM Vojtech Juranek wrote: > Hi, > I'm investigating possibilities of exactly-once semantic for Debezium [1] > Kafka Connect source connectors, which implements change data capture for > various databases. Debezium has two phases, initial snapshot phase and > streaming phase. Initial snapshot phase loads existing data from the > database > and send it to the Kafka, subsequent streaming phase captures any changes > to > the data. > > Exactly-once delivery seems to work really well during the streaming > phase.
> Now, I'm investigating how to ensure exactly-once delivery for initial > snapshot phase. If the snapshot fails (e.g. due to DB connection drop or > worker node crash), we force new snapshot after the restart as the data > may > change during the restart and the snapshot has to reflect the state of the > data > in time when it was executed. However, re-taking the snapshot produces > duplicate records in the Kafka related topics. > > Probably the most easy solution to this issue is to run the whole snapshot > in > a single Kafka transaction. This may result into a huge transaction, > containing millions of records, in some cases even billions of records. As > these records cannot be consumed until transaction is committed and > therefore > logs cannot be compacted, this would potentially result in huge increase > of > Kafka logs. Also, as for the large DBs this is time consuming process, it > would very likely result in transaction timeouts (unless the timeout is > set to > very large value). > > Is my understanding of the impact of very large transactions correct? Are > there any other drawbacks I'm missing (e.g. can it also result in some > memory > issue or something similar)? > > Thanks in advanced! > Vojta > > [1] https://debezium.io/
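To put rough numbers on the transaction-timeout concern discussed in this thread, a back-of-the-envelope sketch. It assumes a sustained snapshot throughput (the 50,000 records/s below is a made-up figure; substitute a measured one) and compares the estimated duration of a single-transaction snapshot with the broker's `transaction.max.timeout.ms`, whose default is 900000 ms (15 minutes). A producer's `transaction.timeout.ms` cannot exceed that broker cap, so both would have to be raised; in Connect the producer side would typically be set per connector with the `producer.override.` prefix, subject to the worker's client override policy. The class and method names are hypothetical.

```java
/** Hypothetical estimate: can one transaction cover a whole snapshot? */
public class SnapshotTransactionEstimate {
    // Broker-side cap on any producer's transaction.timeout.ms (Kafka default: 15 minutes).
    static final long DEFAULT_TRANSACTION_MAX_TIMEOUT_MS = 900_000L;

    /** Estimated snapshot duration in ms for a record count at a sustained rate. */
    static long estimatedDurationMs(long records, long recordsPerSecond) {
        if (recordsPerSecond <= 0) {
            throw new IllegalArgumentException("throughput must be positive");
        }
        // Round up to whole seconds so a partial final second is not dropped.
        long seconds = (records + recordsPerSecond - 1) / recordsPerSecond;
        return seconds * 1000L;
    }

    /** True if the estimated snapshot finishes within the given transaction timeout. */
    static boolean fitsInOneTransaction(long records, long recordsPerSecond, long timeoutMs) {
        return estimatedDurationMs(records, recordsPerSecond) <= timeoutMs;
    }

    public static void main(String[] args) {
        long records = 1_000_000_000L; // the "billions of records" case from the question
        long throughput = 50_000L;     // hypothetical sustained records per second
        long ms = estimatedDurationMs(records, throughput);
        System.out.printf("Estimated snapshot: %d s (%.1f h); fits in default max timeout: %b%n",
                ms / 1000, ms / 3_600_000.0,
                fitsInOneTransaction(records, throughput, DEFAULT_TRANSACTION_MAX_TIMEOUT_MS));
    }
}
```

With these hypothetical inputs the estimate is 20,000 s (about 5.6 h), more than twenty times the default broker cap, which supports Vojta's expectation that a single-transaction snapshot would hit timeouts unless both limits are raised substantially.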
Kafka Connect exactly-once semantic and very large transactions
Hi, I'm investigating the possibilities for exactly-once semantics in Debezium [1] Kafka Connect source connectors, which implement change data capture for various databases. Debezium has two phases: an initial snapshot phase and a streaming phase. The initial snapshot phase loads the existing data from the database and sends it to Kafka; the subsequent streaming phase captures any changes to the data. Exactly-once delivery seems to work really well during the streaming phase. Now I'm investigating how to ensure exactly-once delivery for the initial snapshot phase. If the snapshot fails (e.g. due to a dropped DB connection or a worker node crash), we force a new snapshot after the restart, as the data may change during the restart and the snapshot has to reflect the state of the data at the time it was executed. However, re-taking the snapshot produces duplicate records in the related Kafka topics. Probably the easiest solution to this issue is to run the whole snapshot in a single Kafka transaction. This may result in a huge transaction containing millions of records, in some cases even billions. As these records cannot be consumed until the transaction is committed, and therefore the logs cannot be compacted, this could result in a huge increase in the size of the Kafka logs. Also, since the snapshot is a time-consuming process for large DBs, it would very likely result in transaction timeouts (unless the timeout is set to a very large value). Is my understanding of the impact of very large transactions correct? Are there any other drawbacks I'm missing (e.g. could it also cause memory issues or something similar)? Thanks in advance! Vojta [1] https://debezium.io/
Kafka connect process listens to an unknown port
Hi all, I'm testing Apache Kafka Connect for a project and I found that the main process listens on two different ports: the one that provides the REST API, 8083 by default, and a different unprivileged port whose number changes on each restart. For instance, this is a fragment of the output from the netstat command:
tcp 0 0 0.0.0.0:8083 0.0.0.0:* LISTEN 28646/java
tcp 0 0 0.0.0.0:42859 0.0.0.0:* LISTEN 28646/java <--THIS
What's the purpose of that port? Is there any public definition of that interface? I haven't found any documentation about that port number. Kind regards, Jorge M.
Re: Kafka connect process listens to an unknown port
Hey Jorge,

I looked into it, and can reproduce the second LISTEN port in a vanilla Kafka Connect cluster without any connectors running. Using jstack, I see that there are two threads that appear to be waiting in the corresponding accept methods:

"RMI TCP Accept-0" #15 daemon prio=5 os_prio=31 cpu=0.37ms elapsed=790.61s tid=0x0001370f5a00 nid=0x6d03 runnable [0x00017255e000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.Net.accept(java.base@17.0.4.1/Native Method)

"qtp1530870688-33-acceptor-0@45f75f75-http_8083@43826ec{HTTP/1.1, (http/1.1)}{0.0.0.0:8083}" #33 prio=3 os_prio=31 cpu=0.17ms elapsed=789.45s tid=0x00012712e400 nid=0x8707 runnable [0x0001737ca000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.Net.accept(java.base@17.0.4.1/Native Method)

The latter appears to be the 8083 port which is serving the REST API, and is expected. The former appears to be a Java Remote Method Invocation socket on "port 0" which ends up selecting a random open port on each startup. I do see that RMI is often used to export JMX metrics, and it appears that kafka-run-class.sh (which is used when starting Kafka Connect) by default enables JMX metrics. I was able to disable the RMI port by setting the KAFKA_JMX_OPTS="-Dkey=value" environment variable before running bin/connect-distributed.sh. This isn't a recommendation for you, but proves that the default KAFKA_JMX_OPTS settings from the kafka-run-class.sh are relevant.

Thanks for the question, I learned something new!
Greg Harris

On Fri, May 19, 2023 at 4:45 AM Jorge Martin Cristobal wrote:
> Hi all,
>
> I'm testing apache kafka connect for a project and I found that the main process listens to two different ports, the one to provide REST api, 8083 by default, and a different unprivileged port that changes its number each restart. For instance, this is fragment of the output from netstat command:
> tcp 0 0 0.0.0.0:8083 0.0.0.0:* LISTEN 28646/java
> tcp 0 0 0.0.0.0:42859 0.0.0.0:* LISTEN 28646/java <--THIS
>
> What's the purpose of that port?. Is there any public definition of that interface? I haven't found any documentation wrt that port number.
> Kindly regards,
>
> Jorge M.
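Greg's explanation hinges on a socket bound to "port 0". The sketch below is a plain-Java illustration of that mechanism, not code taken from the JDK's RMI or JMX implementation; the class name `EphemeralPortDemo` is hypothetical. It binds a `ServerSocket` to port 0 and reports the port the operating system picked, which is why the second LISTEN entry shows a different number after every restart. It binds to loopback only, unlike the `0.0.0.0` listener in the netstat output, so that running it does not expose anything.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;

/** Illustrates why a listener on "port 0" shows up with a different number each run. */
public class EphemeralPortDemo {
    /** Binds to port 0 on loopback, lets the OS choose a free port, and reports it. */
    static int bindToPortZero() {
        try (ServerSocket socket = new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
            return socket.getLocalPort(); // the OS-assigned ephemeral port
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Both numbers are chosen by the OS; they will usually differ from run to run.
        System.out.println("first bind:  port " + bindToPortZero());
        System.out.println("second bind: port " + bindToPortZero());
    }
}
```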
Kafka Connect Startup Hook
Hello, can someone please give me a hint on how to execute two lines of code upon Kafka Connect startup, like:

    final JaegerTracer tracer = Configuration.fromEnv().getTracer();
    GlobalTracer.register(tracer);

I implemented it using a custom (fake) connector, but there is a lot of overhead, because you also need a Task, a Config, etc. Is there a simpler way, some kind of hook? Thanks in advance, Jan
AW: Kafka Connect Startup Hook
Hello Jakub, thank you for your quick answer. We solved it by implementing a ConfigProvider, as described here: https://docs.confluent.io/kafka-connectors/self-managed/userguide.html#configprovider-interface

    package org.example;

    import java.util.Collections;
    import java.util.Map;
    import java.util.Set;
    import io.jaegertracing.Configuration;
    import io.jaegertracing.internal.JaegerTracer;
    import io.opentracing.util.GlobalTracer;
    import org.apache.kafka.common.config.ConfigData;
    import org.apache.kafka.common.config.provider.ConfigProvider;

    // Config providers listed in the worker config are created and configured
    // when the worker starts, so configure() acts as a one-time startup hook.
    public class TracingConfigProvider implements ConfigProvider {
        @Override
        public void configure(Map<String, ?> configs) {
            final JaegerTracer tracer = Configuration.fromEnv().getTracer();
            GlobalTracer.registerIfAbsent(tracer);
        }

        // This provider supplies no values; return empty data rather than null.
        @Override
        public ConfigData get(String path) {
            return new ConfigData(Collections.emptyMap());
        }

        @Override
        public ConfigData get(String path, Set<String> keys) {
            return new ConfigData(Collections.emptyMap());
        }

        @Override
        public void close() {}
    }

And we set these environment variables in Kafka Connect:

    CONNECT_CONFIG_PROVIDERS=tracing
    CONNECT_CONFIG_PROVIDERS_TRACING_CLASS=org.example.TracingConfigProvider

Best regards, Jan From: Jakub Scholz Date: Monday, 20 March 2023 at 10:23 To: users@kafka.apache.org Subject: Re: Kafka Connect Startup Hook In Strimzi, we use a Java agent to register the tracer ( https://github.com/strimzi/strimzi-kafka-operator/tree/main/tracing-agent/ if you wanna check the source code). Jakub On Mon, Mar 20, 2023 at 9:18 AM Jan Baudisch (extern) < jan.baudisch.ext...@bdess.com> wrote: > Hello, > > can someone please give me a hint how to execute two lines of code upon > Kafka Connect Startup, like: > > final JaegerTracer tracer = Configuration.fromEnv().getTracer(); > GlobalTracer.register(tracer); > > I implemented using a custom (Fake-)Connector, but there is much overhead, > because you also need a Task, Config etc. > > Is there some simpler way, some kind of hook? > > Thanks in advance, > Jan > > >
Re: Kafka Connect Startup Hook
In Strimzi, we use a Java agent to register the tracer ( https://github.com/strimzi/strimzi-kafka-operator/tree/main/tracing-agent/ if you wanna check the source code). Jakub On Mon, Mar 20, 2023 at 9:18 AM Jan Baudisch (extern) < jan.baudisch.ext...@bdess.com> wrote: > Hello, > > can someone please give me a hint how to execute two lines of code upon > Kafka Connect Startup, like: > > final JaegerTracer tracer = Configuration.fromEnv().getTracer(); > GlobalTracer.register(tracer); > > I implemented using a custom (Fake-)Connector, but there is much overhead, > because you also need a Task, Config etc. > > Is there some simpler way, some kind of hook? > > Thanks in advance, > Jan > > >
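For comparison, a minimal sketch of the Java agent approach, assuming a hypothetical agent class StartupHookAgent that takes the name of a Runnable hook class as its agent argument. This is not the Strimzi agent's code. The JVM calls premain() before the worker's main(), so the hook class and its dependencies (e.g. the Jaeger client) would need to be on the worker's JVM classpath rather than only on plugin.path.

```java
import java.lang.instrument.Instrumentation;

// Hypothetical startup-hook agent (not Strimzi's). In a real agent jar, declare the class
// public and name it in the manifest's Premain-Class attribute.
final class StartupHookAgent {
    private StartupHookAgent() {}

    // Called by the JVM before the worker's main() when started with
    // -javaagent:<jar>=<hook class>; agentArgs is the text after '='.
    public static void premain(String agentArgs, Instrumentation inst) {
        runHook(agentArgs);
    }

    // Instantiates the named Runnable through its no-arg constructor and runs it once.
    static void runHook(String className) {
        if (className == null || className.isBlank()) {
            return; // no hook configured
        }
        Runnable hook;
        try {
            Class<?> hookClass = Class.forName(className.trim());
            hook = (Runnable) hookClass.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot create startup hook " + className, e);
        }
        hook.run();
    }
}
```

The agent would be packaged in its own jar and attached through the JVM options, e.g. KAFKA_OPTS="-javaagent:/opt/hooks/startup-hook.jar=org.example.TracingHook" (the jar path and hook class are hypothetical). Unlike the ConfigProvider route, the hook runs once per JVM and does not depend on any worker config.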
Kafka Connect Startup Hook
Hello, can someone please give me a hint on how to execute two lines of code on Kafka Connect startup, like:

    final JaegerTracer tracer = Configuration.fromEnv().getTracer();
    GlobalTracer.register(tracer);

I implemented this using a custom (fake) connector, but there is a lot of overhead, because you also need a Task, a Config, etc. Is there a simpler way, some kind of hook? Thanks in advance, Jan
Re: Exactly once kafka connect query
Hi Nitty, I understand your concerns about preserving intellectual property. Perhaps to avoid these altogether, instead of a call, you can provide a reproduction of your issues that is acceptable to share with the public? If I'm able to successfully diagnose the problem, I can share a summary on the mailing list, and if things are still unclear, I can join a call to discuss the publicly-visible example and what seems to be the issue. Cheers, Chris On Thu, Mar 16, 2023 at 5:54 AM NITTY BENNY wrote: > Hi Xiaoxia, > > I am not able to see the attachments you shared with me. I don't understand > the problem you are talking about. > What do you want me to look? > > Thanks, > Nitty
Re: Exactly once kafka connect query
Hi Xiaoxia, I am not able to see the attachments you shared with me. I don't understand the problem you are talking about. What do you want me to look at? Thanks, Nitty On Thu, Mar 16, 2023 at 1:54 AM 小侠 <747359...@qq.com.invalid> wrote: > Hi Nitty, > I'm so sorry I forgot the signature. > Looking forward to your reply. > > Thank you, > Xiaoxia > > -- Original message -- > *From:* "users"; > *Sent:* Wednesday, March 15, 2023, 6:38 PM > *To:* "users"; > *Subject:* Re: Exactly once kafka connect query > > Hi Chris, > > We won't be able to share the source code since it is proprietary Amdocs > code. > > If you have time for a call, I can show you the code and > reproduction scenario over the call. I strongly believe you can find the > issue with that. > > Thanks, > Nitty
Re: Exactly once kafka connect query
Hi Chris, We won't be able to share the source code since it is proprietary Amdocs code. If you have time for a call, I can show you the code and the reproduction scenario over the call. I strongly believe you can find the issue with that. Thanks, Nitty On Tue, Mar 14, 2023 at 3:04 PM Chris Egerton wrote: > Hi Nitty, > > Sorry, I should have clarified. The reason I'm thinking about shutdown here > is that, when exactly-once support is enabled on a Kafka Connect cluster > and a new set of task configurations is generated for a connector, the > Connect framework makes an effort to shut down all the old task instances > for that connector, and then fences out the transactional producers for all > of those instances. I was thinking that this may lead to the producer > exceptions you are seeing but, after double-checking this assumption, that > does not appear to be the case. > > Would it be possible to share the source code for your connector and a > reproduction scenario for what you're seeing? That may be easier than > coordinating a call. > > Cheers, > > Chris
Re: Exactly once kafka connect query
Hi Nitty, Sorry, I should have clarified. The reason I'm thinking about shutdown here is that, when exactly-once support is enabled on a Kafka Connect cluster and a new set of task configurations is generated for a connector, the Connect framework makes an effort to shut down all the old task instances for that connector, and then fences out the transactional producers for all of those instances. I was thinking that this may lead to the producer exceptions you are seeing but, after double-checking this assumption, that does not appear to be the case. Would it be possible to share the source code for your connector and a reproduction scenario for what you're seeing? That may be easier than coordinating a call. Cheers, Chris On Tue, Mar 14, 2023 at 6:15 AM NITTY BENNY wrote: > Hi Chris, > > Is there any possibility to have a call with you? This is actually blocking > our delivery, I really want to sort this out. > > Thanks, > Nitty > > On Mon, Mar 13, 2023 at 8:18 PM NITTY BENNY wrote: > > > Hi Chris, > > > > I really don't understand why a graceful shutdown will happen during a > > commit operation? Am I understanding something wrong here? I see > > this happens when I have a batch of 2 valid records and in the second > > batch the record is invalid. In that case I want to commit the valid > > records. So I called commit and sent an empty list for the current batch > to > > poll() and then when the next file comes in and poll sees new records, I > > see InvalidProducerEpochException. > > Please advise me. > > > > Thanks, > > Nitty > > > > On Mon, Mar 13, 2023 at 5:33 PM NITTY BENNY > wrote: > > > >> Hi Chris, > >> > >> The difference is in the Task Classes, no difference for value/key > >> converters. > >> > >> I don’t see log messages for graceful shutdown. I am not clear on what > >> you mean by shutting down the task. > >> > >> I called the commit operation for the successful records. Should I > >> perform any other steps if I have an invalid record?
> >> Please advise. > >> > >> Thanks, > >> Nitty > >> > >> On Mon, Mar 13, 2023 at 3:42 PM Chris Egerton > >> wrote: > >> > >>> Hi Nitty, > >>> > >>> Thanks again for all the details here, especially the log messages. > >>> > >>> > The below mentioned issue is happening for Json connector only. Is > >>> there > >>> any difference with asn1,binary,csv and json connector? > >>> > >>> Can you clarify if the difference here is in the Connector/Task > classes, > >>> or if it's in the key/value converters that are configured for the > >>> connector? The key/value converters are configured using the > >>> "key.converter" and "value.converter" properties and, if problems arise > >>> with > >>> them, the task will fail and, if it has a non-empty ongoing > transaction, > >>> that transaction will be automatically aborted since we close the > task's > >>> Kafka producer when it fails (or shuts down gracefully). > >>> > >>> With regards to these log messages: > >>> > >>> > org.apache.kafka.common.errors.ProducerFencedException: There is a > >>> newer > >>> producer with the same transactionalId which fences the current one. > >>> > >>> It looks like your tasks aren't shutting down gracefully in time, which > >>> causes them to be fenced out by the Connect framework later on. Do you > >>> see > >>> messages like "Graceful stop of task failed" in the logs > >>> for > >>> your Connect worker? > >>> > >>> Cheers, > >>> > >>> Chris > >>> > >>> On Mon, Mar 13, 2023 at 10:58 AM NITTY BENNY > >>> wrote: > >>> > >>> > Hi Chris, > >>> > > >>> > As you said, the below message is coming when I call an abort if > there > >>> is > >>> > an invalid record, then for the next transaction I can see the below > >>> > message and then the connector will be stopped.
> >>> > 2023-03-13 14:28:26,043 INFO [json-sftp-source-connector|task-0] > >>> Aborting > >>> > transaction for batch as requested by connector > >>> > (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask) > >>> > [task-thread-json-sftp-source-connector-0] > >>
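For context on the setup being discussed, a sketch of the settings involved, written as Java maps to keep one language throughout. The property names and values are the ones introduced for exactly-once source support (KIP-618, Kafka 3.3 and later); the connector class name is hypothetical. Per that design, a task only receives a non-null TransactionContext (needed to call commitTransaction() or abortTransaction()) when transaction.boundary=connector, and the connector must also report via canDefineTransactionBoundaries() that it can define its own boundaries.

```java
import java.util.Map;

// Illustrative settings for exactly-once source support (KIP-618, Kafka 3.3+).
final class ExactlyOnceConfigSketch {
    private ExactlyOnceConfigSketch() {}

    // Worker config: enables exactly-once support for source connectors on the cluster.
    static Map<String, String> workerProps() {
        return Map.of("exactly.once.source.support", "enabled");
    }

    // Connector config: require the connector to declare exactly-once support
    // (validation fails otherwise) and let the task define transaction boundaries.
    static Map<String, String> connectorProps(String connectorClass) {
        return Map.of(
                "connector.class", connectorClass,
                "exactly.once.support", "required",
                "transaction.boundary", "connector");
    }
}
```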
Re: Exactly once kafka connect query
nalId which fences the current one. >> > >> 2023-03-12 11:32:45,224 ERROR [json-sftp-source-connector|task-0] >> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} failed >> to >> > send >> > >> record to streams-input: >> > >> (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask) >> > >> [kafka-producer-network-thread | >> > >> connector-producer-json-sftp-source-connector-0] >> > >> org.apache.kafka.common.errors.ProducerFencedException: There is a >> newer >> > >> producer with the same transactionalId which fences the current one. >> > >> 2023-03-12 11:32:45,222 ERROR >> > [json-sftp-source-connector|task-0|offsets] >> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Failed >> to >> > >> commit producer transaction >> > >> (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask) >> > >> [task-thread-json-sftp-source-connector-0] >> > >> org.apache.kafka.common.errors.ProducerFencedException: There is a >> newer >> > >> producer with the same transactionalId which fences the current one. >> > >> 2023-03-12 11:32:45,225 ERROR [json-sftp-source-connector|task-0] >> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Task >> threw >> > an >> > >> uncaught and unrecoverable exception. Task is being killed and will >> not >> > >> recover until manually restarted >> > >> (org.apache.kafka.connect.runtime.WorkerTask) >> > >> [task-thread-json-sftp-source-connector-0] >> > >> >> > >> Do you know why it is showing an abort state even if I call commit? 
>> > >> >> > >> I tested one more scenario, When I call the commit I saw the below >> > >> >> > >> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0, >> > >> producerId=11, producerEpoch=2, txnTimeoutMs=6, state=*Ongoing*, >> > >> pendingState=None, topicPartitions=HashSet(streams-input-2), >> > >> txnStartTimestamp=1678620463834, >> txnLastUpdateTimestamp=1678620463834) >> > >> Then, before changing the states to Abort, I dropped the next file >> then >> > I >> > >> dont see any issues. Previous transaction >> > >> as well as the current transaction are committed. >> > >> >> > >> Thank you for your support. >> > >> >> > >> Thanks, >> > >> Nitty >> > >> >> > >> On Fri, Mar 10, 2023 at 8:04 PM Chris Egerton >> >> > >> wrote: >> > >> >> > >>> Hi Nitty, >> > >>> >> > >>> > I called commitTransaction when I reach the first error record, >> but >> > >>> commit is not happening for me. Kafka connect tries to abort the >> > >>> transaction automatically >> > >>> >> > >>> This is really interesting--are you certain that your task never >> > invoked >> > >>> TransactionContext::abortTransaction in this case? I'm looking over >> the >> > >>> code base and it seems fairly clear that the only thing that could >> > >>> trigger >> > >>> a call to KafkaProducer::abortTransaction is a request by the task >> to >> > >>> abort >> > >>> a transaction (either for a next batch, or for a specific record). >> It >> > may >> > >>> help to run the connector in a debugger and/or look for "Aborting >> > >>> transaction for batch as requested by connector" or "Aborting >> > transaction >> > >>> for record on topic as requested by connector" log >> > >>> messages (which will be emitted at INFO level by >> > >>> the org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask >> class >> > if >> > >>> the task is requesting an abort). 
>> > >>> >> > >>> Regardless, I'll work on a fix for the bug with aborting empty >> > >>> transactions. Thanks for helping uncover that one! >> > >>> >> > >>> Cheers, >> > >>> >> > >>> Chris >> > >>> >> > >>> On Thu, Mar 9
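The scenario described in this thread (commit the valid records, then deal with the invalid one) can be sketched as follows, assuming transaction.boundary=connector. TxnRequests is a hypothetical, minimal stand-in for org.apache.kafka.connect.source.TransactionContext, whose commitTransaction(SourceRecord) requests a commit once that record has been processed; the stand-in keeps the sketch compilable without Kafka on the classpath. The method returns only the records before the first invalid one, requests a commit on the last of them, and requests nothing at all for an empty batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical minimal stand-in for org.apache.kafka.connect.source.TransactionContext.
interface TxnRequests<R> {
    // Request a commit once this record has been processed (it ends the transaction).
    void commitTransaction(R record);
}

final class BatchBoundaries {
    private BatchBoundaries() {}

    // Returns the records before the first invalid one and, if there are any,
    // requests a commit right after the last of them. Records from the first
    // invalid one onward are left for the caller to handle.
    static <R> List<R> validPrefix(List<R> batch, Predicate<R> isValid, TxnRequests<R> txn) {
        List<R> toSend = new ArrayList<>();
        for (R record : batch) {
            if (!isValid.test(record)) {
                break;
            }
            toSend.add(record);
        }
        if (!toSend.isEmpty()) {
            txn.commitTransaction(toSend.get(toSend.size() - 1));
        }
        // Nothing is requested for an empty result, so no empty transaction
        // is committed or aborted.
        return toSend;
    }
}
```

In a real task, poll() would return the list this method produces. How the invalid record and the records after it are handled (skipped, logged, or retried in a later poll) is a design choice left open here.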
Re: Exactly once kafka connect query
463834, txnLastUpdateTimestamp=1678620526119) > > >> > > > connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0, > > >> producerId=11, producerEpoch=3, txnTimeoutMs=6, > > state=*CompleteAbort*, > > >> pendingState=None, topicPartitions=HashSet(), > > >> txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620526121) > > >> > > >> Later for the next transaction, when it returns the first batch below > is > > >> the logs I can see. > > >> > > >> Transiting to abortable error state due to > > >> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer > > >> attempted to produce with an old epoch. > > >> (org.apache.kafka.clients.producer.internals.TransactionManager) > > >> [kafka-producer-network-thread | > > >> connector-producer-json-sftp-source-connector-0] > > >> 2023-03-12 11:32:45,220 ERROR [json-sftp-source-connector|task-0] > > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} failed to > > send > > >> record to streams-input: > > >> (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask) > > >> [kafka-producer-network-thread | > > >> connector-producer-json-sftp-source-connector-0] > > >> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer > > >> attempted to produce with an old epoch. > > >> 2023-03-12 11:32:45,222 INFO [json-sftp-source-connector|task-0] > > >> [Producer clientId=connector-producer-json-sftp-source-connector-0, > > >> transactionalId=connect-cluster-json-sftp-source-connector-0] > > Transiting to > > >> fatal error state due to > > >> org.apache.kafka.common.errors.ProducerFencedException: There is a > newer > > >> producer with the same transactionalId which fences the current one. 
> > >> (org.apache.kafka.clients.producer.internals.TransactionManager) > > >> [kafka-producer-network-thread | > > >> connector-producer-json-sftp-source-connector-0] > > >> 2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0] > > >> [Producer clientId=connector-producer-json-sftp-source-connector-0, > > >> transactionalId=connect-cluster-json-sftp-source-connector-0] Aborting > > >> producer batches due to fatal error > > >> (org.apache.kafka.clients.producer.internals.Sender) > > >> [kafka-producer-network-thread | > > >> connector-producer-json-sftp-source-connector-0] > > >> org.apache.kafka.common.errors.ProducerFencedException: There is a > newer > > >> producer with the same transactionalId which fences the current one. > > >> 2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0] > > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Failed to > > >> flush offsets to storage: > > >> (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask) > > >> [kafka-producer-network-thread | > > >> connector-producer-json-sftp-source-connector-0] > > >> org.apache.kafka.common.errors.ProducerFencedException: There is a > newer > > >> producer with the same transactionalId which fences the current one. > > >> 2023-03-12 11:32:45,224 ERROR [json-sftp-source-connector|task-0] > > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} failed to > > send > > >> record to streams-input: > > >> (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask) > > >> [kafka-producer-network-thread | > > >> connector-producer-json-sftp-source-connector-0] > > >> org.apache.kafka.common.errors.ProducerFencedException: There is a > newer > > >> producer with the same transactionalId which fences the current one. 
> > >> 2023-03-12 11:32:45,222 ERROR > > [json-sftp-source-connector|task-0|offsets] > > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Failed to > > >> commit producer transaction > > >> (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask) > > >> [task-thread-json-sftp-source-connector-0] > > >> org.apache.kafka.common.errors.ProducerFencedException: There is a > newer > > >> producer with the same transactionalId which fences the current one. > > >> 2023-03-12 11:32:45,225 ERROR [json-sftp-source-connector|task-0] > > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connect
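The two exceptions in these logs, InvalidProducerEpochException and ProducerFencedException, both relate to producer epochs. Each transactional ID (here connect-cluster-json-sftp-source-connector-0) has an epoch that is bumped when a new producer instance initializes transactions, and writes or commits from an instance holding an older epoch are rejected. The toy model below illustrates only that rule: it is not Kafka's implementation, it uses IllegalStateException as a stand-in for both exceptions, and it ignores other situations where the coordinator bumps the epoch, such as aborting a timed-out transaction.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of epoch-based fencing for one transaction coordinator (not Kafka's code).
final class FencingModel {
    // Latest epoch handed out per transactional ID.
    private final Map<String, Integer> epochs = new HashMap<>();

    // A new producer instance initializing transactions: the first gets epoch 0,
    // each later one gets the previous epoch + 1, fencing all older instances.
    int initProducer(String transactionalId) {
        return epochs.merge(transactionalId, 0, (previous, unused) -> previous + 1);
    }

    // Rejects a write or commit sent with anything other than the latest epoch.
    void checkEpoch(String transactionalId, int epoch) {
        Integer latest = epochs.get(transactionalId);
        if (latest == null || epoch != latest) {
            throw new IllegalStateException(
                    "fenced: " + transactionalId + " epoch " + epoch + ", latest " + latest);
        }
    }
}
```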
Re: Exactly once kafka connect query
>>> -thread | connector-producer-json-sftp-source-connector-0]
>>> 2023-03-12 11:32:45,220 ERROR [json-sftp-source-connector|task-0] ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} failed to send record to streams-input: (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask) [kafka-producer-network-thread | connector-producer-json-sftp-source-connector-0]
>>> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
>>> 2023-03-12 11:32:45,222 INFO [json-sftp-source-connector|task-0] [Producer clientId=connector-producer-json-sftp-source-connector-0, transactionalId=connect-cluster-json-sftp-source-connector-0] Transiting to fatal error state due to org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one. (org.apache.kafka.clients.producer.internals.TransactionManager) [kafka-producer-network-thread | connector-producer-json-sftp-source-connector-0]
>>> 2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0] [Producer clientId=connector-producer-json-sftp-source-connector-0, transactionalId=connect-cluster-json-sftp-source-connector-0] Aborting producer batches due to fatal error (org.apache.kafka.clients.producer.internals.Sender) [kafka-producer-network-thread | connector-producer-json-sftp-source-connector-0]
>>> org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
>>> 2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0] ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Failed to flush offsets to storage: (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask) [kafka-producer-network-thread | connector-producer-json-sftp-source-connector-0]
>>> org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
>>> 2023-03-12 11:32:45,224 ERROR [json-sftp-source-connector|task-0] ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} failed to send record to streams-input: (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask) [kafka-producer-network-thread | connector-producer-json-sftp-source-connector-0]
>>> org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
>>> 2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0|offsets] ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Failed to commit producer transaction (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask) [task-thread-json-sftp-source-connector-0]
>>> org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
>>> 2023-03-12 11:32:45,225 ERROR [json-sftp-source-connector|task-0] ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-json-sftp-source-connector-0]
>>>
>>> Do you know why it shows an abort state even though I call commit?
>>>
>>> I tested one more scenario. When I called commit, I saw the following:
>>> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0, producerId=11, producerEpoch=2, txnTimeoutMs=6, state=*Ongoing*, pendingState=None, topicPartitions=HashSet(streams-input-2), txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620463834)
>>> Then, when I dropped the next file before the state changed to Abort, I didn't see any issues: both the previous transaction and the current transaction were committed.
>>>
>>> Thank you for your support.
>>>
>>> Thanks,
>>> Nitty
>>>
>>> On Fri, Mar 10, 2023 at 8:04 PM Chris Egerton
>>> wrote:
Re: Exactly once kafka connect query
the commit, I saw the following:

connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0, producerId=11, producerEpoch=2, txnTimeoutMs=6, state=*Ongoing*, pendingState=None, topicPartitions=HashSet(streams-input-2), txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620463834)

Then, when I dropped the next file before the state changed to Abort, I didn't see any issues: both the previous transaction and the current transaction were committed.

Thank you for your support.

Thanks,
Nitty
Re: Exactly once kafka connect query
Hi Nitty,

> I called commitTransaction when I reach the first error record, but commit is not happening for me. Kafka connect tries to abort the transaction automatically

This is really interesting. Are you certain that your task never invoked TransactionContext::abortTransaction in this case? Looking over the code base, it seems fairly clear that the only thing that could trigger a call to KafkaProducer::abortTransaction is a request by the task to abort a transaction (either for the next batch or for a specific record). It may help to run the connector in a debugger and/or look for "Aborting transaction for batch as requested by connector" or "Aborting transaction for record on topic as requested by connector" log messages (which will be emitted at INFO level by the org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask class if the task is requesting an abort).

Regardless, I'll work on a fix for the bug with aborting empty transactions. Thanks for helping uncover that one!

Cheers,

Chris

On Thu, Mar 9, 2023 at 6:36 PM NITTY BENNY wrote:

> Hi Chris,
>
> We have a use case to commit the previous successful records, stop processing the current file, and move on to the next file. To achieve that, I called commitTransaction when I reached the first error record, but the commit is not happening for me. Kafka Connect tries to abort the transaction automatically; I checked the __transaction_state topic and the states are marked as PrepareAbort and CompleteAbort. Do you know why Kafka Connect automatically invokes abort instead of the commit I called?
> As a result, when I tried to parse the next file (say ABC), I saw the log "Aborting incomplete transaction" and the ERROR "Failed to send record to topic", and we lost the first batch of records from the current transaction in the file ABC.
>
> > Is it possible that there's a case where an abort is being requested while the current transaction is empty (i.e., the task hasn't returned any records from SourceTask::poll since the last transaction was committed/aborted)?
>
> Yes, that case is possible for us. There is a case where the first record itself is an error record.
>
> Thanks,
> Nitty
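The empty-transaction bug Chris mentions (an abort requested while no transaction is open; KAFKA-14799 is linked in the thread) can be illustrated with a toy model. This is a hypothetical, self-contained simulation, not the Connect runtime's code and not the actual fix: `ToyTransactionRunner` begins a transaction lazily when the first record arrives and, with the guard enabled, treats an abort request against an empty transaction as a no-op. The `IllegalStateException` thrown without the guard is only a stand-in for whatever error the real producer would raise.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical, not Kafka Connect code) of how a runtime might handle
// connector-requested aborts. A "transaction" is just a list of pending records.
final class ToyTransactionRunner {
    private final boolean guardEmptyAborts;   // true = check for an open transaction first
    private final List<String> pending = new ArrayList<>();
    private boolean transactionOpen = false;
    private boolean abortRequested = false;
    private int abortsIssued = 0;             // aborts that reached the "producer"

    ToyTransactionRunner(boolean guardEmptyAborts) {
        this.guardEmptyAborts = guardEmptyAborts;
    }

    // Mirrors a task asking for the current transaction to be aborted.
    void requestAbort() {
        abortRequested = true;
    }

    // Mirrors the runtime processing one batch returned from poll().
    void processBatch(List<String> batch) {
        if (batch != null && !batch.isEmpty()) {
            transactionOpen = true;           // begun lazily on the first record
            pending.addAll(batch);
        }
        if (abortRequested) {
            abortRequested = false;
            if (!transactionOpen && guardEmptyAborts) {
                return;                       // nothing to abort: skip the producer call
            }
            if (!transactionOpen) {
                // Stand-in for the failure of aborting a transaction that never began.
                throw new IllegalStateException("abort requested with no open transaction");
            }
            pending.clear();                  // aborted records are discarded
            transactionOpen = false;
            abortsIssued++;
        }
    }

    int abortsIssued() { return abortsIssued; }

    List<String> pending() { return pending; }
}
```

In Nitty's case where the first record of a file is invalid, the task requests an abort and returns an empty batch; in this model, the guarded runner simply ignores that request, while the unguarded one fails.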
Re: Exactly once kafka connect query
Hi Chris,

We have a use case to commit the previous successful records, stop processing the current file, and move on to the next file. To achieve that, I called commitTransaction when I reached the first error record, but the commit is not happening for me. Kafka Connect tries to abort the transaction automatically; I checked the __transaction_state topic and the states are marked as PrepareAbort and CompleteAbort. Do you know why Kafka Connect automatically invokes abort instead of the commit I called?
As a result, when I tried to parse the next file (say ABC), I saw the log "Aborting incomplete transaction" and the ERROR "Failed to send record to topic", and we lost the first batch of records from the current transaction in the file ABC.

> Is it possible that there's a case where an abort is being requested while the current transaction is empty (i.e., the task hasn't returned any records from SourceTask::poll since the last transaction was committed/aborted)?

Yes, that case is possible for us. There is a case where the first record itself is an error record.

Thanks,
Nitty

On Thu, Mar 9, 2023 at 3:48 PM Chris Egerton wrote:

> Hi Nitty,
>
> Thanks for the code examples and the detailed explanations, this is really helpful!
>
> > Say I have a file with 5 records and the batch size is 2, and my 3rd batch has one error record. In that batch, I don't have a valid record to call commit or abort on, but I want to commit all the previous batches that were successfully parsed. How do I do that?
>
> An important thing to keep in mind with the TransactionContext API is that all records that a task returns from SourceTask::poll are implicitly included in a transaction. Invoking SourceTaskContext::transactionContext doesn't alter this or cause transactions to start being used; everything is already in a transaction, and the Connect runtime automatically begins transactions for any records it sees from the task if it hasn't already begun one. It's also valid to return a null or empty list of records from SourceTask::poll. So in this case, you can invoke transactionContext.commitTransaction() (the no-args variant) and return an empty batch from SourceTask::poll, which will cause the transaction containing the 4 valid records that were returned in the last 2 batches to be committed.
>
> FWIW, I would be a little cautious about this approach. Many times it's better to fail fast on invalid data; it might be worth it to at least allow users to configure whether the connector fails on invalid data, or silently skips over it (which is what happens when transactions are aborted).
>
> > Why is abort not working without adding the last record to the list?
>
> Is it possible that there's a case where an abort is being requested while the current transaction is empty (i.e., the task hasn't returned any records from SourceTask::poll since the last transaction was committed/aborted)? I think this may be a bug in the Connect framework where we don't check to see if a transaction is already open when a task requests that a transaction be aborted, which can cause tasks to fail (see https://issues.apache.org/jira/browse/KAFKA-14799 for more details).
>
> Cheers,
>
> Chris
>
> On Wed, Mar 8, 2023 at 6:44 PM NITTY BENNY wrote:
>
> > Hi Chris,
> >
> > I am not sure if you are able to see the images I shared with you. Copying the code snippet below:
> >
> > if (expectedRecordCount >= 0) {
> >     int missingCount = expectedRecordCount - (int) this.recordOffset() - 1;
> >     if (missingCount > 0) {
> >         if (transactionContext != null) {
> >             isMissedRecords = true;
> >         } else {
> >             throw new DataException(String.format("Missing %d records (expecting %d, actual %d)", missingCount, expectedRecordCount, this.recordOffset()));
> >         }
> >     } else if (missingCount < 0) {
> >         if (transactionContext != null) {
> >             isMissedRecords = true;
> >         } else {
> >             throw new DataException(String.format("Too many records (expecting %d, actual %d)", expectedRecordCount, this.recordOffset()));
> >         }
> >     }
> > }
> > addLastRecord(records, null, value);
> > }
> >
> > //asn1 or binary abort
> > if((config.parseErrorThreshold != null && parseErrorCount >= config.parseErrorThreshold && lastbatch && transactionC
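Chris's suggestion for the first use case (request a commit with the no-argument `commitTransaction()`, then return an empty batch from `poll()`) can be sketched as a small self-contained simulation. Nothing below is the Kafka Connect API: `ToyCommitRuntime`, `FileTaskSim`, and the `"ERR"` prefix marking an invalid record are hypothetical stand-ins, assuming a 5-record file read in batches of 2 where the fifth record is invalid.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the runtime side: records from each poll() join the
// open transaction, and a pending commit request is honored after the batch,
// even when the batch is empty.
final class ToyCommitRuntime {
    final List<String> openTxn = new ArrayList<>();
    final List<String> committed = new ArrayList<>();
    boolean commitRequested = false;

    void afterPoll(List<String> batch) {
        openTxn.addAll(batch);
        if (commitRequested) {
            committed.addAll(openTxn);
            openTxn.clear();
            commitRequested = false;
        }
    }
}

// Hypothetical task: reads a "file" in fixed-size batches and, on the first
// invalid record, requests a commit and returns an empty batch instead.
final class FileTaskSim {
    private final List<String> file;
    private final int batchSize;
    private final ToyCommitRuntime runtime;
    private int position = 0;
    private boolean stopped = false;

    FileTaskSim(List<String> file, int batchSize, ToyCommitRuntime runtime) {
        this.file = file;
        this.batchSize = batchSize;
        this.runtime = runtime;
    }

    List<String> poll() {
        List<String> batch = new ArrayList<>();
        if (stopped) {
            return batch;                      // rest of the file is skipped
        }
        while (batch.size() < batchSize && position < file.size()) {
            String record = file.get(position++);
            if (record.startsWith("ERR")) {
                // Commit everything returned so far; drop this partial batch.
                runtime.commitRequested = true;
                stopped = true;
                return new ArrayList<>();
            }
            batch.add(record);
        }
        return batch;
    }
}
```

In a real task, the request would go through the `TransactionContext` obtained from `SourceTaskContext#transactionContext()` (null unless the connector is configured to define its own boundaries), and, as Chris cautions, failing fast may often be preferable to silently skipping invalid data.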
Re: Exactly once kafka connect query
>> ould provide a brief code snippet illustrating what your task is doing that's causing issues?
>>
>> Cheers,
>>
>> Chris (not Chrise)
>>
>> On Tue, Mar 7, 2023 at 10:17 AM NITTY BENNY wrote:
>>
>> > Hi Chrise,
>> >
>> > Thanks for sharing the details.
>> >
>> > Regarding the use case: for the Asn1 source connector, we need to validate the number of records in the file against the number of records in the header. Currently, if validation fails, we do not send the last record to the topic. But after introducing exactly-once with connector-defined transaction boundaries, I can see that if I call abort when the validation fails, the transaction is not completed because the transaction offset is not committed. I saw that the transaction state changed to CompleteAbort, so for my next transaction I get an InvalidProducerEpochException and the task stops after that. When I tried calling abort after sending the last record to the topic, the transaction completed.
>> >
>> > I don't know if I am doing anything wrong here.
>> >
>> > Please advise.
>> > Thanks,
>> > Nitty
>> >
>> > On Tue 7 Mar 2023 at 2:21 p.m., Chris Egerton wrote:
>> >
>> > > Hi Nitty,
>> > >
>> > > We've recently added some documentation on implementing exactly-once source connectors here:
>> > > https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors
>> > > To quote a relevant passage from those docs:
>> > >
>> > > > In order for a source connector to take advantage of this support, it must be able to provide meaningful source offsets for each record that it emits, and resume consumption from the external system at the exact position corresponding to any of those offsets without dropping or duplicating messages.
>> > >
>> > > So, as long as your source connector is able to use the Kafka Connect framework's offsets API correctly, it shouldn't be necessary to make any other code changes to the connector.
>> > >
>> > > To enable exactly-once support for source connectors on your Connect cluster, see the docs section here:
>> > > https://kafka.apache.org/documentation/#connect_exactlyoncesource
>> > >
>> > > With regard to transactions, a transactional producer is always created automatically for your connector by the Connect runtime when exactly-once support is enabled on the worker. The only reason to set "transaction.boundary" to "connector" is if your connector would like to explicitly define its own transaction boundaries. In this case, it sounds like that may be what you want; I just want to make sure to call out that in either case, you should not be directly instantiating a producer in your connector code, but let the Kafka Connect runtime do that for you, and just worry about returning the right records from SourceTask::poll (and possibly defining custom transactions using the TransactionContext API).
>> > >
>> > > With respect to your question about committing or aborting in certain circumstances, it'd be useful to know more about your use case, since it may not be necessary to define custom transaction boundaries in your connector at all.
>> > >
>> > > Cheers,
>> > >
>> > > Chris
>> > >
>> > > On Tue, Mar 7, 2023 at 7:21 AM NITTY BENNY wrote:
>> > >
>> > > > Hi Team,
>> > > >
>> > > > Adding on top of this, I tried creating a TransactionContext object and calling the commitTransaction and abortTransaction methods in source connectors. But the main problem I saw is that if there is any error while parsing the record, Connect calls an abort, but we have a use case to call commit in some cases. Is that a valid use case in terms of Kafka Connect?
>> > > >
>> > > > Another question: should I use a transactional producer instead of creating an object of TransactionContext? Below is the connector configuration I am using.
>> > > >
>> > > > exactly.once.support: "required"
>> > > > transaction.boundary: "connector"
>> > > >
>> > > > Could you please help me here?
>> > > >
>> > > > Thanks,
>> > > > Nitty
>> > > >
>> > > > On Tue, Mar 7, 2023 at 12:29 AM NITTY BENNY wrote:
>> > > >
>> > > > > Hi Team,
>> > > > > I am trying to implement exactly-once behavior in our source connector. Is there any sample source connector implementation available to have a look at?
>> > > > > Regards,
>> > > > > Nitty
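Chris points to the worker-level switch for exactly-once source support, and Nitty's connector sets `exactly.once.support` and `transaction.boundary`. As a sketch, assuming the property names and values documented for exactly-once source support (worker `exactly.once.source.support` = `disabled`/`preparing`/`enabled`; connector `exactly.once.support` = `requested`/`required`; `transaction.boundary` = `poll`/`interval`/`connector`), a hypothetical pre-flight helper could flag obvious mismatches before a connector is submitted. `EosConfigCheck` is an illustration only; the worker's own validation is more thorough and also consults the connector's `exactlyOnceSupport()` and `canDefineTransactionBoundaries()` methods.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical pre-flight check for exactly-once source settings. It only looks at
// property values (with their documented defaults); it does not replace the worker's
// own validation.
final class EosConfigCheck {
    private static final Set<String> BOUNDARIES = Set.of("poll", "interval", "connector");

    private EosConfigCheck() {}

    static List<String> problems(Map<String, String> workerConfig, Map<String, String> connectorConfig) {
        List<String> problems = new ArrayList<>();
        String workerSupport = workerConfig.getOrDefault("exactly.once.source.support", "disabled");
        String connectorSupport = connectorConfig.getOrDefault("exactly.once.support", "requested");
        String boundary = connectorConfig.getOrDefault("transaction.boundary", "poll");

        // Without this worker setting, the connector-level options have no effect.
        if (!workerSupport.equals("enabled")) {
            problems.add("worker exactly.once.source.support is '" + workerSupport + "', not 'enabled'");
        }
        if (!connectorSupport.equals("requested") && !connectorSupport.equals("required")) {
            problems.add("unknown exactly.once.support value: " + connectorSupport);
        }
        if (!BOUNDARIES.contains(boundary)) {
            problems.add("unknown transaction.boundary value: " + boundary);
        }
        return problems;
    }
}
```

Nitty's configuration (`exactly.once.support: "required"`, `transaction.boundary: "connector"`) passes this check only if the worker also has `exactly.once.source.support=enabled`.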
Re: Exactly once kafka connect query
Hi Chris, Sorry for the typo in my previous email. Regarding point 2, "the task returns a batch of records from SourceTask::poll (and, if using the per-record API provided by the TransactionContext class, includes at least one record that should trigger a transaction commit/abort in that batch)": what if I am using the API without passing a record? We have two types of use cases. In the first, on encountering an error record, we want to commit the previous successful batches and disregard the failed record and all subsequent batches. In this case we create the TransactionContext just before reading the file (the file is our transaction boundary). Say I have a file with 5 records, the batch size is 2, and my 3rd batch contains one error record. Then in that batch I don't have a valid record on which to call commit or abort, but I want to commit all the previous batches that were parsed successfully. How do I do that? The second use case is where I want to abort a transaction if the record count doesn't match. Code snippet: [image: image.png] There are no error records in this case. As you can see, I added a transactionContext check to implement exactly-once. Without the transaction, the code just threw the exception without calling the addLastRecord() method, and the catch block logged the message and returned the list of records, without the last record, to poll(). To make it work, I called addLastRecord() in this case, so it no longer throws the exception and the list includes the last record as well. Then I called abort, and everything got aborted. Why does abort not work without adding the last record to the list? [image: image.png] Code to call abort. Thanks, Nitty
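Here is a minimal sketch of one way to handle the first use case. It assumes the whole file can be parsed and validated before anything is returned from `poll()`. Because the file is the transaction boundary, knowing the position of the first bad record up front lets the task do two things: emit only the records before it, and mark the last of those as the record that carries the commit request (with the per-record `TransactionContext::commitTransaction(SourceRecord)` method). `Rec`, `PlannedBatch` and `FilePlanner.planFile` are hypothetical names. Plain objects stand in for `SourceRecord`, so the logic runs without the Connect API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for one parsed record of the input file.
final class Rec {
    final int index;
    final boolean valid;

    Rec(int index, boolean valid) {
        this.index = index;
        this.valid = valid;
    }
}

// Hypothetical stand-in for one List<SourceRecord> returned from poll().
final class PlannedBatch {
    final List<Rec> records;
    final Rec commitOn; // record that would be passed to commitTransaction(record), or null

    PlannedBatch(List<Rec> records, Rec commitOn) {
        this.records = records;
        this.commitOn = commitOn;
    }
}

final class FilePlanner {
    // Splits a fully parsed file into poll() batches, dropping the first invalid
    // record and everything after it. The last emitted record carries the commit.
    static List<PlannedBatch> planFile(List<Rec> file, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<Rec> good = new ArrayList<>();
        for (Rec r : file) {
            if (!r.valid) {
                break; // disregard the failed record and all later ones
            }
            good.add(r);
        }
        List<PlannedBatch> batches = new ArrayList<>();
        for (int start = 0; start < good.size(); start += batchSize) {
            int end = Math.min(start + batchSize, good.size());
            List<Rec> chunk = new ArrayList<>(good.subList(start, end));
            boolean last = end == good.size();
            // Only the final batch holds the commit trigger, as its last record.
            batches.add(new PlannedBatch(chunk, last ? chunk.get(chunk.size() - 1) : null));
        }
        return batches;
    }
}
```

Take the example above: 5 records, batch size 2, and the lone record in batch 3 is invalid. The plan is two batches, with the commit attached to the 4th record. The trade-off is holding a whole file in memory. If the very first record is invalid, nothing is emitted and there is no record to attach a commit to.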
Re: Exactly once kafka connect query
Hi Nitty, I'm a little confused about what you mean by this part: > transaction is not getting completed because it is not committing the transaction offset. The only conditions required for a transaction to be completed when a connector is defining its own transaction boundaries are: 1. The task requests a transaction commit/abort from the TransactionContext. 2. The task returns a batch of records from SourceTask::poll (and, if using the per-record API provided by the TransactionContext class, includes at least one record that should trigger a transaction commit/abort in that batch). The Connect runtime should automatically commit source offsets to Kafka whenever a transaction is completed, either by commit or abort. This is because transactions should only be aborted for data that should never be re-read by the connector; if there is a validation error that should be handled by reconfiguring the connector, then the task should throw an exception instead of aborting the transaction. If possible, could you provide a brief code snippet illustrating what your task is doing that's causing issues? Cheers, Chris (not Chrise)
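The two conditions above can be written as a toy model in plain Java. This is not the Connect runtime's code. It encodes only the rules as stated in this message, and it reads "returns a batch of records" as a non-null, non-empty list. Strings stand in for records, and `TxnCompletionModel` is a hypothetical name.

```java
import java.util.List;

final class TxnCompletionModel {
    // requested: the task asked TransactionContext for a commit or abort.
    // batch: what poll() returned (null when nothing was returned).
    // trigger: record passed to the per-record API, or null for the no-arg API.
    static boolean completes(boolean requested, List<String> batch, String trigger) {
        if (!requested) {
            return false; // condition 1 not met
        }
        if (batch == null || batch.isEmpty()) {
            return false; // condition 2: a batch of records must be returned
        }
        // Per-record API: the trigger record must be part of the returned batch.
        return trigger == null || batch.contains(trigger);
    }
}
```

In one case the trigger record is requested but missing from the returned batch. That may mirror the symptom described in this thread: an abort was tied to the last record, but that record was left out of the list returned from `poll()`.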
Fwd: Exactly once kafka connect query
Hi Chrise, Thanks for sharing the details. Regarding the use case: for our Asn1 source connector, we need to validate the number of records in the file against the record count in the header. Currently, if validation fails, we do not send the last record to the topic. But after introducing exactly-once with the connector transaction boundary, I can see that if I call abort when validation fails, the transaction does not complete because the transaction offset is not committed. I saw the transaction state change to CompleteAbort. On the next transaction I get an InvalidProducerEpochException, and the task stops after that. When I call abort after sending the last record to the topic, the transaction completes. I don't know if I am doing anything wrong here. Please advise. Thanks, Nitty
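Below is a minimal sketch of the record-count check, assuming the per-record API. The last record of the file is always returned from `poll()`, and the commit or abort decision is attached to it. This matches the behaviour reported once the last record was added back to the list. `CountCheck`, `FinalBatch` and `finish` are hypothetical names, and strings stand in for `SourceRecord`. Per Chris's replies in this thread, aborting is only appropriate for data that should never be re-read. A failure that needs reconfiguration should throw instead.

```java
import java.util.ArrayList;
import java.util.List;

final class CountCheck {
    enum Action { COMMIT, ABORT }

    // Hypothetical result: the records to return from poll() plus the decision
    // that would be attached to the trigger record via the per-record API.
    static final class FinalBatch {
        final List<String> records;
        final String trigger;
        final Action action;

        FinalBatch(List<String> records, String trigger, Action action) {
            this.records = records;
            this.trigger = trigger;
            this.action = action;
        }
    }

    // headerCount: count declared in the file header; recordsSeen: records actually parsed.
    static FinalBatch finish(List<String> lastBatch, int headerCount, int recordsSeen) {
        if (lastBatch.isEmpty()) {
            throw new IllegalArgumentException("need at least one record to carry the decision");
        }
        List<String> records = new ArrayList<>(lastBatch); // the last record is kept either way
        String trigger = records.get(records.size() - 1);
        Action action = recordsSeen == headerCount ? Action.COMMIT : Action.ABORT;
        return new FinalBatch(records, trigger, action);
    }
}
```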
Re: Exactly once kafka connect query
Hi Nitty, We've recently added some documentation on implementing exactly-once source connectors here: https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors. To quote a relevant passage from those docs: > In order for a source connector to take advantage of this support, it must be able to provide meaningful source offsets for each record that it emits, and resume consumption from the external system at the exact position corresponding to any of those offsets without dropping or duplicating messages. So, as long as your source connector is able to use the Kafka Connect framework's offsets API correctly, it shouldn't be necessary to make any other code changes to the connector. To enable exactly-once support for source connectors on your Connect cluster, see the docs section here: https://kafka.apache.org/documentation/#connect_exactlyoncesource With regard to transactions, a transactional producer is always created automatically for your connector by the Connect runtime when exactly-once support is enabled on the worker. The only reason to set "transaction.boundary" to "connector" is if your connector would like to explicitly define its own transaction boundaries. In this case, it sounds like this may be what you want; I just want to call out that in either case, you should not directly instantiate a producer in your connector code. Let the Kafka Connect runtime do that for you, and just worry about returning the right records from SourceTask::poll (and possibly defining custom transactions using the TransactionContext API). With respect to your question about committing or aborting in certain circumstances, it'd be useful to know more about your use case, since it may not be necessary to define custom transaction boundaries in your connector at all.
Cheers, Chris
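For reference, the settings discussed above are collected here as `java.util.Properties`. This is a sketch: in practice the worker value goes in the distributed worker's config file, and the connector values are submitted with the connector config. `exactly.once.source.support` is the worker property that turns on exactly-once source support. `exactly.once.support` and `transaction.boundary` are the connector properties used earlier in this thread. `EosConfigSketch` is a hypothetical helper name.

```java
import java.util.Properties;

final class EosConfigSketch {
    // Worker config (distributed mode only). Existing clusters are meant to roll
    // through "preparing" before switching to "enabled".
    static Properties workerProps() {
        Properties p = new Properties();
        p.setProperty("exactly.once.source.support", "enabled");
        return p;
    }

    // Connector config, matching the values used earlier in this thread.
    static Properties connectorProps() {
        Properties p = new Properties();
        // "required" runs a preflight check that the connector reports it can
        // provide exactly-once semantics with this configuration.
        p.setProperty("exactly.once.support", "required");
        // "connector" lets the task define boundaries through TransactionContext;
        // the default, "poll", uses one transaction per batch returned from poll().
        p.setProperty("transaction.boundary", "connector");
        return p;
    }
}
```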
Fwd: Exactly once kafka connect query
Hi Team, Adding to this, I tried creating a TransactionContext object and calling the commitTransaction and abortTransaction methods in the source connector. But the main problem I saw is that if there is any error while parsing the record, Connect calls an abort, but we have a use case that requires calling commit in some cases. Is that a valid use case in terms of Kafka Connect? Another question: should I use a transactional producer instead of creating a TransactionContext object? Below is the connector configuration I am using:
exactly.once.support: "required"
transaction.boundary: "connector"
Could you please help me here? Thanks, Nitty
Exactly once kafka connect query
Hi Team, I am trying to implement exactly once behavior in our source connector. Is there any sample source connector implementation available to have a look at? Regards, Nitty
Kafka Connect issue | Debezium POC
Hi All, I am working on a Debezium POC. We have a ZooKeeper, a Kafka broker and a Kafka Connect service. According to the logs, the Debezium connector is working fine, but Kafka topics are not being created automatically (auto topic creation is enabled), except for a few default topics, e.g. app_webapp.persona and app_webapp.role. In the connector logs I can see the INFO messages below. Please help me understand whether there is any issue on the Kafka side.
[2023-02-16 20:48:15,290] INFO [Consumer clientId=consumer-connect-cluster-3, groupId=connect-cluster] Group coordinator vmclxcfgascln01.corp.equinix.com:9092 (id: 2147483646 rack: null) is unavailable or invalid due to cause: coordinator unavailable. isDisconnected: false. Rediscovery will be attempted. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:942)
[2023-02-16 20:48:15,290] INFO [Consumer clientId=consumer-connect-cluster-3, groupId=connect-cluster] Requesting disconnect from last known coordinator vmclxcfgascln01.corp.equinix.com:9092 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:955)
[2023-02-16 20:48:15,392] INFO [Consumer clientId=consumer-connect-cluster-2, groupId=connect-cluster] Discovered group coordinator vmclxcfgascln01.corp.equinix.com:9092 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:879)
-- Thanks and Regards, Hari Mobile:9790756568
Re: Kafka Connect ClusterConfigState.inconsistentConnectors() not handled by distributed Worker?
Ah, it definitely seems like KIP-710 will address the issue we've been bitten by most. We'll eagerly await the Kafka 3.5.0 release and then see if enabling 'dedicated.mode.enable.internal.rest' is possible with Strimzi. Thanks for the help and patience! :-)
Re: Kafka Connect ClusterConfigState.inconsistentConnectors() not handled by distributed Worker?
Frank, > I don't think forcing the API users to introduce the nonce is desirable. I agree. That is why the nonce is a workaround, and not a proper solution. It's something to alleviate the symptoms in the short term until a bugfix and upgrade can fix it. > Have you had any ideas on how this can be implemented within Kafka Connect itself so that it works as expected for all users? I have not looked into solutions in enough depth to recommend one. If I had, the PR would be open :) > We tried adding tasks to trigger a propagation of the task configs (increased from 36 to 40 tasks), however that did not unblock it. So triggering this code path did not seem to work: You may be affected by _another_ bug, specific to MM2, which prevents tasks from being reconfigured: https://issues.apache.org/jira/browse/KAFKA-10586 You can see evidence for this in the DistributedHerder ERROR log "Request to leader to reconfigure connector tasks failed". A fix for this is already in flight. It appears that Strimzi is using the kafka-mirror-maker.sh script, which is affected: https://github.com/strimzi/strimzi-kafka-operator/blob/97b48461d724a9c59505a9ad31b3d184476a83d7/docker-images/kafka-based/kafka/scripts/kafka_mirror_maker_run.sh#L121 > Are there any other (not overly verbose) classes you recommend we enable DEBUG logging on? I think you've covered the interesting ones. You can also check whether the Mirror*Connector classes are behaving themselves, but it doesn't appear that the reconfiguration code path has any relevant logs. > Also, would exposing the inconsistent connectors (assuming they're being identified as such by Kafka Connect when this happens) through an API call also make sense so that this can be detected/monitored more easily? Unless we have evidence that the config topic being in inconsistent state (B) is a common problem, I don't think adding monitorability for it has a high enough ROI to be implemented.
If you feel strongly about it, you can consider opening a KIP to describe the public interface changes and how those interfaces would be used for monitoring. Inconsistent state (A), however, seems very common. I've seen it in production myself, it's implicated in KAFKA-9228 and KAFKA-10586, and it is clearly causing disturbance to real users. Fixing the conditions that lead to state (A) is what I'm most interested in seeing, and it should be prioritized first because it will have the highest ROI. Right now you can find connectors in inconsistent state (A) with the following:
* You can hand-inspect the task configs with the `GET /{connector}/tasks-config` endpoint since 2.8.0. This does not work for dedicated MM2 (right now) for precisely the same reason that KAFKA-10586 occurs: the REST API isn't running.
* For mechanical alerting, you can read the config topic and track the offset of the most recent connector config and compare it with the offset of the most recent task config. This depends on internal implementation details, though, and isn't supported across releases.
I think a way of detecting state (A) via the REST API would be a valuable addition that could get accepted, if someone is willing to do the legwork to design, propose and implement it. It would be valuable even without any bugs present, as connectors have to pass through state (A) on each reconfiguration. We can look into this after getting some tactical fixes in place to avoid long-term state (A). Thanks, Greg Harris
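Here is a sketch of the "mechanical alerting" idea above, under the same caveat that it relies on internal, unsupported details. It assumes the config topic uses keys of the form `connector-<name>` for connector configs and `task-<name>-<n>` for task configs, and that records are fed in offset order. It ignores record values entirely, so a deleted connector (a tombstone) would be misreported. `ConfigTopicLagCheck` is a hypothetical name. Every reconfiguration passes through state (A), so an alert only makes sense if a connector stays flagged for a while.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class ConfigTopicLagCheck {
    private static final String CONNECTOR_PREFIX = "connector-"; // assumed internal key layout
    private static final String TASK_PREFIX = "task-";           // assumed: task-<connector>-<n>

    private final Map<String, Long> lastConnectorOffset = new HashMap<>();
    private final Map<String, Long> lastTaskOffset = new HashMap<>();

    // Feed each config-topic record key with its offset, in offset order.
    void accept(long offset, String key) {
        if (key.startsWith(CONNECTOR_PREFIX)) {
            lastConnectorOffset.put(key.substring(CONNECTOR_PREFIX.length()), offset);
        } else if (key.startsWith(TASK_PREFIX)) {
            String rest = key.substring(TASK_PREFIX.length());
            int dash = rest.lastIndexOf('-'); // names may contain '-', so split on the last one
            if (dash > 0) {
                lastTaskOffset.put(rest.substring(0, dash), offset);
            }
        }
        // Other keys (task commits, target states, ...) are ignored in this sketch.
    }

    // Connectors whose newest connector config is newer than their newest task config.
    Set<String> possiblyStale() {
        Set<String> stale = new TreeSet<>();
        for (Map.Entry<String, Long> e : lastConnectorOffset.entrySet()) {
            Long taskOffset = lastTaskOffset.get(e.getKey());
            if (taskOffset == null || taskOffset < e.getValue()) {
                stale.add(e.getKey());
            }
        }
        return stale;
    }
}
```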
Re: Kafka Connect ClusterConfigState.inconsistentConnectors() not handled by distributed Worker?
So we've just hit this issue again with the MM2 connector, while trying to add a new mirrored topic. We're running MirrorMaker 2 in Strimzi, i.e. "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector". We have 6 worker nodes. We changed the config to add a new mirrored topic, i.e. appended a new topic to the MirrorSourceConnector's "topics" config. The MM2 config topic reflects the change, as does viewing the config in Kowl UI. However, no tasks run to mirror the newly added topic, and we do not see any updates on the MM2 status topic for the mirroring of that topic. We tried adding tasks to trigger a propagation of the task configs (increased from 36 to 40 tasks), however that did not unblock it. So triggering this code path did not seem to work: https://github.com/apache/kafka/blob/8cb0a5e9d3441962896b79163d141607e94d9b54/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1569-L1572 Only restarting the workers seemed to unblock the propagation of the new task config for the new mirrored topic. Hopefully this can help us narrow things down a bit... In the meantime we've enabled the following DEBUG logging in production to try to get more hints the next time this happens:
log4j.logger.org.apache.kafka.connect.storage.KafkaConfigBackingStore: DEBUG
log4j.logger.org.apache.kafka.connect.runtime.distributed.DistributedHerder: DEBUG
Perhaps that will show us whether it's at all related to MM2 config topic compaction and/or connectors in an inconsistent state. Are there any other (not overly verbose) classes you recommend we enable DEBUG logging on? Also, would exposing the inconsistent connectors (assuming they're being identified as such by Kafka Connect when this happens) through an API call also make sense, so that this can be detected/monitored more easily? Thanks!
Re: Kafka Connect ClusterConfigState.inconsistentConnectors() not handled by distributed Worker?
I don't think forcing the API users to introduce the nonce is desirable. For us, it would mean reaching out to the Strimzi project to try to get that implemented on their side, which I imagine would be a proposal that would meet some resistance. Have you had any ideas on how this can be implemented within Kafka Connect itself so that it works as expected for all users? Thanks!
Re: Kafka Connect ClusterConfigState.inconsistentConnectors() not handled by distributed Worker?
Frank, The configs are being compared after ConfigProviders have been resolved. This happens both for the connector config (in ClusterConfigState::connectorConfig) and for the task configs (in ClusterConfigState::taskConfig). This means that two configurations with different direct contents (e.g. the path to a secret changed) can resolve to the same value if both paths produce the same value once the config provider is resolved. It also means that if you change the secret on disk and re-submit the config, the new secret will be resolved in each of the ClusterConfigState calls, and the two configs will again end up looking equivalent. > Would capturing a new generation value within the config itself on every submitted change be a possible fix/workaround? This is the workaround I proposed earlier in this conversation for external users who want to force updates: add a nonce to their connector configuration. I don't think it's reasonable for the framework to do this unconditionally, so we may need to find an alternative if we want to fix this for everyone by default. Greg
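Here is a toy illustration of the comparison described above. It uses a simplified, hypothetical resolver that swaps a placeholder value for a secret looked up in a map. Real ConfigProviders, such as the file provider, use `${provider:path:key}` placeholders that the worker resolves. Both generations are resolved with the current secret, so they compare equal even though the secret changed. A differing nonce key makes them differ. The key name `nonce` and the `ResolvedConfigCompare` helper are hypothetical, and the real herder check compares generated task configs rather than raw maps like these.

```java
import java.util.HashMap;
import java.util.Map;

final class ResolvedConfigCompare {
    // Toy resolver: a value that is exactly a known placeholder is replaced by its secret.
    static Map<String, String> resolve(Map<String, String> raw, Map<String, String> secrets) {
        Map<String, String> out = new HashMap<>();
        for (Map.Entry<String, String> e : raw.entrySet()) {
            out.put(e.getKey(), secrets.getOrDefault(e.getValue(), e.getValue()));
        }
        return out;
    }

    // Simplified "did the config change?" check: compare the resolved forms only.
    static boolean looksChanged(Map<String, String> previous, Map<String, String> submitted,
                                Map<String, String> currentSecrets) {
        return !resolve(previous, currentSecrets).equals(resolve(submitted, currentSecrets));
    }
}
```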
Re: Kafka Connect ClusterConfigState.inconsistentConnectors() not handled by distributed Worker?
I'm still having trouble understanding how the configs could match in the code you highlighted when we change connector and/or task config values and no keys are being pruned by the connector implementations in question. Would capturing a new generation value within the config itself on every submitted change be a possible fix/workaround? The slightly negative consequence of that change would be that re-submitting the same config, which is effectively a no-op in the current implementation, would now force task reconfigures/restarts?
Re: Kafka Connect ClusterConfigState.inconsistentConnectors() not handled by distributed Worker?
Frank, > I'm operating on the assumption that the connectors in question get stuck in an inconsistent state > Another thought... if an API exists to list all connectors in such a state, then at least some monitoring/alerting could be put in place, right? There are two different inconsistencies relevant to this discussion. Inconsistent state (A) is when a new connector config (generation 2) has been written to the config topic, but a full set of old task configs (generation 1) exists in the config topic. Inconsistent state (B) is when a new connector config (generation 2) has been written to the config topic, and there is an incomplete set of new task configs (generation 2) in the config topic. Since this is a reconfiguration, it could also be a mix of generation 1 and generation 2 task configs in the config topic. Inconsistent state (A) is a normal part of updating a connector config; the cluster durably writes the connector config to the config topic, and then begins asynchronously regenerating task configs. Inconsistent state (B) is not normal, and happens when a worker is unable to atomically write a full set of task configurations to the config topic. The inconsistentConnectors method will report connectors in state (B), but will not report connectors in state (A). In state (A), you will see tasks running stale configurations. In state (B), you will see no tasks running, as the framework will prevent starting tasks which do not have a consistent set of task configs. As you're not seeing the no-tasks symptom, I would put state (B) out of mind and assume that the KafkaConfigBackingStore will give you atomic read and write semantics for a full set of task configs at once. > I see on KafkaConfigBackingStore.putTaskConfigs that if the JavaDoc is to be believed, a ConnectException is thrown "if the task configurations do not resolve inconsistencies found in the existing root and task configurations." 
It looks like ConnectExceptions are thrown after a failure to read or write to the config topic, which is a pretty broad class of errors. But if any such error occurs, the code cannot guarantee that a full set of task configs + task commit message was written, and the topic may be in state (A) or state (B). This method is called specifically to resolve state (A) or state (B), and the exception is just indicating that whatever inconsistency was present before the call may still be there. > Am I right that there is only one retry on that exception in DistributedHerder.reconfigureConnectorTasksWithRetry? No, the implementation of `reconfigureConnectorTasksWithRetry` calls itself in its error callback, and calls reconfigureConnector to retry the operation. This is a recursive loop, but it isn't obvious because it's inside a callback. This is the condition which is causing the issue: https://github.com/apache/kafka/blob/6e2b86597d9cd7c8b2019cffb895522deb63c93a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1918-L1931 The DistributedHerder is comparing the generation 1 and generation 2 configurations, and erroneously believes that they are equal, when in fact the underlying secrets have changed. This prevents the DistributedHerder from writing generation 2 task configs to the KafkaConfigBackingStore entirely, leaving it in state (A) without realizing it. Thanks for working on this issue, Greg On Wed, Feb 8, 2023 at 8:38 AM Frank Grimes wrote: > Another thought... if an API exists to list all connectors in such a > state, then at least some monitoring/alerting could be put in place, right?
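The retry structure Greg describes, where the error callback re-invokes the method that issued the request, is easier to see in a stripped-down form. This is a hypothetical sketch, not the DistributedHerder code. The `Operation` interface and class names are made up for illustration, and a real implementation would normally schedule each retry after a backoff rather than recurse immediately on the same stack.

```java
import java.util.function.Consumer;

// Hypothetical, simplified model of a retry loop hidden inside a completion callback
// (not the DistributedHerder code): on failure, the callback re-invokes runWithRetry.
final class CallbackRetry {
    // An asynchronous operation that reports null on success, or the error, to its callback.
    interface Operation {
        void run(Consumer<Throwable> callback);
    }

    private CallbackRetry() {}

    static void runWithRetry(Operation op, Runnable onSuccess) {
        op.run(error -> {
            if (error == null) {
                onSuccess.run();
            } else {
                // The "loop": the error path calls straight back into runWithRetry.
                // A real implementation would schedule this after a backoff instead.
                runWithRetry(op, onSuccess);
            }
        });
    }
}
```

There is no explicit retry counter anywhere, which is why the loop is easy to miss when reading the code: it keeps going until the operation succeeds.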
Re: Kafka Connect ClusterConfigState.inconsistentConnectors() not handled by distributed Worker?
Another thought... if an API exists to list all connectors in such a state, then at least some monitoring/alerting could be put in place, right?
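Per Greg's reply in this thread, `inconsistentConnectors()` only covers state (B), whose visible symptom is a connector with no tasks. A minimal alerting check along those lines could poll the worker's `GET /connectors?expand=status` endpoint and flag RUNNING connectors with an empty task list. The sketch assumes the JSON has already been parsed into the hypothetical `ConnectorStatus` record (Java 16+). It cannot detect state (A), where tasks keep running with stale configs, and a connector that legitimately generates zero tasks would be a false positive.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical status row, e.g. parsed from GET /connectors?expand=status (parsing omitted).
record ConnectorStatus(String name, String connectorState, int taskCount) {}

final class NoTaskAlert {
    private NoTaskAlert() {}

    // Flags connectors whose connector instance is RUNNING but which report zero tasks,
    // the symptom of inconsistent state (B). Stale-config state (A) is NOT detected.
    static List<String> connectorsWithoutTasks(List<ConnectorStatus> statuses) {
        List<String> flagged = new ArrayList<>();
        for (ConnectorStatus status : statuses) {
            if ("RUNNING".equals(status.connectorState()) && status.taskCount() == 0) {
                flagged.add(status.name());
            }
        }
        return flagged;
    }
}
```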
Re: Kafka Connect ClusterConfigState.inconsistentConnectors() not handled by distributed Worker?
So I've been looking into the codebase to familiarize myself with it. I'm operating on the assumption that the connectors in question get stuck in an inconsistent state, which causes them to prune the new task configs from those which are "broadcast" to the workers. I see on KafkaConfigBackingStore.putTaskConfigs that, if the JavaDoc is to be believed, a ConnectException is thrown "if the task configurations do not resolve inconsistencies found in the existing root and task configurations." Am I right that there is only one retry on that exception in DistributedHerder.reconfigureConnectorTasksWithRetry? Could more retry attempts help avoid the bug? Or, once it's inconsistent in the config topic, is there no chance it'll auto-resolve without resizing the number of tasks? Thanks!
CVE-2023-25194: Apache Kafka: Possible RCE/Denial of service attack via SASL JAAS JndiLoginModule configuration using Kafka Connect
Severity: important

Description: A possible security vulnerability has been identified in Apache Kafka Connect. This requires access to a Kafka Connect worker, and the ability to create/modify connectors on it with an arbitrary Kafka client SASL JAAS config and a SASL-based security protocol, which has been possible on Kafka Connect clusters since Apache Kafka 2.3.0. When configuring the connector via the Kafka Connect REST API, an authenticated operator can set the `sasl.jaas.config` property for any of the connector's Kafka clients to "com.sun.security.auth.module.JndiLoginModule", which can be done via the `producer.override.sasl.jaas.config`, `consumer.override.sasl.jaas.config`, or `admin.override.sasl.jaas.config` properties. This will allow the server to connect to the attacker's LDAP server and deserialize the LDAP response, which the attacker can use to execute Java deserialization gadget chains on the Kafka Connect server. Attackers can cause unrestricted deserialization of untrusted data, or achieve RCE when there are gadgets on the classpath.

Since Apache Kafka 3.0.0, users are allowed to specify these properties in connector configurations for Kafka Connect clusters running with out-of-the-box configurations. Before Apache Kafka 3.0.0, users may not specify these properties unless the Kafka Connect cluster has been reconfigured with a connector client override policy that permits them.

In Apache Kafka 3.4.0, we added a system property ("-Dorg.apache.kafka.disallowed.login.modules") to disable the problematic login modules in SASL JAAS configuration. Also, "com.sun.security.auth.module.JndiLoginModule" is disabled by default in Apache Kafka 3.4.0. We advise Kafka Connect users to validate connector configurations and only allow trusted JNDI configurations. Users should also examine connector dependencies for vulnerable versions and, as options for remediation, either upgrade their connectors, upgrade that specific dependency, or remove the connectors. Finally, in addition to leveraging the "org.apache.kafka.disallowed.login.modules" system property, Kafka Connect users can also implement their own connector client config override policy, which can be used to control which Kafka client properties can and cannot be overridden directly in a connector config.

Credit: Apache Kafka would like to thank Jari Jääskelä (https://hackerone.com/reports/1529790) and 4ra1n and Y4tacker (they found the vulnerability in other Apache projects; after discussion between the PMCs of the two projects, it was confirmed to be a Kafka vulnerability, and they then reported it to us).

References:
https://kafka.apache.org/cve-list
https://kafka.apache.org/
https://www.cve.org/CVERecord?id=CVE-2023-25194
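As a sketch of the advisory's "validate connector configurations" recommendation, a deployment pipeline could reject connector configs that set a client-override `sasl.jaas.config` naming `JndiLoginModule` before they ever reach the REST API. The class below is hypothetical and is not a Kafka API. On the worker side, the actual controls are upgrading to 3.4.0 or later, the `org.apache.kafka.disallowed.login.modules` system property, and the worker's `connector.client.config.override.policy` setting or a custom override policy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical pre-submission check (not a Kafka API): reports connector config keys that
// override a client's sasl.jaas.config with JndiLoginModule, per CVE-2023-25194.
final class JaasOverrideCheck {
    private static final List<String> OVERRIDE_KEYS = List.of(
            "producer.override.sasl.jaas.config",
            "consumer.override.sasl.jaas.config",
            "admin.override.sasl.jaas.config");
    private static final String BLOCKED_MODULE = "com.sun.security.auth.module.JndiLoginModule";

    private JaasOverrideCheck() {}

    // Returns the offending keys in a fixed order; an empty list means none were found.
    static List<String> offendingKeys(Map<String, String> connectorConfig) {
        List<String> offending = new ArrayList<>();
        for (String key : OVERRIDE_KEYS) {
            String value = connectorConfig.get(key);
            if (value != null && value.contains(BLOCKED_MODULE)) {
                offending.add(key);
            }
        }
        return offending;
    }
}
```

A substring match against one denylisted module is deliberately simple; an allowlist of permitted login modules would be the stricter choice.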
Re: Kafka Connect ClusterConfigState.inconsistentConnectors() not handled by distributed Worker?
Frank, I don't think the fix necessarily needs to follow the #12450 PR; we can choose to start from scratch now that we know more about the issue. If that PR is useful as a starting point, we can also include it; that is up to you. Greg On Mon, Feb 6, 2023 at 10:21 AM Frank Grimes wrote: > > Hi Greg, > I actually just found the following comment on this PR for > https://issues.apache.org/jira/browse/KAFKA-13809: > https://github.com/apache/kafka/pull/12450 > > we get the same behavior (KAFKA-9228 notwithstanding) by passing the > original properties through to tasks transparently > It seems we're not the first to notice that the issue isn't limited to > connectors who selectively propagate properties to the task configs.FWIW, > the kafka-connect-s3 connector also does not seem to prune any configs from > the tasks: > https://github.com/confluentinc/kafka-connect-storage-cloud/blob/d21662e78286d79b938e7b9affa418b863a6299f/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnector.java#L57-L77 > Is the patch provided on https://github.com/apache/kafka/pull/7823 still > the right approach? > Thanks!
Re: Kafka Connect ClusterConfigState.inconsistentConnectors() not handled by distributed Worker?
Hi Greg, I actually just found the following comment on this PR for https://issues.apache.org/jira/browse/KAFKA-13809: https://github.com/apache/kafka/pull/12450 > we get the same behavior (KAFKA-9228 notwithstanding) by passing the original properties through to tasks transparently. It seems we're not the first to notice that the issue isn't limited to connectors that selectively propagate properties to the task configs. FWIW, the kafka-connect-s3 connector also does not seem to prune any configs from the tasks: https://github.com/confluentinc/kafka-connect-storage-cloud/blob/d21662e78286d79b938e7b9affa418b863a6299f/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnector.java#L57-L77 Is the patch provided on https://github.com/apache/kafka/pull/7823 still the right approach? Thanks!
Re: Kafka Connect ClusterConfigState.inconsistentConnectors() not handled by distributed Worker?
Frank, I think you're right that the KAFKA-9228 ticket doesn't capture every possible reconfiguration that might result in a dropped restart. The ticket calls out the FileStream connectors, which generate their configurations by dropping unknown config values, which is relatively uncommon. This means that even changes to non-externalized configurations may not trigger a restart. We now know that dropped restarts can happen to non-FileStream connectors with externalized config values, but a fix for one should also fix the other. If you're interested in contributing a fix, we would welcome the contribution. Otherwise, I'll look into this and see what we can do about it. Please keep in mind the known workarounds for this bug that can improve the behavior before a fix lands. Thanks! Greg On Mon, Feb 6, 2023 at 8:50 AM Frank Grimes wrote: > Hi Greg, > The "long-term inconsistency" we have observed is not with no tasks at > all, but instead with all the previously running tasks remaining in a > running state but with a previous config. > If I'm understanding the original bug report correctly, the scope of the > problem was thought to only affect the following built-in connectors: > FileStreamSourceConnector and the FileStreamSinkConnector > see > https://issues.apache.org/jira/browse/KAFKA-9228?focusedCommentId=16993990=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16993990 > However, we are seeing this issue with a number of 3rd-party connectors > not provided as part of the Kafka project as well.e.g.- Confluent's > kafka-connect-s3 connector ( > https://github.com/confluentinc/kafka-connect-storage-cloud)- Aerospike's > connector: ( > https://docs.aerospike.com/connect/kafka/to-asdb/from-kafka-to-asdb-overview > ) > We're wondering if it would be possible to re-evaluate the impact of this > bug and look at addressing it either with the pre-existing PR ( > https://github.com/apache/kafka/pull/7823) or a new one. 
> Thanks!On Friday, February 3, 2023, 04:29:38 PM EST, Greg Harris > wrote: > > Frank, > > I realized I didn't respond to the title directly, sorry about that. > The reason that `ClusterConfigState::inconsistentConnectors` is not used, > is that the effect of an inconsistent connector is applied via > `ClusterConfigState::tasks`. > If a connector is inconsistent, then the tasks method will not return any > task configurations. > This will cause the outer logic to believe that there are 0 tasks defined, > and so any connector which does request a task reconfiguration will write > any task configs that are generated by the connector. > > And a task reconfiguration occurs on each connector start, and each time a > connector requests a reconfiguration. > If a reconfiguration failed (which is how the connector became > inconsistent) then it will be retried. > If the worker that had the reconfiguration fail then leaves the cluster, > then the rebalance algorithm will start the connector somewhere else, which > will trigger another task reconfiguration. > > Given the above, there does not appear to be any way to have long-term > inconsistent connectors without a reconfiguration consistently failing. > If you are seeing the symptoms of long-term inconsistency (no tasks at all > for a connector) then I'd be very interested in a reproduction case for > that. > > Thanks! > Greg Harris > > On Fri, Feb 3, 2023 at 1:05 PM Greg Harris wrote: > > > Frank, > > > > The inconsistentConnectors method is related to an extremely specific > > inconsistency that can happen when a worker writes some task > > configurations, and then disconnects without writing a following "commit > > tasks record" to the config topic. > > This is a hold-over from the early days of connect from before Kafka's > > transactions support, and is mostly an implementation detail. 
> > See the `KafkaConfigBackingStore::putTaskConfigs` and > > `KafkaConfigBackingStore::processTasksCommitRecord` for the relevant > code. > > It is not expected that this method is in regular use, and is primarily > > for diagnostic purposes. > > > > What the Strimzi issue seems to describe (and probably the issue you are > > facing) occurs at a higher level, when a worker is deciding whether to > > write new task configs at all. > > The relevant code is here: > > > https://github.com/apache/kafka/blob/6e2b86597d9cd7c8b2019cffb895522deb63c93a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1918-L1931 > > In that snippet, new task configs generated by the connector are only > > written to the config topic if they differ from the current contents of > the > > config topic. And
Re: Kafka Connect ClusterConfigState.inconsistentConnectors() not handled by distributed Worker?
Hi Greg, The "long-term inconsistency" we have observed is not a connector with no tasks at all, but instead all the previously running tasks remaining in a running state with a previous config. If I'm understanding the original bug report correctly, the problem was thought to affect only the following built-in connectors: FileStreamSourceConnector and FileStreamSinkConnector (see https://issues.apache.org/jira/browse/KAFKA-9228?focusedCommentId=16993990=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16993990). However, we are also seeing this issue with a number of 3rd-party connectors not provided as part of the Kafka project, e.g. Confluent's kafka-connect-s3 connector (https://github.com/confluentinc/kafka-connect-storage-cloud) and Aerospike's connector (https://docs.aerospike.com/connect/kafka/to-asdb/from-kafka-to-asdb-overview). We're wondering if it would be possible to re-evaluate the impact of this bug and look at addressing it either with the pre-existing PR (https://github.com/apache/kafka/pull/7823) or a new one. Thanks! On Friday, February 3, 2023, 04:29:38 PM EST, Greg Harris wrote: Frank, I realized I didn't respond to the title directly, sorry about that. The reason that `ClusterConfigState::inconsistentConnectors` is not used, is that the effect of an inconsistent connector is applied via `ClusterConfigState::tasks`. If a connector is inconsistent, then the tasks method will not return any task configurations. This will cause the outer logic to believe that there are 0 tasks defined, and so any connector which does request a task reconfiguration will write any task configs that are generated by the connector. And a task reconfiguration occurs on each connector start, and each time a connector requests a reconfiguration. If a reconfiguration failed (which is how the connector became inconsistent) then it will be retried.
If the worker that had the reconfiguration fail then leaves the cluster, then the rebalance algorithm will start the connector somewhere else, which will trigger another task reconfiguration. Given the above, there does not appear to be any way to have long-term inconsistent connectors without a reconfiguration consistently failing. If you are seeing the symptoms of long-term inconsistency (no tasks at all for a connector) then I'd be very interested in a reproduction case for that. Thanks! Greg Harris On Fri, Feb 3, 2023 at 1:05 PM Greg Harris wrote: > Frank, > > The inconsistentConnectors method is related to an extremely specific > inconsistency that can happen when a worker writes some task > configurations, and then disconnects without writing a following "commit > tasks record" to the config topic. > This is a hold-over from the early days of connect from before Kafka's > transactions support, and is mostly an implementation detail. > See the `KafkaConfigBackingStore::putTaskConfigs` and > `KafkaConfigBackingStore::processTasksCommitRecord` for the relevant code. > It is not expected that this method is in regular use, and is primarily > for diagnostic purposes. > > What the Strimzi issue seems to describe (and probably the issue you are > facing) occurs at a higher level, when a worker is deciding whether to > write new task configs at all. > The relevant code is here: > https://github.com/apache/kafka/blob/6e2b86597d9cd7c8b2019cffb895522deb63c93a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1918-L1931 > In that snippet, new task configs generated by the connector are only > written to the config topic if they differ from the current contents of the > config topic. And this comparison is done on the post-transformation > configurations, after ConfigProviders have been resolved. 
> And critical for this bug, that resolution is done twice in quick > succession, when the old and new configuration could evaluate to the same > final result. > The code snippet also shows why your workaround works: the other condition > for writing all of the task configs to the config topic is that the number > of configurations has changed. > > I believe this bug is captured in > https://issues.apache.org/jira/browse/KAFKA-9228 but it has not > progressed in some time. > There is a potentially lower-impact workaround that involves adding a > nonce to your connector configuration that changes each time you apply a > new configuration to the connector, which most connectors will pass > directly to their tasks. > But this unfortunately does not work in general, as connectors could > exclude the nonce when generating task configurations. > > I hope this gives some more insight to the behavior you're seeing. > > Thanks, > Greg Harris > > On Fri, Feb 3, 2023 at 7:36 AM Frank Grimes > wrote: > >> Hi, we're
Re: Kafka Connect ClusterConfigState.inconsistentConnectors() not handled by distributed Worker?
Frank, I realized I didn't respond to the title directly, sorry about that. The reason that `ClusterConfigState::inconsistentConnectors` is not used is that the effect of an inconsistent connector is applied via `ClusterConfigState::tasks`. If a connector is inconsistent, then the tasks method will not return any task configurations. This will cause the outer logic to believe that there are 0 tasks defined, so any connector that requests a task reconfiguration will write whatever task configs it generates. A task reconfiguration occurs on each connector start, and each time a connector requests a reconfiguration. If a reconfiguration failed (which is how the connector became inconsistent), then it will be retried. If the worker whose reconfiguration failed then leaves the cluster, the rebalance algorithm will start the connector somewhere else, which will trigger another task reconfiguration. Given the above, there does not appear to be any way to have long-term inconsistent connectors without a reconfiguration consistently failing. If you are seeing the symptoms of long-term inconsistency (no tasks at all for a connector), then I'd be very interested in a reproduction case for that. Thanks! Greg Harris On Fri, Feb 3, 2023 at 1:05 PM Greg Harris wrote: > Frank, > > The inconsistentConnectors method is related to an extremely specific > inconsistency that can happen when a worker writes some task > configurations, and then disconnects without writing a following "commit > tasks record" to the config topic. > This is a hold-over from the early days of connect from before Kafka's > transactions support, and is mostly an implementation detail. > See the `KafkaConfigBackingStore::putTaskConfigs` and > `KafkaConfigBackingStore::processTasksCommitRecord` for the relevant code. > It is not expected that this method is in regular use, and is primarily > for diagnostic purposes.
> > What the Strimzi issue seems to describe (and probably the issue you are > facing) occurs at a higher level, when a worker is deciding whether to > write new task configs at all. > The relevant code is here: > https://github.com/apache/kafka/blob/6e2b86597d9cd7c8b2019cffb895522deb63c93a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1918-L1931 > In that snippet, new task configs generated by the connector are only > written to the config topic if they differ from the current contents of the > config topic. And this comparison is done on the post-transformation > configurations, after ConfigProviders have been resolved. > And critical for this bug, that resolution is done twice in quick > succession, when the old and new configuration could evaluate to the same > final result. > The code snippet also shows why your workaround works: the other condition > for writing all of the task configs to the config topic is that the number > of configurations has changed. > > I believe this bug is captured in > https://issues.apache.org/jira/browse/KAFKA-9228 but it has not > progressed in some time. > There is a potentially lower-impact workaround that involves adding a > nonce to your connector configuration that changes each time you apply a > new configuration to the connector, which most connectors will pass > directly to their tasks. > But this unfortunately does not work in general, as connectors could > exclude the nonce when generating task configurations. > > I hope this gives some more insight to the behavior you're seeing. > > Thanks, > Greg Harris > > On Fri, Feb 3, 2023 at 7:36 AM Frank Grimes > wrote: > >> Hi, we're investigating an issue where occasionally config changes don't >> propagate to connectors/tasks. >> >> When this occurs, the only way to ensure that the configuration takes >> effect is to resize the number of tasks back down to 1 and then resize back >> up to the original number of tasks. 
>> In searching for others who have been bitten by this scenario we found >> the following thread on the Strimzi discussions pages: >> https://github.com/strimzi/strimzi-kafka-operator/discussions/7738 >> Both the symptoms and workaround described there match what we've >> seen.We've been doing some digging into the Kafka Connect codebase to >> better understand how config.storage.topic is consumed. >> In the interest of brevity I won't repeat that entire thread of >> discussion here. >> However, I was wondering if anyone knows whether the JavaDoc suggestion >> on ClusterConfigState.inconsistentConnectors() is actually implemented in >> the clustered Worker code.i.e. "When a worker detects a connector in this >> state, it should request that the connector regenerate its task >> configurations." >> The reason I ask is because I couldn't find any references to that API >> call anywhere but in the KafkaConfigBackingStoreTest unit test cases. >> Thanks! >> >
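Greg's point about why `inconsistentConnectors()` is not called directly can be modeled in a few lines: if `tasks()` returns nothing for an inconsistent connector, the next reconfiguration sees a difference and rewrites a full set of task configs. This is a hypothetical, simplified model (`ConfigSnapshot` is not the real `ClusterConfigState`), meant only to show why state (B) should heal itself as long as reconfiguration eventually succeeds.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the behavior described above (not the real
// ClusterConfigState): an inconsistent connector reports no task configs at all.
final class ConfigSnapshot {
    private final Map<String, List<Map<String, String>>> taskConfigs;
    private final Set<String> inconsistent;

    ConfigSnapshot(Map<String, List<Map<String, String>>> taskConfigs, Set<String> inconsistent) {
        this.taskConfigs = taskConfigs;
        this.inconsistent = inconsistent;
    }

    // Inconsistent connectors look as if they had zero tasks.
    List<Map<String, String>> tasks(String connector) {
        if (inconsistent.contains(connector)) {
            return List.of();
        }
        return taskConfigs.getOrDefault(connector, List.of());
    }

    // An empty current set never equals a non-empty generated set, so reconfiguration
    // rewrites the full set of task configs and clears the inconsistency.
    boolean wouldWriteOnReconfigure(String connector, List<Map<String, String>> generated) {
        return !tasks(connector).equals(generated);
    }
}
```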
Re: Kafka Connect ClusterConfigState.inconsistentConnectors() not handled by distributed Worker?
Frank, The inconsistentConnectors method is related to an extremely specific inconsistency that can happen when a worker writes some task configurations, and then disconnects without writing a following "commit tasks record" to the config topic. This is a holdover from the early days of Kafka Connect, before Kafka had transactions support, and is mostly an implementation detail. See `KafkaConfigBackingStore::putTaskConfigs` and `KafkaConfigBackingStore::processTasksCommitRecord` for the relevant code. This method is not expected to be in regular use, and is primarily for diagnostic purposes. What the Strimzi issue seems to describe (and probably the issue you are facing) occurs at a higher level, when a worker is deciding whether to write new task configs at all. The relevant code is here: https://github.com/apache/kafka/blob/6e2b86597d9cd7c8b2019cffb895522deb63c93a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1918-L1931 In that snippet, new task configs generated by the connector are only written to the config topic if they differ from the current contents of the config topic. This comparison is done on the post-transformation configurations, after ConfigProviders have been resolved. Critically for this bug, that resolution is done twice in quick succession, so the old and new configurations can evaluate to the same final result. The code snippet also shows why your workaround works: the other condition for writing all of the task configs to the config topic is that the number of configurations has changed. I believe this bug is captured in https://issues.apache.org/jira/browse/KAFKA-9228, but it has not progressed in some time. There is a potentially lower-impact workaround: add a nonce to your connector configuration that changes each time you apply a new configuration to the connector, which most connectors will pass directly to their tasks.
But this unfortunately does not work in general, as connectors could exclude the nonce when generating task configurations. I hope this gives some more insight to the behavior you're seeing. Thanks, Greg Harris On Fri, Feb 3, 2023 at 7:36 AM Frank Grimes wrote: > Hi, we're investigating an issue where occasionally config changes don't > propagate to connectors/tasks. > > When this occurs, the only way to ensure that the configuration takes > effect is to resize the number of tasks back down to 1 and then resize back > up to the original number of tasks. > In searching for others who have been bitten by this scenario we found the > following thread on the Strimzi discussions pages: > https://github.com/strimzi/strimzi-kafka-operator/discussions/7738 > Both the symptoms and workaround described there match what we've > seen.We've been doing some digging into the Kafka Connect codebase to > better understand how config.storage.topic is consumed. > In the interest of brevity I won't repeat that entire thread of discussion > here. > However, I was wondering if anyone knows whether the JavaDoc suggestion on > ClusterConfigState.inconsistentConnectors() is actually implemented in the > clustered Worker code.i.e. "When a worker detects a connector in this > state, it should request that the connector regenerate its task > configurations." > The reason I ask is because I couldn't find any references to that API > call anywhere but in the KafkaConfigBackingStoreTest unit test cases. > Thanks! >
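The write decision Greg links to, and why both the tasks.max resize workaround and the secret-rotation bug behave as described, can be sketched as follows. This is a simplified, hypothetical model rather than the DistributedHerder source: stored task configs hold ConfigProvider placeholders, and the comparison resolves them with the provider's current values. The `${vault:db:password}` placeholder and the `resolver` function are made up for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical, simplified model (not the DistributedHerder source) of the decision
// described above: write new task configs only if the task count changed or some task
// config differs, comparing configs AFTER ConfigProvider placeholders are resolved.
final class TaskConfigDiff {
    private TaskConfigDiff() {}

    static boolean shouldWrite(List<Map<String, String>> storedRaw,
                               List<Map<String, String>> generatedResolved,
                               UnaryOperator<String> resolver) {
        if (storedRaw.size() != generatedResolved.size()) {
            return true; // why resizing tasks.max down and back up forces a write
        }
        for (int i = 0; i < storedRaw.size(); i++) {
            // Resolve the stored placeholders with the provider's *current* values.
            Map<String, String> storedResolved = new HashMap<>();
            storedRaw.get(i).forEach((k, v) -> storedResolved.put(k, resolver.apply(v)));
            if (!storedResolved.equals(generatedResolved.get(i))) {
                return true;
            }
        }
        return false;
    }
}
```

In the rotation case, running tasks were started with the old secret, but the stored placeholder now resolves to the new secret, so the comparison reports no change and nothing is written, which matches state (A). A different task count short-circuits the comparison, which is why resizing down to one task and back up works.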
Kafka Connect ClusterConfigState.inconsistentConnectors() not handled by distributed Worker?
Hi, we're investigating an issue where config changes occasionally don't propagate to connectors/tasks. When this occurs, the only way to ensure that the configuration takes effect is to resize the number of tasks down to 1 and then back up to the original number of tasks. In searching for others who have been bitten by this scenario, we found the following thread on the Strimzi discussions pages: https://github.com/strimzi/strimzi-kafka-operator/discussions/7738 Both the symptoms and the workaround described there match what we've seen. We've been doing some digging into the Kafka Connect codebase to better understand how config.storage.topic is consumed. In the interest of brevity I won't repeat that entire thread of discussion here. However, I was wondering if anyone knows whether the JavaDoc suggestion on ClusterConfigState.inconsistentConnectors() is actually implemented in the clustered Worker code, i.e. "When a worker detects a connector in this state, it should request that the connector regenerate its task configurations." I ask because I couldn't find any references to that API call anywhere but in the KafkaConfigBackingStoreTest unit test cases. Thanks!
Kafka connect SMT does not work for primary key columns
tions] 2022-11-25 06:57:01,510 WARN || Write of 2 records failed, remainingRetries=0 [io.confluent.connect.jdbc.sink.JdbcSinkTask]
java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "pk_created_at" ("created_at") VALUES (1669359291990398) ON CONFLICT ("created_at") DO NOTHING was aborted: ERROR: column "created_at" is of type timestamp without time zone but expression is of type bigint
  Hint: You will need to rewrite or cast the expression.
  Position: 52 Call getNextException to see other errors in the batch.
    at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:165)
    at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:871)
    at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:910)
    at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1638)
    at io.confluent.connect.jdbc.sink.BufferedRecords.executeUpdates(BufferedRecords.java:196)
    at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:186)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:80)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.postgresql.util.PSQLException: ERROR: column "created_at" is of type timestamp without time zone but expression is of type bigint
  Hint: You will need to rewrite or cast the expression.
  Position: 52
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2675)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2365)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:355)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:315)
    at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:868)
    ... 17 more
It looks like the sink connector is still trying to insert created_at as the numeric value 1669359291990398, but I verified that the messages in the Kafka topic have been transformed into strings. It works if created_at is not a primary key column; I just don't understand why the SMT does not work for primary key columns. How can I fix it? Could someone help? Much appreciated.
My SMT: https://github.com/FX-HAO/kafka-connect-debezium-tranforms/blob/master/src/main/java/com/github/haofuxin/kafka/connect/DebeziumTimestampConverter.java
My sink configuration:
{
  "name": "test-sinker",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics.regex": "test.public.pk_created_at",
    "table.name.format": "${topic}",
    "connection.url": "jdbc:postgresql://target:5432/test?stringtype=unspecified=postgres=postgres",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": true,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": true,
    "transforms": "dropPrefix",
    "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.dropPrefix.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.dropPrefix.replacement": "$3",
    "auto.create": "false",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "delete.enabled": true
  }
}
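One possible explanation rests on an assumption, since the SMT's behaviour on keys is not shown here. With `"pk.mode": "record_key"` the JDBC sink takes primary-key columns from the record key. If the timestamp conversion only rewrites record values, the key still carries the bigint. The posted sink config also lists only `dropPrefix` under `transforms`, so the conversion presumably runs upstream, before the records reach the topic. The toy model below is hypothetical and is not the RegexRouter or JDBC sink source. It shows the RegexRouter step turning the topic into the `pk_created_at` table from the log, and it shows a value-only conversion leaving the key untouched. The helper names are invented.

```python
import re

# Hypothetical, simplified model of the posted config; not Kafka Connect code.
ROUTER_REGEX = r"([^.]+)\.([^.]+)\.([^.]+)"  # transforms.dropPrefix.regex
ROUTER_REPLACEMENT = r"\3"                    # "$3" written in Python's syntax


def route_topic(topic: str) -> str:
    """Rename the topic only when the regex matches the whole topic name."""
    match = re.fullmatch(ROUTER_REGEX, topic)
    return match.expand(ROUTER_REPLACEMENT) if match else topic


def value_only_transform(record: dict, field: str, convert) -> dict:
    """Hypothetical value-side SMT: rewrites one field in the value, never the key."""
    value = dict(record["value"])
    if field in value:
        value[field] = convert(value[field])
    return {**record, "value": value}


def pk_columns(record: dict, pk_mode: str) -> dict:
    """With pk.mode=record_key, primary-key columns are read from the record key."""
    if pk_mode != "record_key":
        raise ValueError(f"only record_key is modelled here, got {pk_mode!r}")
    return dict(record["key"])


record = {
    "topic": "test.public.pk_created_at",
    "key": {"created_at": 1669359291990398},
    "value": {"created_at": 1669359291990398},
}
# str() stands in for the real timestamp-to-string conversion.
transformed = value_only_transform(record, "created_at", str)
print(route_topic(transformed["topic"]))      # pk_created_at
print(pk_columns(transformed, "record_key"))  # {'created_at': 1669359291990398}
```

If this is the cause, the conversion would also have to be applied to the record key. Several built-in Kafka SMTs, such as `TimestampConverter`, ship separate `$Key` and `$Value` variants for that kind of split.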
Active <> Active MirrorMaker2 setup via dedicated Kafka Connect cluster
Hi, I am trying to set up active <> active MM2 via a distributed Kafka Connect cluster. It seems impossible because of limitations such as the *bootstrap.servers* property. Also, as per this KIP https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0#KIP382:MirrorMaker2.0-RunningMirrorMakerinaConnectcluster, it is not possible to have a sink connector, which means replication only works in one direction. I have already tried other approaches and wanted to try this with a distributed Kafka Connect cluster setup. Please let me know if I am doing anything wrong here, and please shed some light on how to set up active <> active MM2 via a Kafka Connect cluster, if it is possible. I really appreciate any help you can provide. -- *Sriram G* *Tech*
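In a Connect cluster, a source connector's records are produced to whichever Kafka cluster the workers' own `bootstrap.servers` points at. That is the limitation described above. One hedged way to get active <> active under that constraint uses two `MirrorSourceConnector` instances, one per direction. Each would be submitted to a Connect cluster whose `bootstrap.servers` is that connector's target. The sketch below only builds the two REST payloads, in the `{"name": ..., "config": {...}}` shape accepted by `POST /connectors`. The aliases `A`/`B`, the host names, and the helper `mirror_source_config` are hypothetical.

```python
import json

# Hypothetical cluster aliases and bootstrap addresses.
CLUSTERS = {"A": "a-broker:9092", "B": "b-broker:9092"}


def mirror_source_config(source: str, target: str, topics: str = ".*") -> dict:
    """Build one MirrorSourceConnector payload that replicates source -> target."""
    return {
        "name": f"{source}-to-{target}",
        "config": {
            "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
            "source.cluster.alias": source,
            "target.cluster.alias": target,
            "source.cluster.bootstrap.servers": CLUSTERS[source],
            "target.cluster.bootstrap.servers": CLUSTERS[target],
            "topics": topics,
        },
    }


# One payload per direction; each goes to a Connect cluster that writes to its target.
a_to_b = mirror_source_config("A", "B")  # submit to the Connect cluster backed by B
b_to_a = mirror_source_config("B", "A")  # submit to the Connect cluster backed by A
print(json.dumps(a_to_b, indent=2))
```

Keeping the two directions as separate connectors, on Connect clusters tied to different targets, is what turns the one-way limitation into a two-way setup. The dedicated MM2 driver mentioned in KIP-382 remains the other option.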
Re: Entire Kafka Connect cluster stuck because of a stuck sink connector
Hi, what version of Kafka Connect are you running? This sounds like a bug that was fixed a few releases ago. Cheers, Chris On Wed, Oct 12, 2022, 21:27 Hemanth Savasere wrote: > We have stumbled upon an issue on a running cluster with multiple source/sink connectors: > 1. One of our connectors was a JDBC sink connector connected to an SQL Server database (using the Oracle JDBC driver). > 2. It turned out that the DB instance had a problem that caused all queries to be stuck forever, which in turn made the connector's start method hang forever. > 3. After some time, the entire Kafka Connect cluster became unavailable, and the REST API stopped responding, returning {"error_code":500,"message":"Request timed out"} for most requests. > 4. Pausing (just before deleting the consumer group) or deleting the problematic connector allowed the cluster to run normally again. > We could reproduce the same issue by adding Thread.sleep(30) in the start method or in the put method of the ConnectorTask. > I wanted to know whether there is any wiki/documentation that explains how to handle this issue. My approach would be to throw a timeout after waiting for a particular time period and make the connector fail fast. > -- > Thanks & Regards, > Hemanth
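Hemanth's fail-fast idea can be illustrated outside Kafka Connect: run the blocking call on a separate thread and stop waiting once a deadline passes. This hypothetical Python sketch is not how Kafka Connect handles a hung `start()`, and, as Chris suggests, upgrading may be the real fix. The names `call_with_timeout`, `StartTimeoutError`, and `hung_start` are invented. The hung thread is not killed (neither Python nor Java can safely force-stop a thread); the caller simply gives up and can fail the connector.

```python
import threading
import time


class StartTimeoutError(RuntimeError):
    """Raised when the wrapped call does not return before the deadline."""


def call_with_timeout(fn, timeout_s: float, *args, **kwargs):
    """Run fn on a daemon thread; return its result, re-raise its error, or time out."""
    outcome = {}

    def target():
        try:
            outcome["value"] = fn(*args, **kwargs)
        except BaseException as exc:  # hand any failure back to the caller
            outcome["error"] = exc

    # Daemon thread: a call that never returns will not block interpreter exit.
    worker = threading.Thread(target=target, daemon=True)
    worker.start()
    worker.join(timeout_s)
    if worker.is_alive():
        raise StartTimeoutError(f"call did not finish within {timeout_s}s")
    if "error" in outcome:
        raise outcome["error"]
    return outcome.get("value")


def hung_start():
    """Stand-in for a connector start() blocked on an unresponsive database."""
    time.sleep(3600)


try:
    call_with_timeout(hung_start, 0.1)
except StartTimeoutError as exc:
    print(f"failing fast: {exc}")
```

With this design, the worker thread that owns the deadline stays responsive while the blocked call sits on its own thread.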