Kafka Connect Worker Provisioning/Optimization Considerations

2024-07-04 Thread Burton Williams
I have 100+ sink connectors running with 100+ topics, each with roughly 3
partitions per topic. How would you configure resources (memory and CPU) to
optimally handle this level of load? What would your considerations be?
Also, when considering this load, should I be thinking about it as an
aggregated value (e.g., 10 workers at 1 CPU and 4 GB each = 10 CPUs and 40 GB
in total)?

Thank you kindly,

-BW


Re: Kafka Connect Limits

2024-07-04 Thread Burton Williams
Thank you for having a look at this. I agree that the only way to really
gauge load is to look at lag. But the connector tasks should not crash and
die because of load. I will raise this with SF.
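
For anyone who wants to check lag outside of JMX metrics, a minimal sketch with
the Kafka Admin client follows; the bootstrap address and connector name are
placeholders, and it relies on the convention that sink connectors consume with
group id `connect-<connector name>`:

```java
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class SinkConnectorLag {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder

        try (Admin admin = Admin.create(props)) {
            // Sink connectors consume with the group id "connect-<connector name>".
            String groupId = "connect-my-sink-connector"; // placeholder

            // Offsets the connector's consumer group has committed so far.
            Map<TopicPartition, OffsetAndMetadata> committed = admin
                    .listConsumerGroupOffsets(groupId)
                    .partitionsToOffsetAndMetadata()
                    .get();

            // Log-end offsets for the same partitions.
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latest = admin
                    .listOffsets(committed.keySet().stream()
                            .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest())))
                    .all()
                    .get();

            // Lag = log-end offset minus committed offset, per partition.
            committed.forEach((tp, offset) ->
                    System.out.printf("%s lag=%d%n", tp, latest.get(tp).offset() - offset.offset()));
        }
    }
}
```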

On Wed, Jul 3, 2024 at 7:14 PM Greg Harris 
wrote:

> Hey Burton,
>
> Thanks for your question and bug report.
>
> The exception you included does not indicate that your connectors are
> overloaded. The primary way of diagnosing an overloaded connector is the
> consumer lag metric, and if you're seeing acceptable lag, that should
> indicate that your connectors are capable of handling the load.
> I would say that your workload is _unhealthy_ though, because it should be
> able to operate without throwing this particular exception. I found one
> previous report of this exception [1] but no action was taken. Instead, the
> recommendation was to change the task implementation to perform offset
> loading only during open().
>
> It looks like this depends on the task implementation: If the consumer
> rebalances and the task loses its assignment, and the task later calls
> SinkTaskContext#offset() with the now-revoked partition, it would cause
> this exception.
> I'm not familiar with the Snowflake task, but upon a cursory inspection, it
> looks like it buffers records [2] across poll() calls, and may call
> SinkTaskContext#offset() in a later poll [3]. They appear to have a close()
> method [4] that could be used to prevent SinkTaskContext#offset() from being
> called, but I'm not sure why it isn't effective.
> You should contact the Snowflake Connector maintainers and let them know that
> they are exposed to this exception.
>
> I'll re-open this issue on the framework side to see if we can find a
> solution to fix this for other connectors.
>
> Thanks,
> Greg
>
> [1] https://issues.apache.org/jira/browse/KAFKA-10370
> [2]
>
> https://github.com/snowflakedb/snowflake-kafka-connector/blob/b08ee569de6b943f8f624cd4421f7a6c7a10d532/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java#L380-L383
> [3]
>
> https://github.com/snowflakedb/snowflake-kafka-connector/blob/b08ee569de6b943f8f624cd4421f7a6c7a10d532/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java#L908
> [4]
>
> https://github.com/snowflakedb/snowflake-kafka-connector/blob/b08ee569de6b943f8f624cd4421f7a6c7a10d532/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java#L442
>
> On Wed, Jul 3, 2024 at 12:25 PM Burton Williams <
> burton.b.willi...@gmail.com>
> wrote:
>
> > Hi,
> >
> > I have 100+ sink connectors running with 100+ topics, each with roughly 3
> > partitions per topic. These are running on K8s on 10 pods with 6 CPUs and
> > 32 GiB of memory. The connector in question is Snowflake's sink connector
> > v2.2.0.
> > This worked in the mini-batch mode SNOWPIPE, but once I switched over to
> > SNOWPIPE_STREAMING, it no longer works. Tasks are failing with the
> > exception:
> >
> > State:  FAILED
> >
> > Worker ID:  10.136.83.73:8080
> >
> > Trace:  java.lang.IllegalStateException: No current assignment for
> > partition bigpicture.bulk_change-0
> >   at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:369)
> >   at org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:386)
> >   at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1637)
> >   at org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:642)
> >   at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:327)
> >   at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:238)
> >   at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:207)
> >   at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:229)
> >   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:284)
> >   at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
> >   at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> >   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> >   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> >   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> >   at java.base/java.lang.Thread.run(Thread.java:829)

Re: Kafka Connect Limits

2024-07-03 Thread Greg Harris
Hey Burton,

Thanks for your question and bug report.

The exception you included does not indicate that your connectors are
overloaded. The primary way of diagnosing an overloaded connector is the
consumer lag metric, and if you're seeing acceptable lag, that should
indicate that your connectors are capable of handling the load.
I would say that your workload is _unhealthy_ though, because it should be
able to operate without throwing this particular exception. I found one
previous report of this exception [1] but no action was taken. Instead, the
recommendation was to change the task implementation to perform offset
loading only during open().
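
As a rough sketch of that recommendation (not the Snowflake implementation; the
offset-lookup helper is hypothetical), a task that only seeds offsets from
open() never hands the framework a revoked partition:

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class SafeOffsetSinkTask extends SinkTask {

    @Override
    public void open(Collection<TopicPartition> partitions) {
        // Every partition passed to open() is part of the current assignment,
        // so seeking here cannot hit "No current assignment for partition".
        Map<TopicPartition, Long> offsets = new HashMap<>();
        for (TopicPartition tp : partitions) {
            Long committed = lookUpCommittedOffset(tp); // hypothetical external lookup
            if (committed != null) {
                offsets.put(tp, committed);
            }
        }
        context.offset(offsets);
    }

    @Override
    public void close(Collection<TopicPartition> partitions) {
        // Drop any buffered state for revoked partitions so a later put()
        // cannot call context.offset() with a no-longer-assigned partition.
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        // Deliver records to the external system. Avoid calling context.offset()
        // here with partitions remembered from before a rebalance.
    }

    @Override
    public void start(Map<String, String> props) {}

    @Override
    public void stop() {}

    @Override
    public String version() {
        return "1.0";
    }

    private Long lookUpCommittedOffset(TopicPartition tp) {
        return null; // placeholder: query the target system for its last committed offset
    }
}
```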

It looks like this depends on the task implementation: If the consumer
rebalances and the task loses its assignment, and the task later calls
SinkTaskContext#offset() with the now-revoked partition, it would cause
this exception.
I'm not familiar with the Snowflake task, but upon a cursory inspection, it
looks like it buffers records [2] across poll() calls, and may call
SinkTaskContext#offset() in a later poll [3]. They appear to have a close()
method [4] that could be used to prevent SinkTaskContext#offset() from being
called, but I'm not sure why it isn't effective.
You should contact the Snowflake Connector maintainers and let them know that
they are exposed to this exception.

I'll re-open this issue on the framework side to see if we can find a
solution to fix this for other connectors.

Thanks,
Greg

[1] https://issues.apache.org/jira/browse/KAFKA-10370
[2]
https://github.com/snowflakedb/snowflake-kafka-connector/blob/b08ee569de6b943f8f624cd4421f7a6c7a10d532/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java#L380-L383
[3]
https://github.com/snowflakedb/snowflake-kafka-connector/blob/b08ee569de6b943f8f624cd4421f7a6c7a10d532/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java#L908
[4]
https://github.com/snowflakedb/snowflake-kafka-connector/blob/b08ee569de6b943f8f624cd4421f7a6c7a10d532/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java#L442

On Wed, Jul 3, 2024 at 12:25 PM Burton Williams 
wrote:

> Hi,
>
> I have 100+ sink connectors running with 100+ topics, each with roughly 3
> partitions per topic. These are running on K8s on 10 pods with 6 CPUs and
> 32 GiB of memory. The connector in question is Snowflake's sink connector v2.2.0.
> This worked in the mini-batch mode SNOWPIPE, but once I switched over to
> SNOWPIPE_STREAMING, it no longer works. Tasks are failing with the
> exception:
>
> State:  FAILED
>
> Worker ID:  10.136.83.73:8080
>
> Trace:  java.lang.IllegalStateException: No current assignment for
> partition bigpicture.bulk_change-0
>   at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:369)
>   at org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:386)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1637)
>   at org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:642)
>   at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:327)
>   at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:238)
>   at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:207)
>   at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:229)
>   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:284)
>   at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
>   at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>   at java.base/java.lang.Thread.run(Thread.java:829)
>
>
>
> My questions are:
>
>    1. Are these connectors overloaded? Can Kafka Connect handle this
>    level of load?
>    2. Assuming it can (which I've seen it do), could this be caused by
>    underlying rebalancing? If so, what would you recommend I do to mitigate it?
>
>
> Thanks
>
> -BW
>


Kafka Connect Limits

2024-07-03 Thread Burton Williams
Hi,

I have 100+ sink connectors running with 100+ topics, each with roughly 3
partitions per topic. These are running on K8s on 10 pods with 6 CPUs and
32 GiB of memory. The connector in question is Snowflake's sink connector v2.2.0.
This worked in the mini-batch mode SNOWPIPE, but once I switched over to
SNOWPIPE_STREAMING, it no longer works. Tasks are failing with the
exception:

State:  FAILED

Worker ID:  10.136.83.73:8080

Trace:  java.lang.IllegalStateException: No current assignment for partition bigpicture.bulk_change-0
  at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:369)
  at org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:386)
  at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1637)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:642)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:327)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:238)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:207)
  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:229)
  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:284)
  at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
  at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  at java.base/java.lang.Thread.run(Thread.java:829)



My questions are:

   1. Are these connectors overloaded? Can Kafka Connect handle this
   level of load?
   2. Assuming it can (which I've seen it do), could this be caused by
   underlying rebalancing? If so, what would you recommend I do to mitigate it?


Thanks

-BW


Re: Regarding Kafka connect task to partition relationship for both source and sink connectors

2024-05-30 Thread Alex Craig
For sink connectors, I believe you can scale up the tasks to match the
partitions on the topic. But I don't believe this is the case for source
connectors; the number of partitions on the topic you're producing to has
nothing to do with the number of connector tasks. It really depends on the
individual source connector and whether the source data type could benefit from
multiple tasks. For example, the JDBC source connector (a very popular
connector) only supports 1 task - even if you're querying multiple tables.

Bottom line: you'll need to check the documentation for the connector in
question to see if it supports multiple tasks.

Alex
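
To illustrate the sink side of this, a connector consuming a topic with three
partitions would cap useful parallelism at three tasks; a minimal config sketch
(the connector name, class, and topic are placeholders):

```json
{
  "name": "orders-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "orders",
    "tasks.max": "3"
  }
}
```

Setting tasks.max above 3 here would leave the extra tasks with no partitions
to consume.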

On Thu, May 30, 2024 at 7:51 AM Sébastien Rebecchi 
wrote:

> Hello
>
> Confirmed. Partition is the minimal granularity level, so having more
> consumers than the number of partitions of a topic for a same consumer
> group is useless, having P partitions means maximum parallelism is reached
> using P consumers.
>
> Regards,
>
> Sébastien.
>
> Le jeu. 30 mai 2024 à 14:43, Yeikel Santana  a écrit :
>
> > Hi everyone,
> >
> >
> > From my understanding, if a topic has  n partitions, we can create up to
> n
> > tasks for both the source and sink connectors to achieve the maximum
> > parallelism. Adding more tasks would not be beneficial, as they would
> > remain idle and be limited to the number of partitions of the topic
> >
> >
> > Could you please confirm if this understanding is correct?
> >
> >
> > If this understanding is incorrect could you please explain the
> > relationship if any?
> >
> >
> > Thank you!
> >
> >
> >
>


Re: Regarding Kafka connect task to partition relationship for both source and sink connectors

2024-05-30 Thread Sébastien Rebecchi
Hello

Confirmed. The partition is the minimal granularity level, so having more
consumers than the number of partitions of a topic for the same consumer
group is useless; having P partitions means maximum parallelism is reached
using P consumers.

Regards,

Sébastien.

Le jeu. 30 mai 2024 à 14:43, Yeikel Santana  a écrit :

> Hi everyone,
>
>
> From my understanding, if a topic has  n partitions, we can create up to n
> tasks for both the source and sink connectors to achieve the maximum
> parallelism. Adding more tasks would not be beneficial, as they would
> remain idle and be limited to the number of partitions of the topic
>
>
> Could you please confirm if this understanding is correct?
>
>
> If this understanding is incorrect could you please explain the
> relationship if any?
>
>
> Thank you!
>
>
>


Re: [EXTERNAL] Regarding Kafka connect task to partition relationship for both source and sink connectors

2024-05-30 Thread Tauzell, Dave
The docs say: “Each task is assigned to a thread. Each task is capable of
handling multiple Kafka partitions, but a single partition must be handled by
only one task.” From what I understand, additional tasks would sit idle.



From: Yeikel Santana 
Date: Thursday, May 30, 2024 at 7:43 AM
To: users@kafka.apache.org 
Subject: [EXTERNAL] Regarding Kafka connect task to partition relationship for 
both source and sink connectors
Hi everyone,


From my understanding, if a topic has  n partitions, we can create up to n 
tasks for both the source and sink connectors to achieve the maximum 
parallelism. Adding more tasks would not be beneficial, as they would remain 
idle and be limited to the number of partitions of the topic


Could you please confirm if this understanding is correct?


If this understanding is incorrect could you please explain the relationship if 
any?


Thank you!



Regarding Kafka connect task to partition relationship for both source and sink connectors

2024-05-30 Thread Yeikel Santana
Hi everyone,


From my understanding, if a topic has n partitions, we can create up to n
tasks for both the source and sink connectors to achieve the maximum
parallelism. Adding more tasks would not be beneficial, as they would remain
idle and be limited to the number of partitions of the topic.


Could you please confirm if this understanding is correct?


If this understanding is incorrect could you please explain the relationship if 
any?


Thank you!




kafka connect support time precision

2024-04-02 Thread Liron Shalom
Hi,

I'm using kafka connect, passing data with avro schema.
By default I get a schema of millisecond time precision for datetime2
columns.

Do you support time precision of microseconds as well?

Thanks


Re: [Kafka Connect] Dead letter queues for source connectors?

2024-03-07 Thread Chris Egerton
Hey Greg,

Thinking more, I do like the idea of a source-side equivalent of the
ErrantRecordReporter interface!

However, I also suspect we may have to reason more carefully about what
users could do with this kind of information in a DLQ topic. Yes, it's an
option to reset the connector (or a copy of it) to the earliest unprocessed
partition/offset in the DLQ topic and start processing data anew from
there, but this might lead to a flood of duplicates. IIRC it also wouldn't
even be this simple if we were to tolerate duplicates--users would have to
reset the connector to the offset just before the one present in the DLQ,
since resetting to the actual offset would make it appear to the connector
as if the record with that offset were produced and committed successfully.
Maybe we could include both the offset for the failed record, and the
offset for the last record before that one with the same source
partition--basically saying to the user "This is the one that failed, and
this is the one that needs to be reset to if you want to try again".

I'm starting to wonder if, for now, we could try to design something that's
useful for metadata and for manual intervention, but perhaps not for direct
usage with a connector (e.g., we wouldn't expect people to take the
contents of the DLQ topic and use it to start up a second connector for the
sole purpose of slurping up previously-dropped records). Thoughts?

Cheers,

Chris

On Tue, Mar 5, 2024 at 6:19 PM Greg Harris 
wrote:

> Hey Chris,
>
> That's a cool idea! That can certainly be applied for failures other
> than poll(), and could be useful when combined with the Offsets
> modification API.
>
> Perhaps failures inside of poll() can be handled by an extra
> mechanism, similar to the ErrantRecordReporter, which allows reporting
> affected source partition/source offsets when a meaningful key or
> value cannot be read.
>
> Thanks,
> Greg
>
> On Tue, Mar 5, 2024 at 3:03 PM Chris Egerton 
> wrote:
> >
> > Hi Greg,
> >
> > This was my understanding as well--if we can't turn a record into a byte
> > array on the source side, it's difficult to know exactly what to write
> to a
> > DLQ topic.
> >
> > One idea I've toyed with recently is that we could write the source
> > partition and offset for the failed record (assuming, hopefully safely,
> > that these can at least be serialized). This may not cover all bases, is
> > highly dependent on how user-friendly the offsets published by the
> > connector are, and does come with the risk of data loss (if the upstream
> > system is wiped before skipped records can be recovered), but could be
> > useful in some scenarios.
> >
> > Thoughts?
> >
> > Chris
> >
> > On Tue, Mar 5, 2024 at 5:49 PM Greg Harris  >
> > wrote:
> >
> > > Hi Yeikel,
> > >
> > > Thanks for your question. It certainly isn't clear from the original
> > > KIP-298, the attached discussion, or the follow-up KIP-610 as to why
> > > the situation is asymmetric.
> > >
> > > The reason as I understand it is: Source connectors are responsible
> > > for importing data to Kafka. If an error occurs during this process,
> > > then writing useful information to a dead letter queue about the
> > > failure is at least as difficult as importing the record correctly.
> > >
> > > For some examples:
> > > * If an error occurs during poll(), the external data has not yet been
> > > transformed into a SourceRecord that the framework can transform or
> > > serialize.
> > > * If an error occurs during conversion/serialization, the external
> > > data cannot be reasonably serialized to be forwarded to the DLQ.
> > > * If a record cannot be written to Kafka, such as due to being too
> > > large, the same failure is likely to happen with writing to the DLQ as
> > > well.
> > >
> > > For the Sink side, we already know that the data was properly
> > > serializable and appeared as a ConsumerRecord. That can
> > > be forwarded to the DLQ as-is with a reasonable expectation for
> > > success, with the same data formatting as the source topic.
> > >
> > > If you have a vision for how this can be improved and are interested,
> > > please consider opening a KIP! The situation can certainly be made
> > > better than it is today.
> > >
> > > Thanks!
> > > Greg
> > >
> > > On Tue, Mar 5, 2024 at 5:35 AM Yeikel Santana 
> wrote:
> > > >
> > > > Hi all,
> > > >
> > > > Sink connectors support Dead Letter Queues[1], but Source connectors
> > > don't seem to.
> > > >
> > > > What is the reason that we decided to do that?
> > > >
> > > > In my data pipeline, I'd like to apply some transformations to the
> > > > messages before they reach the sink, but that leaves me vulnerable to
> > > > failures as I need to either fail the connector or employ logging to
> > > > track source failures
> > > >
> > > > It seems that for now, I'll need to apply the transformations as a sink
> > > > and possibly reinsert them back into Kafka for downstream consumption,
> > > > but that sounds unnecessary
> > > >
> > > >
> > > > [1]
> > > > https://cwiki.apache.org/confluence/plugins/servlet/mobile?contentId=80453065#content/view/80453065

Re: [Kafka Connect] Dead letter queues for source connectors?

2024-03-05 Thread Greg Harris
Hey Chris,

That's a cool idea! That can certainly be applied for failures other
than poll(), and could be useful when combined with the Offsets
modification API.

Perhaps failures inside of poll() can be handled by an extra
mechanism, similar to the ErrantRecordReporter, which allows reporting
affected source partition/source offsets when a meaningful key or
value cannot be read.

Thanks,
Greg

On Tue, Mar 5, 2024 at 3:03 PM Chris Egerton  wrote:
>
> Hi Greg,
>
> This was my understanding as well--if we can't turn a record into a byte
> array on the source side, it's difficult to know exactly what to write to a
> DLQ topic.
>
> One idea I've toyed with recently is that we could write the source
> partition and offset for the failed record (assuming, hopefully safely,
> that these can at least be serialized). This may not cover all bases, is
> highly dependent on how user-friendly the offsets published by the
> connector are, and does come with the risk of data loss (if the upstream
> system is wiped before skipped records can be recovered), but could be
> useful in some scenarios.
>
> Thoughts?
>
> Chris
>
> On Tue, Mar 5, 2024 at 5:49 PM Greg Harris 
> wrote:
>
> > Hi Yeikel,
> >
> > Thanks for your question. It certainly isn't clear from the original
> > KIP-298, the attached discussion, or the follow-up KIP-610 as to why
> > the situation is asymmetric.
> >
> > The reason as I understand it is: Source connectors are responsible
> > for importing data to Kafka. If an error occurs during this process,
> > then writing useful information to a dead letter queue about the
> > failure is at least as difficult as importing the record correctly.
> >
> > For some examples:
> > * If an error occurs during poll(), the external data has not yet been
> > transformed into a SourceRecord that the framework can transform or
> > serialize.
> > * If an error occurs during conversion/serialization, the external
> > data cannot be reasonably serialized to be forwarded to the DLQ.
> > * If a record cannot be written to Kafka, such as due to being too
> > large, the same failure is likely to happen with writing to the DLQ as
> > well.
> >
> > For the Sink side, we already know that the data was properly
> > serializable and appeared as a ConsumerRecord. That can
> > be forwarded to the DLQ as-is with a reasonable expectation for
> > success, with the same data formatting as the source topic.
> >
> > If you have a vision for how this can be improved and are interested,
> > please consider opening a KIP! The situation can certainly be made
> > better than it is today.
> >
> > Thanks!
> > Greg
> >
> > On Tue, Mar 5, 2024 at 5:35 AM Yeikel Santana  wrote:
> > >
> > > Hi all,
> > >
> > > Sink connectors support Dead Letter Queues[1], but Source connectors
> > don't seem to.
> > >
> > > What is the reason that we decided to do that?
> > >
> > > In my data pipeline, I'd like to apply some transformations to the
> > > messages before they reach the sink, but that leaves me vulnerable to
> > > failures as I need to either fail the connector or employ logging to
> > > track source failures
> > >
> > > It seems that for now, I'll need to apply the transformations as a sink
> > > and possibly reinsert them back into Kafka for downstream consumption,
> > > but that sounds unnecessary
> > >
> > >
> > > [1]
> > https://cwiki.apache.org/confluence/plugins/servlet/mobile?contentId=80453065#content/view/80453065
> >


Re: [Kafka Connect] Dead letter queues for source connectors?

2024-03-05 Thread Chris Egerton
Hi Greg,

This was my understanding as well--if we can't turn a record into a byte
array on the source side, it's difficult to know exactly what to write to a
DLQ topic.

One idea I've toyed with recently is that we could write the source
partition and offset for the failed record (assuming, hopefully safely,
that these can at least be serialized). This may not cover all bases, is
highly dependent on how user-friendly the offsets published by the
connector are, and does come with the risk of data loss (if the upstream
system is wiped before skipped records can be recovered), but could be
useful in some scenarios.

Thoughts?

Chris

On Tue, Mar 5, 2024 at 5:49 PM Greg Harris 
wrote:

> Hi Yeikel,
>
> Thanks for your question. It certainly isn't clear from the original
> KIP-298, the attached discussion, or the follow-up KIP-610 as to why
> the situation is asymmetric.
>
> The reason as I understand it is: Source connectors are responsible
> for importing data to Kafka. If an error occurs during this process,
> then writing useful information to a dead letter queue about the
> failure is at least as difficult as importing the record correctly.
>
> For some examples:
> * If an error occurs during poll(), the external data has not yet been
> transformed into a SourceRecord that the framework can transform or
> serialize.
> * If an error occurs during conversion/serialization, the external
> data cannot be reasonably serialized to be forwarded to the DLQ.
> * If a record cannot be written to Kafka, such as due to being too
> large, the same failure is likely to happen with writing to the DLQ as
> well.
>
> For the Sink side, we already know that the data was properly
> serializable and appeared as a ConsumerRecord. That can
> be forwarded to the DLQ as-is with a reasonable expectation for
> success, with the same data formatting as the source topic.
>
> If you have a vision for how this can be improved and are interested,
> please consider opening a KIP! The situation can certainly be made
> better than it is today.
>
> Thanks!
> Greg
>
> On Tue, Mar 5, 2024 at 5:35 AM Yeikel Santana  wrote:
> >
> > Hi all,
> >
> > Sink connectors support Dead Letter Queues[1], but Source connectors
> don't seem to.
> >
> > What is the reason that we decided to do that?
> >
> > In my data pipeline, I'd like to apply some transformations to the
> > messages before they reach the sink, but that leaves me vulnerable to
> > failures as I need to either fail the connector or employ logging to
> > track source failures
> >
> > It seems that for now, I'll need to apply the transformations as a sink
> > and possibly reinsert them back into Kafka for downstream consumption,
> > but that sounds unnecessary
> >
> >
> > [1]
> https://cwiki.apache.org/confluence/plugins/servlet/mobile?contentId=80453065#content/view/80453065
>


Re: [Kafka Connect] Dead letter queues for source connectors?

2024-03-05 Thread Greg Harris
Hi Yeikel,

Thanks for your question. It certainly isn't clear from the original
KIP-298, the attached discussion, or the follow-up KIP-610 as to why
the situation is asymmetric.

The reason as I understand it is: Source connectors are responsible
for importing data to Kafka. If an error occurs during this process,
then writing useful information to a dead letter queue about the
failure is at least as difficult as importing the record correctly.

For some examples:
* If an error occurs during poll(), the external data has not yet been
transformed into a SourceRecord that the framework can transform or
serialize.
* If an error occurs during conversion/serialization, the external
data cannot be reasonably serialized to be forwarded to the DLQ.
* If a record cannot be written to Kafka, such as due to being too
large, the same failure is likely to happen with writing to the DLQ as
well.

For the Sink side, we already know that the data was properly
serializable and appeared as a ConsumerRecord. That can
be forwarded to the DLQ as-is with a reasonable expectation for
success, with the same data formatting as the source topic.
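
For reference, the sink-side reporting Greg describes is exposed through the
ErrantRecordReporter from KIP-610; a minimal sketch of a task using it follows
(the delivery call is a hypothetical placeholder):

```java
import java.util.Collection;
import java.util.Map;

import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class DlqAwareSinkTask extends SinkTask {

    private ErrantRecordReporter reporter;

    @Override
    public void start(Map<String, String> props) {
        // Null when the worker is too old or no DLQ/error tolerance is configured.
        reporter = context.errantRecordReporter();
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            try {
                writeToExternalSystem(record); // hypothetical delivery call
            } catch (RuntimeException e) {
                if (reporter != null) {
                    // The framework serializes the failed record and routes it
                    // to the configured DLQ topic (asynchronously).
                    reporter.report(record, e);
                } else {
                    throw e;
                }
            }
        }
    }

    private void writeToExternalSystem(SinkRecord record) {
        // placeholder for the actual sink write
    }

    @Override
    public void stop() {}

    @Override
    public String version() {
        return "1.0";
    }
}
```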

If you have a vision for how this can be improved and are interested,
please consider opening a KIP! The situation can certainly be made
better than it is today.

Thanks!
Greg

On Tue, Mar 5, 2024 at 5:35 AM Yeikel Santana  wrote:
>
> Hi all,
>
> Sink connectors support Dead Letter Queues[1], but Source connectors don't
> seem to.
>
> What is the reason that we decided to do that?
>
> In my data pipeline, I'd like to apply some transformations to the messages
> before they reach the sink, but that leaves me vulnerable to failures as I need
> to either fail the connector or employ logging to track source failures
>
> It seems that for now, I'll need to apply the transformations as a sink and
> possibly reinsert them back into Kafka for downstream consumption, but that
> sounds unnecessary
>
>
> [1]https://cwiki.apache.org/confluence/plugins/servlet/mobile?contentId=80453065#content/view/80453065


[Kafka Connect] Dead letter queues for source connectors?

2024-03-05 Thread Yeikel Santana
Hi all,

Sink connectors support Dead Letter Queues[1], but Source connectors don't seem
to.

What is the reason that we decided to do that?

In my data pipeline, I'd like to apply some transformations to the messages
before they reach the sink, but that leaves me vulnerable to failures, as I need
to either fail the connector or employ logging to track source failures.

It seems that for now, I'll need to apply the transformations as a sink and
possibly reinsert the results back into Kafka for downstream consumption, but
that sounds unnecessary.


[1]https://cwiki.apache.org/confluence/plugins/servlet/mobile?contentId=80453065#content/view/80453065

[DISCUSS] Kafka Connect source task interruption semantics

2023-12-12 Thread Chris Egerton
Hi all,

I'd like to solicit input from users and maintainers on a problem we've
been dealing with for source task cleanup logic.

If you'd like to pore over some Jira history, here's the primary link:
https://issues.apache.org/jira/browse/KAFKA-15090

To summarize, we accidentally introduced a breaking change for Kafka
Connect in https://github.com/apache/kafka/pull/9669. Before that change,
the SourceTask::stop method [1] would be invoked on a separate thread from
the one that did the actual data processing for the task (polling the task
for records, transforming and converting those records, then sending them
to Kafka). After that change, we began invoking SourceTask::stop on the
same thread that handled data processing for the task. This had the effect
that tasks which blocked indefinitely in the SourceTask::poll method [2]
with the expectation that they could stop blocking when SourceTask::stop
was invoked would no longer be capable of graceful shutdown, and may even
hang forever.

This breaking change was introduced in the 3.0.0 release, a little over two
years ago. Since then, source connectors may have been modified to adapt to
the change in behavior by the Connect framework. As a result, we are
hesitant to go back to the prior logic of invoking SourceTask::stop on a
separate thread (see the linked Jira ticket for more detail on this front).

In https://github.com/apache/kafka/pull/14316, I proposed that we begin
interrupting the data processing thread for the source task after it had
exhausted its graceful shutdown timeout (i.e., when the Kafka Connect
runtime decides to cancel [3], [4], [5] the task). I believe this change is
fairly non-controversial--once a task has failed to shut down gracefully,
the runtime can and should do whatever it wants to force a shutdown,
graceful or otherwise.

With all that context out of the way, the question I'd like to ask is: do
we believe it's also appropriate to interrupt the data processing thread
when the task is scheduled for shutdown [6], [7]? This interruption would
ideally be followed up by a graceful shutdown of the task, which may
require the Kafka Connect runtime to handle a potential
InterruptedException from SourceTask::poll. Other exceptions (such as a
wrapped InterruptedException) would be impossible to handle gracefully, and
may lead to spurious error messages in the logs and failed final offset
commits for connectors that do not work well with this new behavior.

Finally, one important note: in the official documentation for
SourceTask::poll, we do already state that this method should not block for
too long:

> If no data is currently available, this method should block but return
control to the caller regularly (by returning null) in order for the task
to transition to the PAUSED state if requested to do so.
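
A sketch of a task honoring that contract might poll an internal queue with a
timeout and return null when nothing arrives; the queue here would be fed by a
hypothetical background thread doing the blocking I/O:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class PoliteSourceTask extends SourceTask {

    // Filled by a background thread that does the blocking reads from the
    // external system (not shown).
    private final BlockingQueue<SourceRecord> queue = new LinkedBlockingQueue<>();

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        // Block only briefly, then return control to the framework with null
        // so the task can transition to PAUSED or shut down promptly.
        SourceRecord first = queue.poll(1, TimeUnit.SECONDS);
        if (first == null) {
            return null;
        }
        List<SourceRecord> batch = new ArrayList<>();
        batch.add(first);
        queue.drainTo(batch); // grab whatever else is already buffered
        return batch;
    }

    @Override
    public void start(Map<String, String> props) {}

    @Override
    public void stop() {}

    @Override
    public String version() {
        return "1.0";
    }
}
```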

Looking forward to everyone's thoughts on this tricky issue!

Cheers,

Chris

[1] -
https://kafka.apache.org/36/javadoc/org/apache/kafka/connect/source/SourceTask.html#stop()
[2] -
https://kafka.apache.org/36/javadoc/org/apache/kafka/connect/source/SourceTask.html#poll()
[3] -
https://github.com/apache/kafka/blob/c5ee82cab447b094ad7491200afa319515d5f467/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1037
[4] -
https://github.com/apache/kafka/blob/c5ee82cab447b094ad7491200afa319515d5f467/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L129-L136
[5] -
https://github.com/apache/kafka/blob/c5ee82cab447b094ad7491200afa319515d5f467/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L284-L297
[6] -
https://github.com/apache/kafka/blob/c5ee82cab447b094ad7491200afa319515d5f467/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1014
[7] -
https://github.com/apache/kafka/blob/c5ee82cab447b094ad7491200afa319515d5f467/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L112-L127


Do we have any mechanism to control requests per second for a Kafka connect sink?

2023-12-04 Thread Yeikel Santana
Hello everyone,



Is there any mechanism to force Kafka Connect to ingest at a given rate per
second, as opposed to only controlling parallelism via tasks?

I am operating in a shared environment where the ingestion rate needs to be as
low as possible (for example, 5 requests/second as an upper limit), and as far
as I can tell, `tasks` are the main unit of work we can use.

My current understanding is that a task will be blocked to process one batch,
and it will continue to the next batch as soon as the previous request is
completed. This should mean that if the target server can process the requests
at a higher rate, then the sink will continue sending at that rate.

However, in my scenario, what I need is to send n requests per second and then
sit idle until that time passes, to avoid overloading the target server.



In this specific example, my best attempt to control the throughput was to
configure it with something like:

```json
{
  "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "tasks.max": "1",
  "max.retries": "10",
  "retry.backoff.ms": "1000",
  "max.buffered.records": "100",
  "batch.size": "100",
  "max.in.flight.requests": "1",
  "flush.synchronously": "true"
}
```



Unfortunately, while that helps, it does not solve the inherent problem. I also
understand that this is very specific to the given sink connector, but my
question is more about a global override that could be applied, if any.

As an alternative, I also suppose that I could add a `Thread.sleep` call as an
SMT, or fork the ElasticsearchSinkConnector to introduce something similar, but
that does not sound like a good solution.

Thank you!
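
For what it's worth, the Thread.sleep-as-an-SMT idea could look roughly like
the sketch below; the class name and config key are made up, it throttles
records rather than HTTP requests, and each task applies its own limit
independently:

```java
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

public class RateLimit<R extends ConnectRecord<R>> implements Transformation<R> {

    private long minNanosBetweenRecords;
    private long lastRecordNanos;

    @Override
    public void configure(Map<String, ?> configs) {
        long recordsPerSecond =
                Long.parseLong(String.valueOf(configs.get("records.per.second")));
        minNanosBetweenRecords = 1_000_000_000L / recordsPerSecond;
    }

    @Override
    public R apply(R record) {
        // Sleep just long enough to keep the average rate at the configured cap.
        long waitNanos = lastRecordNanos + minNanosBetweenRecords - System.nanoTime();
        if (waitNanos > 0) {
            try {
                Thread.sleep(waitNanos / 1_000_000L, (int) (waitNanos % 1_000_000L));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        lastRecordNanos = System.nanoTime();
        return record;
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef().define("records.per.second", ConfigDef.Type.LONG, 5L,
                ConfigDef.Importance.HIGH, "Maximum records passed through per second");
    }

    @Override
    public void close() {}
}
```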


Re: The Plan To Introduce Virtual Threads To Kafka Connect

2023-10-16 Thread Greg Harris
Hi Boyee,

Thanks for the suggestion, Virtual threads look like they may be
helpful for Connect, particularly in Connector plugins.

There are currently no plans to use virtual threads in the Connect
framework, maybe because we need to maintain compatibility with JDK 8
until 4.0. See https://kafka.apache.org/36/documentation.html#java for
supported Java versions.
I've opened a tracking ticket here:
https://issues.apache.org/jira/browse/KAFKA-15611 but I'll leave it
unassigned for anyone interested to spec out the work. I also see a
similar ticket for the Producer:
https://issues.apache.org/jira/browse/KAFKA-14606 .

Thanks!
Greg Harris

On Mon, Oct 16, 2023 at 6:20 AM Boyee  wrote:
>
> Kafka Connect, as a kind of thread-intensive program, can benefit a lot from
> the usage of virtual threads.
> As of JDK 21, released last month, virtual threads are a formal feature of
> the JDK.
> I would like to ask if any plans exist to bring virtual threads into Kafka
> Connect.
> Thank you.


The Plan To Introduce Virtual Threads To Kafka Connect

2023-10-16 Thread Boyee
Kafka Connect, as a kind of thread-intensive program, can benefit a lot from the
usage of virtual threads.
As of JDK 21, released last month, virtual threads are a formal feature of the JDK.
I would like to ask if any plans exist to bring virtual threads into Kafka
Connect.
Thank you.

Re: Kafka Connect - Customize REST request headers

2023-10-07 Thread Yeikel Santana
Thank you for the explanation, Chris.



In case it helps, what I'm looking for is similar to KIP 577[1]. My specific 
example involves a hard-coded key/value pair that needs to be used for 
pod-to-pod as I can connect to any worker without that specific header, but 
workers cannot communicate among themselves without it.



To clarify, my environment is behind Istio[2], where egress traffic can be
created using the following format: `<service>.<namespace>.svc.cluster.local`.
For example, a request among workers should be:

curl -H "Host: <service>.<namespace>.svc.cluster.local" workerIP:PORT



Regarding temporary solutions, I've explored options like utilizing a proxy,
but I am running within containers, which complicates that further, along with
the possibilities of patching, recompiling, or replacing the connect-runtime
jar temporarily. I think that something like this might work, but I need to
test it:





private static void addHeadersToRequest(HttpHeaders headers, Request req) {
    req.header("Host", "<service>.<namespace>.svc.cluster.local");

    if (headers != null) {
        String credentialAuthorization =
                headers.getHeaderString(HttpHeaders.AUTHORIZATION);
        if (credentialAuthorization != null) {
            req.header(HttpHeaders.AUTHORIZATION, credentialAuthorization);
        }
    }
}





This is of course risky and it would be significantly more convenient if this 
functionality is integrated into Kafka Connect itself




[1] 
https://cwiki.apache.org/confluence/display/KAFKA/KIP+577%3A+Allow+HTTP+Response+Headers+to+be+Configured+for+Kafka+Connect

[2] https://istio.io/





 On Sat, 07 Oct 2023 02:05:14 -0400 Chris Egerton  
wrote ---



Hi Yeikel, 
 
Neat question! And thanks for the link to the RestClient code; very helpful. 
 
I don't believe there's a way to configure Kafka Connect to add these 
headers to forwarded requests right now. You may be able to do some kind of 
out-of-band proxy magic to intercept forwarded requests and insert the 
proper headers there. 
 
I don't see a reason for Kafka Connect to only forward authorization 
headers, even after examining the PR [1] and corresponding Jira ticket [2] 
that altered the RestClient class to begin including authorization headers 
in forwarded REST requests. We may be able to tweak the RestClient to 
include all headers instead of just the authorization header. I know that 
this doesn't help your immediate situation, but if other committers and 
contributors agree that the change would be beneficial, we may be able to 
include it in the next release (which may be 3.7.0, or a patch release for 
3.4, 3.5, or 3.6). Alternatively, we may have to gate such a change behind 
a feature flag (either a coarse-grained boolean that enables/disables 
forwarding of all non-authorization headers, or more fine-grained logic 
such as include/exclude lists or even regexes), which would require a KIP 
and may take longer to release. 
 
I've CC'd the dev list to gather their perspective on this potential 
change, and to solicit their input on possible workarounds that may be 
useful to you sooner than the next release takes place. 
 
[1] - https://github.com/apache/kafka/pull/6791 
[2] - https://issues.apache.org/jira/browse/KAFKA-8404 
 
Cheers, 
 
Chris 
 
On Fri, Oct 6, 2023 at 10:14 PM Yeikel Santana <em...@yeikel.com> wrote:
 
> Hello everyone, 
> 
> I'm currently running Kafka Connect behind a firewall that mandates the 
> inclusion of a specific header. This situation becomes particularly 
> challenging when forwarding requests among multiple workers, as it appears 
> that only the Authorization header is included in the request. 
> 
> I'm wondering if there's a way to customize the headers of Kafka Connect 
> before they are forwarded between workers. From my observations, it seems 
> that this capability may not be available[1], and only the response headers 
> can be customized. 
> 
> I'd appreciate any realistic alternatives or suggestions you may have in 
> mind. 
> 
> Thanks! 
> 
> 
> 
> 
> 
> 
> [1] 
> https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java#L191-L198

Re: Kafka Connect - Customize REST request headers

2023-10-07 Thread Chris Egerton
Hi Yeikel,

Neat question! And thanks for the link to the RestClient code; very helpful.

I don't believe there's a way to configure Kafka Connect to add these
headers to forwarded requests right now. You may be able to do some kind of
out-of-band proxy magic to intercept forwarded requests and insert the
proper headers there?

I don't see a reason for Kafka Connect to only forward authorization
headers, even after examining the PR [1] and corresponding Jira ticket [2]
that altered the RestClient class to begin including authorization headers
in forwarded REST requests. We may be able to tweak the RestClient to
include all headers instead of just the authorization header. I know that
this doesn't help your immediate situation, but if other committers and
contributors agree that the change would be beneficial, we may be able to
include it in the next release (which may be 3.7.0, or a patch release for
3.4, 3.5, or 3.6). Alternatively, we may have to gate such a change behind
a feature flag (either a coarse-grained boolean that enables/disables
forwarding of all non-authorization headers, or more fine-grained logic
such as include/exclude lists or even regexes), which would require a KIP
and may take longer to release.

I've CC'd the dev list to gather their perspective on this potential
change, and to solicit their input on possible workarounds that may be
useful to you sooner than the next release takes place.

[1] - https://github.com/apache/kafka/pull/6791
[2] - https://issues.apache.org/jira/browse/KAFKA-8404

Cheers,

Chris

On Fri, Oct 6, 2023 at 10:14 PM Yeikel Santana  wrote:

> Hello everyone,
>
> I'm currently running Kafka Connect behind a firewall that mandates the
> inclusion of a specific header. This situation becomes particularly
> challenging when forwarding requests among multiple workers, as it appears
> that only the Authorization header is included in the request.
>
> I'm wondering if there's a way to customize the headers of Kafka Connect
> before they are forwarded between workers. From my observations, it seems
> that this capability may not be available[1], and only the response headers
> can be customized.
>
> I'd appreciate any realistic alternatives or suggestions you may have in
> mind.
>
> Thanks!
>
>
>
>
>
>
> [1]
> https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java#L191-L198


Kafka Connect - Customize REST request headers

2023-10-06 Thread Yeikel Santana
Hello everyone,

I'm currently running Kafka Connect behind a firewall that mandates the 
inclusion of a specific header. This situation becomes particularly challenging 
when forwarding requests among multiple workers, as it appears that only the 
Authorization header is included in the request.

I'm wondering if there's a way to customize the headers of Kafka Connect before 
they are forwarded between workers. From my observations, it seems that this 
capability may not be available[1], and only the response headers can be 
customized.

I'd appreciate any realistic alternatives or suggestions you may have in mind.

Thanks! 






[1] 
https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java#L191-L198

Re: Regarding Distributed Kafka-connect cluster

2023-09-26 Thread Yeikel Santana
Thank you, Yash. That additional documentation helps to further my 
understanding.

In case it helps in any way, I am currently setting the
rest.advertised.host.name and listeners properties to a private IP address that
is resolvable within each data center. However, each data center can only
communicate with the others using a load balancer.

Is there any configuration I can set to help with this setup? 

For example, if the worker sends the request to the load balancer of the data 
center where the leader resides, I believe that it would work network-wise

Thank you again for taking the time to help.





 On Tue, 26 Sep 2023 07:44:06 -0400 Yash Mayya  wrote 
---



Hi Yeikel, 
 
> To clarify, who initiates the step that assigns a 
>  connector to a specific worker? If this process 
> is controlled by the leader, wouldn't it result in a 
> failure to assign tasks to workers with whom it 
> cannot communicate? 
 
This happens via the group rebalance process where each Kafka Connect 
worker communicates with the Kafka broker that has been chosen as the group 
co-ordinator for the Kafka Connect cluster. The assignment is indeed 
computed by the leader Connect worker but it is disseminated to the other 
Connect workers via the group coordinator [1]. 
 
> I should not find myself in a situation where a 
> connector is assigned to a worker who cannot 
> communicate with the leader 
 
This can unfortunately happen, since the assignments aren't done directly 
through leader -> non-leader Connect worker communication but via the Kafka 
broker designated as the group co-ordinator for the Connect cluster. 
 
[1] - 
https://medium.com/streamthoughts/apache-kafka-rebalance-protocol-or-the-magic-behind-your-streams-applications-e94baf68e4f2
 
 
On Tue, Sep 26, 2023 at 8:25 AM Yeikel Santana <em...@yeikel.com> wrote:
 
> Thank you, Yash. Your explanation makes sense 
> 
> To clarify, who initiates the step that assigns a connector to a specific 
> worker? If this process is controlled by the leader, wouldn't it result in 
> a failure to assign tasks to workers with whom it cannot communicate? 
> 
> Although it is not ideal, it is acceptable for now if some workers remain 
> inactive as long as the data center where the leader resides remains active 
> and continues to handle task assignments. I should not find myself in a 
> situation where a connector is assigned to a worker who cannot communicate 
> with the leader as that would render it useless as you mentioned 
> 
> Thank you for taking the time 
> 
> 
> 
> 
> 
> 
> 
>  On Mon, 25 Sep 2023 11:41:18 -0400 Yash Mayya
> <yash.ma...@gmail.com>
> wrote --- 
> 
> 
> 
> Hi Yeikel, 
> 
> Heartbeats and group coordination in Kafka Connect do occur through Kafka, 
> but a Kafka Connect cluster where all workers cannot communicate with 
> each other won't work very well. You'll be able to create / update / 
> delete 
> connectors by making requests to any workers that can communicate with the 
> leader like you noted. However, certain internal operations require cross 
> Connect worker network access as well. For instance, after a connector is 
> started, it needs to spawn tasks that do the actual work. The tasks are 
> created via a POST request to the leader worker from the worker that is 
> running the connector. When you issue a create connector request to a 
> worker, a group rebalance ensues and the connector is assigned to a worker 
> in the cluster (it could be any worker, not necessarily the one to 
> which the request was issued). So if the connector that you created lands 
> on a Connect worker that cannot communicate with the leader worker, it 
> won't be able to create its tasks which will render the connector 
> essentially useless. 
> 
> Thanks, 
> Yash 
> 
> On Mon, Sep 25, 2023 at 7:51 PM Yeikel Santana
> <em...@yeikel.com>
> wrote: 
> 
> > Thank you, Nikhil. 
> > 
> > I did notice that challenge you're describing with the REST updates when 
> I 
> > had more than one worker within the same datacenter. 
> > 
> > Luckily, solving that was relatively simple as all my workers can 
> > communicate within the same data center, and all I need to do is to 
> ensure 
> > that the update is initiated from the same datacenter as the leader. 
> From 
> > what I tested so far, this seems to work fine. 
> > 
> > My biggest concern was regarding other operations such as heartbeats or 
> > general coordination. If that happens through Kafka, then I should be 
> > fine. Thank you for taking the time.  On Mon, 25 Sep 2023 09:45:43 -0400
> > nikhilsrivastava4...@gmail.com wrote  Hi Yeikel,

Re: Regarding Distributed Kafka-connect cluster

2023-09-26 Thread Yash Mayya
Hi Yeikel,

> To clarify, who initiates the step that assigns a
>  connector to a specific worker? If this process
> is controlled by the leader, wouldn't it result in a
> failure to assign tasks to workers with whom it
> cannot communicate?

This happens via the group rebalance process where each Kafka Connect
worker communicates with the Kafka broker that has been chosen as the group
co-ordinator for the Kafka Connect cluster. The assignment is indeed
computed by the leader Connect worker but it is disseminated to the other
Connect workers via the group coordinator [1].

> I should not find myself in a situation where a
> connector is assigned to a worker who cannot
> communicate with the leader

This can unfortunately happen, since the assignments aren't done directly
through leader -> non-leader Connect worker communication but via the Kafka
broker designated as the group co-ordinator for the Connect cluster.

[1] -
https://medium.com/streamthoughts/apache-kafka-rebalance-protocol-or-the-magic-behind-your-streams-applications-e94baf68e4f2

On Tue, Sep 26, 2023 at 8:25 AM Yeikel Santana  wrote:

> Thank you, Yash. Your explanation makes sense
>
> To clarify, who initiates the step that assigns a connector to a specific
> worker? If this process is controlled by the leader, wouldn't it result in
> a failure to assign tasks to workers with whom it cannot communicate?
>
> Although it is not ideal, it is acceptable for now if some workers remain
> inactive as long as the data center where the leader resides remains active
> and continues to handle task assignments. I should not find myself in a
> situation where a connector is assigned to a worker who cannot communicate
> with the leader as that would render it useless as you mentioned
>
> Thank you for taking the time
>
>
>
>
>
>
>
>  On Mon, 25 Sep 2023 11:41:18 -0400 Yash Mayya 
> wrote ---
>
>
>
> Hi Yeikel,
>
> Heartbeats and group coordination in Kafka Connect do occur through Kafka,
> but a Kafka Connect cluster where all workers cannot communicate with
> each other won't work very well. You'll be able to create / update /
> delete
> connectors by making requests to any workers that can communicate with the
> leader like you noted. However, certain internal operations require cross
> Connect worker network access as well. For instance, after a connector is
> started, it needs to spawn tasks that do the actual work. The tasks are
> created via a POST request to the leader worker from the worker that is
> running the connector. When you issue a create connector request to a
> worker, a group rebalance ensues and the connector is assigned to a worker
> in the cluster (it could be any worker, not necessarily the one to
> which the request was issued). So if the connector that you created lands
> on a Connect worker that cannot communicate with the leader worker, it
> won't be able to create its tasks which will render the connector
> essentially useless.
>
> Thanks,
> Yash
>
> On Mon, Sep 25, 2023 at 7:51 PM Yeikel Santana <em...@yeikel.com>
> wrote:
>
> > Thank you, Nikhil.
> >
> > I did notice that challenge you're describing with the REST updates when
> I
> > had more than one worker within the same datacenter.
> >
> > Luckily, solving that was relatively simple as all my workers can
> > communicate within the same data center, and all I need to do is to
> ensure
> > that the update is initiated from the same datacenter as the leader.
> From
> > what I tested so far, this seems to work fine.
> >
> > My biggest concern was regarding other operations such as heartbeats or
> > general coordination. If that happens through Kafka, then I should be
> > fine. Thank you for taking the time.  On Mon, 25 Sep 2023 09:45:43
> -0400
> > nikhilsrivastava4...@gmail.com wrote  Hi Yeikel,
> >
> > Sharing my two cents. Would let others chime in to add to this.
> >
> > Based on my understanding, if connect workers (which are all part of the
> > same cluster) can communicate with the kafka brokers (which happens to
> be
> > the Group Coordinator and facilitates Connect Leader Election via Group
> > Membership Protocol), then only 1 connect worker will be elected as
> leader
> > amongst all others in the cluster. Outside of that, I believe a bunch of
> > REST calls to connect workers are forwarded to the connect leader (if
> the
> > REST request lands on a connect worker which isn't a leader). In case of
> a
> > non-retriable network partition between the non-leader worker and leader
> > worker, those REST requests will fail. I'm referring to REST requests
> like
> > CREATE / UPDATE / DELETE.

Re: Regarding Distributed Kafka-connect cluster

2023-09-25 Thread Yeikel Santana
Thank you, Yash. Your explanation makes sense

To clarify, who initiates the step that assigns a connector to a specific 
worker? If this process is controlled by the leader, wouldn't it result in a 
failure to assign tasks to workers with whom it cannot communicate?

Although it is not ideal, it is acceptable for now if some workers remain 
inactive as long as the data center where the leader resides remains active and 
continues to handle task assignments. I should not find myself in a situation 
where a connector is assigned to a worker who cannot communicate with the 
leader as that would render it useless as you mentioned

Thank you for taking the time







 On Mon, 25 Sep 2023 11:41:18 -0400 Yash Mayya  wrote 
---



Hi Yeikel, 
 
Heartbeats and group coordination in Kafka Connect do occur through Kafka, 
but a Kafka Connect cluster where all workers cannot communicate with 
each other won't work very well. You'll be able to create / update / delete 
connectors by making requests to any workers that can communicate with the 
leader like you noted. However, certain internal operations require cross 
Connect worker network access as well. For instance, after a connector is 
started, it needs to spawn tasks that do the actual work. The tasks are 
created via a POST request to the leader worker from the worker that is 
running the connector. When you issue a create connector request to a 
worker, a group rebalance ensues and the connector is assigned to a worker 
in the cluster (it could be any worker, not necessarily the one to 
which the request was issued). So if the connector that you created lands 
on a Connect worker that cannot communicate with the leader worker, it 
won't be able to create its tasks which will render the connector 
essentially useless. 
 
Thanks, 
Yash 
 
On Mon, Sep 25, 2023 at 7:51 PM Yeikel Santana <em...@yeikel.com> wrote:
 
> Thank you, Nikhil. 
> 
> I did notice that challenge you're describing with the REST updates when I 
> had more than one worker within the same datacenter. 
> 
> Luckily, solving that was relatively simple as all my workers can 
> communicate within the same data center, and all I need to do is to ensure 
> that the update is initiated from the same datacenter as the leader. From 
> what I tested so far, this seems to work fine. 
> 
> My biggest concern was regarding other operations such as heartbeats or 
> general coordination. If that happens through Kafka, then I should be 
> fine. Thank you for taking the time.  On Mon, 25 Sep 2023 09:45:43 -0400
> nikhilsrivastava4...@gmail.com wrote  Hi Yeikel,
> 
> Sharing my two cents. Would let others chime in to add to this. 
> 
> Based on my understanding, if connect workers (which are all part of the 
> same cluster) can communicate with the kafka brokers (which happens to be 
> the Group Coordinator and facilitates Connect Leader Election via Group 
> Membership Protocol), then only 1 connect worker will be elected as leader 
> amongst all others in the cluster. Outside of that, I believe a bunch of 
> REST calls to connect workers are forwarded to the connect leader (if the 
> REST request lands on a connect worker which isn't a leader). In case of a 
> non-retriable network partition between the non-leader worker and leader 
> worker, those REST requests will fail. I'm referring to REST requests like 
> CREATE / UPDATE / DELETE. 
> 
> Hope this helps a little. 
> 
> Thanks, 
> -Nikhil 
> 
> On Sun, 24 Sept 2023 at 06:36, Yeikel Santana <em...@yeikel.com>
> wrote: 
> 
> > Hello everyone, I'm currently designing a new Kafka Connect cluster, and
> > I'm trying to understand how connectivity functions among workers. In my
> > setup, I have a single Kafka Connect cluster connected to the same Kafka
> > topics and Kafka cluster. However, the workers are deployed in
> > geographically separated data centers, each of which is fully isolated at
> > the network level. I suspect that this setup might not work with Kafka Connect
> > because my current understanding is that ALL workers need to communicate
> > with the leader for task coordination and heartbeats. In terms of leader
> > election, can this result in multiple leaders and other potential
> > issues? Any input and suggestions would be appreciated
>

Re: Regarding Distributed Kafka-connect cluster

2023-09-25 Thread Yash Mayya
Hi Yeikel,

Heartbeats and group coordination in Kafka Connect do occur through Kafka,
but a Kafka Connect cluster where all workers cannot communicate with
each other won't work very well. You'll be able to create / update / delete
connectors by making requests to any workers that can communicate with the
leader like you noted. However, certain internal operations require cross
Connect worker network access as well. For instance, after a connector is
started, it needs to spawn tasks that do the actual work. The tasks are
created via a POST request to the leader worker from the worker that is
running the connector. When you issue a create connector request to a
worker, a group rebalance ensues and the connector is assigned to a worker
in the cluster (it could be any worker, not necessarily the one to
which the request was issued). So if the connector that you created lands
on a Connect worker that cannot communicate with the leader worker, it
won't be able to create its tasks which will render the connector
essentially useless.

Thanks,
Yash
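
A practical corollary of the cross-worker POST described above: each worker
must advertise a REST endpoint that the other workers can resolve and reach.
A sketch of the relevant worker settings (host names are placeholders;
rest.advertised.listener is a newer alternative to the host.name/port pair):

  listeners=http://0.0.0.0:8083
  rest.advertised.host.name=worker-1.dc1.example.com
  rest.advertised.port=8083

If the leader's advertised address is not routable from the worker that was
assigned the connector, the internal task-creation POST is exactly what fails.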

On Mon, Sep 25, 2023 at 7:51 PM Yeikel Santana  wrote:

> Thank you, Nikhil.
>
> I did notice that challenge you're describing with the REST updates when I
> had more than one worker within the same datacenter.
>
> Luckily, solving that was relatively simple as all my workers can
> communicate within the same data center, and all I need to do is to ensure
> that the update is initiated from the same datacenter as the leader. From
> what I tested so far, this seems to work fine.
>
> My biggest concern was regarding other operations such as heartbeats or
> general coordination. If that happens through Kafka, then I should be
> fine. Thank you for taking the time.  On Mon, 25 Sep 2023 09:45:43 -0400
> nikhilsrivastava4...@gmail.com wrote  Hi Yeikel,
>
> Sharing my two cents. Would let others chime in to add to this.
>
> Based on my understanding, if connect workers (which are all part of the
> same cluster) can communicate with the kafka brokers (which happens to be
> the Group Coordinator and facilitates Connect Leader Election via Group
> Membership Protocol), then only 1 connect worker will be elected as leader
> amongst all others in the cluster. Outside of that, I believe a bunch of
> REST calls to connect workers are forwarded to the connect leader (if the
> REST request lands on a connect worker which isn't a leader). In case of a
> non-retriable network partition between the non-leader worker and leader
> worker, those REST requests will fail. I'm referring to REST requests like
> CREATE / UPDATE / DELETE.
>
> Hope this helps a little.
>
> Thanks,
> -Nikhil
>
> On Sun, 24 Sept 2023 at 06:36, Yeikel Santana  wrote:
>
> > Hello everyone, I'm currently designing a new Kafka Connect cluster, and
> > I'm trying to understand how connectivity functions among workers. In my
> > setup, I have a single Kafka Connect cluster connected to the same Kafka
> > topics and Kafka cluster. However, the workers are deployed in
> > geographically separated data centers, each of which is fully isolated at
> > the network level. I suspect that this setup might not work with Kafka Connect
> > because my current understanding is that ALL workers need to communicate
> > with the leader for task coordination and heartbeats. In terms of leader
> > election, can this result in multiple leaders and other potential
> > issues? Any input and suggestions would be appreciated
>


Re: Regarding Distributed Kafka-connect cluster

2023-09-25 Thread Yeikel Santana
Thank you, Nikhil.

I did notice that challenge you're describing with the REST updates when I had 
more than one worker within the same datacenter.

Luckily, solving that was relatively simple as all my workers can communicate 
within the same data center, and all I need to do is to ensure that the update 
is initiated from the same datacenter as the leader. From what I tested so far, 
this seems to work fine.

My biggest concern was regarding other operations such as heartbeats or general 
coordination. If that happens through Kafka, then I should be fine.

Thank you for taking the time.

 On Mon, 25 Sep 2023 09:45:43 -0400 nikhilsrivastava4...@gmail.com wrote 

Hi Yeikel,

Sharing my two cents. Would let others chime in to add to this.

Based on my understanding, if connect workers (which are all part of the
same cluster) can communicate with the kafka brokers (which happens to be
the Group Coordinator and facilitates Connect Leader Election via Group
Membership Protocol), then only 1 connect worker will be elected as leader
amongst all others in the cluster. Outside of that, I believe a bunch of
REST calls to connect workers are forwarded to the connect leader (if the
REST request lands on a connect worker which isn't a leader). In case of a
non-retriable network partition between the non-leader worker and leader
worker, those REST requests will fail. I'm referring to REST requests like
CREATE / UPDATE / DELETE.

Hope this helps a little.

Thanks,
-Nikhil

On Sun, 24 Sept 2023 at 06:36, Yeikel Santana  wrote:

> Hello everyone, I'm currently designing a new Kafka Connect cluster, and
> I'm trying to understand how connectivity functions among workers. In my
> setup, I have a single Kafka Connect cluster connected to the same Kafka
> topics and Kafka cluster. However, the workers are deployed in
> geographically separated data centers, each of which is fully isolated at
> the network level. I suspect that this setup might not work with Kafka Connect
> because my current understanding is that ALL workers need to communicate
> with the leader for task coordination and heartbeats. In terms of leader
> election, can this result in multiple leaders and other potential
> issues? Any input and suggestions would be appreciated


Re: Regarding Distributed Kafka-connect cluster

2023-09-25 Thread sunil chaudhari
Kudos to Nikhil.
Your explanation adds to my knowledge.



On Mon, 25 Sep 2023 at 7:16 PM, Nikhil Srivastava <
nikhilsrivastava4...@gmail.com> wrote:

> Hi Yeikel,
>
> Sharing my two cents. Would let others chime in to add to this.
>
> Based on my understanding, if connect workers (which are all part of the
> same cluster) can communicate with the kafka brokers (which happens to be
> the Group Coordinator and facilitates Connect Leader Election via Group
> Membership Protocol), then only 1 connect worker will be elected as leader
> amongst all others in the cluster. Outside of that, I believe a bunch of
> REST calls to connect workers are forwarded to the connect leader (if the
> REST request lands on a connect worker which isn't a leader). In case of a
> non-retriable network partition between the non-leader worker and leader
> worker, those REST requests will fail. I'm referring to REST requests like
> CREATE / UPDATE / DELETE.
>
> Hope this helps a little.
>
> Thanks,
> -Nikhil
>
> On Sun, 24 Sept 2023 at 06:36, Yeikel Santana  wrote:
>
> > Hello everyone, I'm currently designing a new Kafka Connect cluster, and
> > I'm trying to understand how connectivity functions among workers. In my
> > setup, I have a single Kafka Connect cluster connected to the same Kafka
> > topics and Kafka cluster. However, the workers are deployed in
> > geographically separated data centers, each of which is fully isolated at
> > the network level. I suspect that this setup might not work with Kafka Connect
> > because my current understanding is that ALL workers need to communicate
> > with the leader for task coordination and heartbeats. In terms of leader
> > election, can this result in multiple leaders and other potential
> > issues? Any input and suggestions would be appreciated
>


Re: Regarding Distributed Kafka-connect cluster

2023-09-25 Thread Nikhil Srivastava
Hi Yeikel,

Sharing my two cents. Would let others chime in to add to this.

Based on my understanding, if connect workers (which are all part of the
same cluster) can communicate with the kafka brokers (which happens to be
the Group Coordinator and facilitates Connect Leader Election via Group
Membership Protocol), then only 1 connect worker will be elected as leader
amongst all others in the cluster. Outside of that, I believe a bunch of
REST calls to connect workers are forwarded to the connect leader (if the
REST request lands on a connect worker which isn't a leader). In case of a
non-retriable network partition between the non-leader worker and leader
worker, those REST requests will fail. I'm referring to REST requests like
CREATE / UPDATE / DELETE.

Hope this helps a little.

Thanks,
-Nikhil

On Sun, 24 Sept 2023 at 06:36, Yeikel Santana  wrote:

> Hello everyone, I'm currently designing a new Kafka Connect cluster, and
> I'm trying to understand how connectivity functions among workers. In my
> setup, I have a single Kafka Connect cluster connected to the same Kafka
> topics and Kafka cluster. However, the workers are deployed in
> geographically separated data centers, each of which is fully isolated at
> the network level. I suspect that this setup might not work with Kafka Connect
> because my current understanding is that ALL workers need to communicate
> with the leader for task coordination and heartbeats. In terms of leader
> election, can this result in multiple leaders and other potential
> issues? Any input and suggestions would be appreciated
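
As a concrete example of the kind of request that gets forwarded, a connector
create call can be sent to any worker and is routed to the leader internally
(host, connector name, and connector class below are illustrative):

  curl -X POST http://any-worker:8083/connectors \
    -H 'Content-Type: application/json' \
    -d '{"name": "example-sink", "config": {"connector.class": "org.example.ExampleSinkConnector", "topics": "example-topic", "tasks.max": "1"}}'

With a non-retriable network partition between the receiving worker and the
leader, this is the request that fails even though the receiving worker
itself is healthy.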


Regarding Distributed Kafka-connect cluster

2023-09-23 Thread Yeikel Santana
Hello everyone,

I'm currently designing a new Kafka Connect cluster, and I'm trying to
understand how connectivity functions among workers. In my setup, I have a
single Kafka Connect cluster connected to the same Kafka topics and Kafka
cluster. However, the workers are deployed in geographically separated data
centers, each of which is fully isolated at the network level. I suspect that
this setup might not work with Kafka Connect, because my current understanding
is that ALL workers need to communicate with the leader for task coordination
and heartbeats. In terms of leader election, can this result in multiple
leaders and other potential issues? Any input and suggestions would be
appreciated.

Re: Kafka connect Graceful stop of task failed

2023-08-21 Thread Robson Hermes
Hello Greg

Thanks a *lot* for your help on this.

Indeed the empty poll is not the issue for us. As mentioned, our setup is a
poll every 24 hours. So the `stop()` being stuck due to the `poll()` is
hitting us hard.
I did a trace today on my dev environment, and I can indeed see this waiting
log entry every 100 ms
<https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java#L427>.
Then I call a DEL on this connector, and the stop is not processed until
the next loop in the `poll()`.
Your initial diagnosis is 100% correct.

You mentioned other connectors have already been changed to support a stop
signal from the same thread. Would you have a concrete connector
implementation to point me to?

Kind regards
Robson

On Mon, 21 Aug 2023 at 19:23, Greg Harris 
wrote:

> Hey Robson,
>
> Thanks for opening an issue on the JDBC repo, I think this is
> certainly relevant feedback for the connector developers. I commented
> on the issue with a potential regression that I saw, you can try
> downgrading your connector to see if the behavior improves.
> I also know that kafka-connect-jdbc received a patch to improve this
> behavior when no data is being emitted:
> https://github.com/confluentinc/kafka-connect-jdbc/pull/947 but I'm
> not sure if that is relevant to your situation.
>
> Thanks!
> Greg
>
> On Mon, Aug 21, 2023 at 6:53 AM Robson Hermes 
> wrote:
> >
> > No, it stops them also.
> > The problem is precisely what Greg described, now the stop signal comes
> > from the same thread. So any source task which is running in a blocking
> way
> > will not process the stop signal until the current poll finishes.
> > So would need to patch source jdbc connector.
> >
> > On Mon, 21 Aug 2023 at 15:48, sunil chaudhari <
> sunilmchaudhar...@gmail.com>
> > wrote:
> >
> > > I think when you delete connector it removes the task and workers
> continues
> > > to run.
> > > When you stop it actually stops the worker.
> > > Both different things.
> > > Point to be noted is Worker has connector.
> > > So connector should be removed before stopping the worker.
> > >
> > > Though I am not expert in this.
> > >
> > > On Mon, 21 Aug 2023 at 7:10 PM, Robson Hermes 
> > > wrote:
> > >
> > > > Hello Sunil
> > > >
> > > > I'm not calling a stop, I'm straight deleting the connectors with the
> > > > DELETE. Stopping the connector is done internally during deletion.
> > > >
> > > > Regards
> > > > Robson
> > > >
> > > > On Mon, 21 Aug 2023 at 15:36, sunil chaudhari <
> > > sunilmchaudhar...@gmail.com
> > > > >
> > > > wrote:
> > > >
> > > > > You have to remove connectors first using delete api
> > > > > and then stop the connector
> > > > >
> > > > > On Thu, 17 Aug 2023 at 2:51 AM, Robson Hermes <
> robsonher...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hello
> > > > > >
> > > > > > I'm using kafka connect 7.4.0 to read data from Postgres views
> and
> > > > write
> > > > > to
> > > > > > another Postgres tables. So using JDBC source and sink
> connectors.
> > > > > > All works good, but whenever I stop the source connectors via the
> > > rest
> > > > > api:
> > > > > >
> > > > > > DEL http://kafka-connect:8083/connectors/connector_name_here
> > > > > >
> > > > > > The connector stops fine, but not the task:
> > > > > >
> > > > > >
> > > > > > Graceful stop of connector (connector-name-here) succeeded.
> > > > > >
> > > > > > Graceful stop of task (task-name-here) failed.
> > > > > >
> > > > > >
> > > > > > It only happens with the *source* connector tasks. The sink
> connector
> > > > > > and tasks shutdown gracefully and fine.
> > > > > >
> > > > > > The timeout for task shutdown has been increased, but didn't
> help:
> > > > > >
> > > > > > task.shutdown.graceful.timeout.ms=6
> > > > > >
> > > > > >
> > > > > >
> > > > > > The connectors are running once per day (during the night) to
> load a
> > > > > > lot of data, and the error happens when I try to delete the
> > > connectors
> > > > > > in the middle of the day. That is, they are not actually
> > > > > > executing/loading any data, it has finished already.
> > > > > >
> > > > > > offset.flush.interval.ms=1 in development and integration
> > > > > > environments.
> > > > > >
> > > > > >  offset.flush.interval.ms=6 in production and uat.
> > > > > >
> > > > > >
> > > > > > The rest of the config is pretty much the default.
> > > > > >
> > > > > > What could be the issue?
> > > > > >
> > > > > > The errors of the graceful stop of the tasks are triggering our
> alert
> > > > > > system, so trying to get rid of those.
> > > > > >
> > > > > >
> > > > > > Thanks a lot
> > > > > >
> > > > > > Robson
> > > > > >
> > > > >
> > > >
> > >
>


Re: Kafka connect Graceful stop of task failed

2023-08-21 Thread Greg Harris
Hey Robson,

Thanks for opening an issue on the JDBC repo, I think this is
certainly relevant feedback for the connector developers. I commented
on the issue with a potential regression that I saw, you can try
downgrading your connector to see if the behavior improves.
I also know that kafka-connect-jdbc received a patch to improve this
behavior when no data is being emitted:
https://github.com/confluentinc/kafka-connect-jdbc/pull/947 but I'm
not sure if that is relevant to your situation.

Thanks!
Greg

On Mon, Aug 21, 2023 at 6:53 AM Robson Hermes  wrote:
>
> No, it stops them also.
> The problem is precisely what Greg described, now the stop signal comes
> from the same thread. So any source task which is running in a blocking way
> will not process the stop signal until the current poll finishes.
> So would need to patch source jdbc connector.
>
> On Mon, 21 Aug 2023 at 15:48, sunil chaudhari 
> wrote:
>
> > I think when you delete connector it removes the task and workers continues
> > to run.
> > When you stop it actually stops the worker.
> > Both different things.
> > Point to be noted is Worker has connector.
> > So connector should be removed before stopping the worker.
> >
> > Though I am not expert in this.
> >
> > On Mon, 21 Aug 2023 at 7:10 PM, Robson Hermes 
> > wrote:
> >
> > > Hello Sunil
> > >
> > > I'm not calling a stop, I'm straight deleting the connectors with the
> > > DELETE. Stopping the connector is done internally during deletion.
> > >
> > > Regards
> > > Robson
> > >
> > > On Mon, 21 Aug 2023 at 15:36, sunil chaudhari <
> > sunilmchaudhar...@gmail.com
> > > >
> > > wrote:
> > >
> > > > You have to remove connectors first using delete api
> > > > and then stop the connector
> > > >
> > > > On Thu, 17 Aug 2023 at 2:51 AM, Robson Hermes 
> > > > wrote:
> > > >
> > > > > Hello
> > > > >
> > > > > I'm using kafka connect 7.4.0 to read data from Postgres views and
> > > write
> > > > to
> > > > > another Postgres tables. So using JDBC source and sink connectors.
> > > > > All works good, but whenever I stop the source connectors via the
> > rest
> > > > api:
> > > > >
> > > > > DEL http://kafka-connect:8083/connectors/connector_name_here
> > > > >
> > > > > The connector stops fine, but not the task:
> > > > >
> > > > >
> > > > > Graceful stop of connector (connector-name-here) succeeded.
> > > > >
> > > > > Graceful stop of task (task-name-here) failed.
> > > > >
> > > > >
> > > > > It only happens with the *source* connector tasks. The sink connector
> > > > > and tasks shutdown gracefully and fine.
> > > > >
> > > > > The timeout for task shutdown has been increased, but didn't help:
> > > > >
> > > > > task.shutdown.graceful.timeout.ms=6
> > > > >
> > > > >
> > > > >
> > > > > The connectors are running once per day (during the night) to load a
> > > > > lot of data, and the error happens when I try to delete the
> > connectors
> > > > > in the middle of the day. That is, they are not actually
> > > > > executing/loading any data, it has finished already.
> > > > >
> > > > > offset.flush.interval.ms=1 in development and integration
> > > > > environments.
> > > > >
> > > > >  offset.flush.interval.ms=6 in production and uat.
> > > > >
> > > > >
> > > > > The rest of the config is pretty much the default.
> > > > >
> > > > > What could be the issue?
> > > > >
> > > > > The errors of the graceful stop of the tasks are triggering our alert
> > > > > system, so trying to get rid of those.
> > > > >
> > > > >
> > > > > Thanks a lot
> > > > >
> > > > > Robson
> > > > >
> > > >
> > >
> >


Re: Kafka connect Graceful stop of task failed

2023-08-21 Thread Lemi Odidi
How can I stop getting these updates?

On Mon, Aug 21, 2023 at 9:01 AM Robson Hermes 
wrote:

> This email was sent from an external source so please treat with caution.
>
> No, it stops them also.
> The problem is precisely what Greg described, now the stop signal comes
> from the same thread. So any source task which is running in a blocking way
> will not process the stop signal until the current poll finishes.
> So would need to patch source jdbc connector.
>
> On Mon, 21 Aug 2023 at 15:48, sunil chaudhari  >
> wrote:
>
> > I think when you delete connector it removes the task and workers
> continues
> > to run.
> > When you stop it actually stops the worker.
> > Both different things.
> > Point to be noted is Worker has connector.
> > So connector should be removed before stopping the worker.
> >
> > Though I am not expert in this.
> >
> > On Mon, 21 Aug 2023 at 7:10 PM, Robson Hermes 
> > wrote:
> >
> > > Hello Sunil
> > >
> > > I'm not calling a stop, I'm straight deleting the connectors with the
> > > DELETE. Stopping the connector is done internally during deletion.
> > >
> > > Regards
> > > Robson
> > >
> > > On Mon, 21 Aug 2023 at 15:36, sunil chaudhari <
> > sunilmchaudhar...@gmail.com
> > > >
> > > wrote:
> > >
> > > > You have to remove connectors first using delete api
> > > > and then stop the connector
> > > >
> > > > On Thu, 17 Aug 2023 at 2:51 AM, Robson Hermes <
> robsonher...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hello
> > > > >
> > > > > I'm using kafka connect 7.4.0 to read data from Postgres views and
> > > write
> > > > to
> > > > > another Postgres tables. So using JDBC source and sink connectors.
> > > > > All works good, but whenever I stop the source connectors via the
> > rest
> > > > api:
> > > > >
> > > > > DEL http://kafka-connect:8083/connectors/connector_name_here
> > > > >
> > > > > The connector stops fine, but not the task:
> > > > >
> > > > >
> > > > > Graceful stop of connector (connector-name-here) succeeded.
> > > > >
> > > > > Graceful stop of task (task-name-here) failed.
> > > > >
> > > > >
> > > > > It only happens with the *source* connector tasks. The sink
> connector
> > > > > and tasks shutdown gracefully and fine.
> > > > >
> > > > > The timeout for task shutdown has been increased, but didn't help:
> > > > >
> > > > > task.shutdown.graceful.timeout.ms=6
> > > > >
> > > > >
> > > > >
> > > > > The connectors are running once per day (during the night) to load
> a
> > > > > lot of data, and the error happens when I try to delete the
> > connectors
> > > > > in the middle of the day. That is, they are not actually
> > > > > executing/loading any data, it has finished already.
> > > > >
> > > > > offset.flush.interval.ms=1 in development and integration
> > > > > environments.
> > > > >
> > > > >  offset.flush.interval.ms=6 in production and uat.
> > > > >
> > > > >
> > > > > The rest of the config is pretty much the default.
> > > > >
> > > > > What could be the issue?
> > > > >
> > > > > The errors of the graceful stop of the tasks are triggering our
> alert
> > > > > system, so trying to get rid of those.
> > > > >
> > > > >
> > > > > Thanks a lot
> > > > >
> > > > > Robson
> > > > >
> > > >
> > >
> >
>



Re: Kafka connect Graceful stop of task failed

2023-08-21 Thread Robson Hermes
No, it stops them also.
The problem is precisely what Greg described, now the stop signal comes
from the same thread. So any source task which is running in a blocking way
will not process the stop signal until the current poll finishes.
So would need to patch source jdbc connector.

On Mon, 21 Aug 2023 at 15:48, sunil chaudhari 
wrote:

> I think when you delete connector it removes the task and workers continues
> to run.
> When you stop it actually stops the worker.
> Both different things.
> Point to be noted is Worker has connector.
> So connector should be removed before stopping the worker.
>
> Though I am not expert in this.
>
> On Mon, 21 Aug 2023 at 7:10 PM, Robson Hermes 
> wrote:
>
> > Hello Sunil
> >
> > I'm not calling a stop, I'm straight deleting the connectors with the
> > DELETE. Stopping the connector is done internally during deletion.
> >
> > Regards
> > Robson
> >
> > On Mon, 21 Aug 2023 at 15:36, sunil chaudhari <
> sunilmchaudhar...@gmail.com
> > >
> > wrote:
> >
> > > You have to remove connectors first using delete api
> > > and then stop the connector
> > >
> > > On Thu, 17 Aug 2023 at 2:51 AM, Robson Hermes 
> > > wrote:
> > >
> > > > Hello
> > > >
> > > > I'm using kafka connect 7.4.0 to read data from Postgres views and
> > write
> > > to
> > > > another Postgres tables. So using JDBC source and sink connectors.
> > > > All works good, but whenever I stop the source connectors via the
> rest
> > > api:
> > > >
> > > > DEL http://kafka-connect:8083/connectors/connector_name_here
> > > >
> > > > The connector stops fine, but not the task:
> > > >
> > > >
> > > > Graceful stop of connector (connector-name-here) succeeded.
> > > >
> > > > Graceful stop of task (task-name-here) failed.
> > > >
> > > >
> > > > It only happens with the *source* connector tasks. The sink connector
> > > > and tasks shutdown gracefully and fine.
> > > >
> > > > The timeout for task shutdown has been increased, but didn't help:
> > > >
> > > > task.shutdown.graceful.timeout.ms=6
> > > >
> > > >
> > > >
> > > > The connectors are running once per day (during the night) to load a
> > > > lot of data, and the error happens when I try to delete the
> connectors
> > > > in the middle of the day. That is, they are not actually
> > > > executing/loading any data, it has finished already.
> > > >
> > > > offset.flush.interval.ms=1 in development and integration
> > > > environments.
> > > >
> > > >  offset.flush.interval.ms=6 in production and uat.
> > > >
> > > >
> > > > The rest of the config is pretty much the default.
> > > >
> > > > What could be the issue?
> > > >
> > > > The errors of the graceful stop of the tasks are triggering our alert
> > > > system, so trying to get rid of those.
> > > >
> > > >
> > > > Thanks a lot
> > > >
> > > > Robson
> > > >
> > >
> >
>


Re: Kafka connect Graceful stop of task failed

2023-08-21 Thread sunil chaudhari
I think when you delete a connector it removes the task, and the workers
continue to run.
When you stop it, it actually stops the worker.
These are two different things.
The point to note is that a worker hosts the connector,
so the connector should be removed before stopping the worker.

Though I am not an expert in this.

On Mon, 21 Aug 2023 at 7:10 PM, Robson Hermes 
wrote:

> Hello Sunil
>
> I'm not calling a stop, I'm straight deleting the connectors with the
> DELETE. Stopping the connector is done internally during deletion.
>
> Regards
> Robson
>
> On Mon, 21 Aug 2023 at 15:36, sunil chaudhari  >
> wrote:
>
> > You have to remove connectors first using delete api
> > and then stop the connector
> >
> > On Thu, 17 Aug 2023 at 2:51 AM, Robson Hermes 
> > wrote:
> >
> > > Hello
> > >
> > > I'm using kafka connect 7.4.0 to read data from Postgres views and
> write
> > to
> > > another Postgres tables. So using JDBC source and sink connectors.
> > > All works good, but whenever I stop the source connectors via the rest
> > api:
> > >
> > > DEL http://kafka-connect:8083/connectors/connector_name_here
> > >
> > > The connector stops fine, but not the task:
> > >
> > >
> > > Graceful stop of connector (connector-name-here) succeeded.
> > >
> > > Graceful stop of task (task-name-here) failed.
> > >
> > >
> > > It only happens with the *source* connector tasks. The sink connector
> > > and tasks shutdown gracefully and fine.
> > >
> > > The timeout for task shutdown has been increased, but didn't help:
> > >
> > > task.shutdown.graceful.timeout.ms=6
> > >
> > >
> > >
> > > The connectors are running once per day (during the night) to load a
> > > lot of data, and the error happens when I try to delete the connectors
> > > in the middle of the day. That is, they are not actually
> > > executing/loading any data, it has finished already.
> > >
> > > offset.flush.interval.ms=1 in development and integration
> > > environments.
> > >
> > >  offset.flush.interval.ms=6 in production and uat.
> > >
> > >
> > > The rest of the config is pretty much the default.
> > >
> > > What could be the issue?
> > >
> > > The errors of the graceful stop of the tasks are triggering our alert
> > > system, so trying to get rid of those.
> > >
> > >
> > > Thanks a lot
> > >
> > > Robson
> > >
> >
>


Re: Kafka connect Graceful stop of task failed

2023-08-21 Thread Robson Hermes
Hello Sunil

I'm not calling a stop, I'm straight deleting the connectors with the
DELETE. Stopping the connector is done internally during deletion.

Regards
Robson

On Mon, 21 Aug 2023 at 15:36, sunil chaudhari 
wrote:

> You have to remove connectors first using delete api
> and then stop the connector
>
> On Thu, 17 Aug 2023 at 2:51 AM, Robson Hermes 
> wrote:
>
> > Hello
> >
> > I'm using kafka connect 7.4.0 to read data from Postgres views and write
> to
> > another Postgres tables. So using JDBC source and sink connectors.
> > All works good, but whenever I stop the source connectors via the rest
> api:
> >
> > DEL http://kafka-connect:8083/connectors/connector_name_here
> >
> > The connector stops fine, but not the task:
> >
> >
> > Graceful stop of connector (connector-name-here) succeeded.
> >
> > Graceful stop of task (task-name-here) failed.
> >
> >
> > It only happens with the *source* connector tasks. The sink connector
> > and tasks shutdown gracefully and fine.
> >
> > The timeout for task shutdown has been increased, but didn't help:
> >
> > task.shutdown.graceful.timeout.ms=6
> >
> >
> >
> > The connectors are running once per day (during the night) to load a
> > lot of data, and the error happens when I try to delete the connectors
> > in the middle of the day. That is, they are not actually
> > executing/loading any data, it has finished already.
> >
> > offset.flush.interval.ms=1 in development and integration
> > environments.
> >
> >  offset.flush.interval.ms=6 in production and uat.
> >
> >
> > The rest of the config is pretty much the default.
> >
> > What could be the issue?
> >
> > The errors of the graceful stop of the tasks are triggering our alert
> > system, so trying to get rid of those.
> >
> >
> > Thanks a lot
> >
> > Robson
> >
>


Re: Kafka connect Graceful stop of task failed

2023-08-21 Thread sunil chaudhari
You have to remove connectors first using delete api
and then stop the connector

On Thu, 17 Aug 2023 at 2:51 AM, Robson Hermes 
wrote:

> Hello
>
> I'm using kafka connect 7.4.0 to read data from Postgres views and write to
> another Postgres tables. So using JDBC source and sink connectors.
> All works good, but whenever I stop the source connectors via the rest api:
>
> DEL http://kafka-connect:8083/connectors/connector_name_here
>
> The connector stops fine, but not the task:
>
>
> Graceful stop of connector (connector-name-here) succeeded.
>
> Graceful stop of task (task-name-here) failed.
>
>
> It only happens with the *source* connector tasks. The sink connector
> and tasks shutdown gracefully and fine.
>
> The timeout for task shutdown has been increased, but didn't help:
>
> task.shutdown.graceful.timeout.ms=6
>
>
>
> The connectors are running once per day (during the night) to load a
> lot of data, and the error happens when I try to delete the connectors
> in the middle of the day. That is, they are not actually
> executing/loading any data, it has finished already.
>
> offset.flush.interval.ms=1 in development and integration
> environments.
>
>  offset.flush.interval.ms=6 in production and uat.
>
>
> The rest of the config is pretty much the default.
>
> What could be the issue?
>
> The errors of the graceful stop of the tasks are triggering our alert
> system, so trying to get rid of those.
>
>
> Thanks a lot
>
> Robson
>


Re: Kafka connect Graceful stop of task failed

2023-08-21 Thread Robson Hermes
Hello Greg (sorry about the duplicate e-mail, forgot to cc users mailing
list)

Thanks a lot for your detailed reply. I'm using JDBC Source connectors from
kafka-connect-jdbc <https://github.com/confluentinc/kafka-connect-jdbc>.
Indeed the `poll()` implementation is blocked, so it only processes a
`stop()` when it returns from the current `poll()`execution.
There was a change in the past to fix a similar problem
<https://github.com/confluentinc/kafka-connect-jdbc/pull/677>, but not
involving `stop()` from the same thread. I've just raised one
<https://github.com/confluentinc/kafka-connect-jdbc/issues/1360>.

Unfortunately, setting a lower poll interval is not an option for me, as
this is being used in a heavy data-load operation that executes only once
per day.

Will see if I'm able to come up with a change, although I'm not sure yet. First
time using Kafka, Kafka Connect, and kafka-connect-jdbc =D

Kind regards
Robson

On Thu, 17 Aug 2023 at 00:37, Greg Harris 
wrote:

> Hi Robson,
>
> Thank you for the detailed bug report.
>
> I believe the behavior that you're describing is caused by this flaw:
> https://issues.apache.org/jira/browse/KAFKA-15090 which is still under
> discussion. Since the above flaw was introduced in 3.0, source
> connectors need to return from poll() before the graceful shutdown
> timeout to avoid the error.
> You may be able to mitigate the error if the connector allows you to
> reduce its poll timeout/interval to something less than the graceful
> timeout, but that will depend on the specific connector
> implementation, so check the documentation for your connector. I know
> some implementations have received patches to compensate for this
> behavior in the framework, so also consider upgrading or checking
> release notes for your connectors.
>
> As for the effects of this error: whenever a non-graceful stop occurs,
> the runtime will immediately close the producer so that the task will
> not be able to write any further records. However, it will still leave
> resources for that task (threads, in-memory records, database
> connections, etc) allocated, until the task does finally return from
> poll(). While this is not desirable behavior, it seems to be treated
> as just a nuisance error by most operators.
>
> I hope this gives some context for the error message you're seeing.
>
> Thanks,
> Greg
>


Re: Kafka connect Graceful stop of task failed

2023-08-16 Thread Greg Harris
Hi Robson,

Thank you for the detailed bug report.

I believe the behavior that you're describing is caused by this flaw:
https://issues.apache.org/jira/browse/KAFKA-15090 which is still under
discussion. Since the above flaw was introduced in 3.0, source
connectors need to return from poll() before the graceful shutdown
timeout to avoid the error.
You may be able to mitigate the error if the connector allows you to
reduce its poll timeout/interval to something less than the graceful
timeout, but that will depend on the specific connector
implementation, so check the documentation for your connector. I know
some implementations have received patches to compensate for this
behavior in the framework, so also consider upgrading or checking
release notes for your connectors.

As for the effects of this error: whenever a non-graceful stop occurs,
the runtime will immediately close the producer so that the task will
not be able to write any further records. However, it will still leave
resources for that task (threads, in-memory records, database
connections, etc) allocated, until the task does finally return from
poll(). While this is not desirable behavior, it seems to be treated
as just a nuisance error by most operators.

I hope this gives some context for the error message you're seeing.

Thanks,
Greg
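
For connector authors reading along: one way to avoid blocking in poll() past
the graceful timeout is to cap each poll() call at a few seconds and return
null when there is nothing to emit yet, while also honouring a stop flag. This
is only a sketch of the pattern, not code from any shipping connector; the
class name, the fetchBatch() helper, and the intervals are illustrative:

  import java.util.Collections;
  import java.util.List;
  import java.util.Map;
  import java.util.concurrent.atomic.AtomicBoolean;
  import org.apache.kafka.connect.source.SourceRecord;
  import org.apache.kafka.connect.source.SourceTask;

  public class StopAwareSourceTask extends SourceTask {
      private static final long RUN_INTERVAL_MS = 24 * 60 * 60 * 1000L; // daily load, as in this thread
      private final AtomicBoolean stopping = new AtomicBoolean(false);
      private volatile long nextRunMs;

      @Override
      public String version() {
          return "0.0.1";
      }

      @Override
      public void start(Map<String, String> props) {
          nextRunMs = System.currentTimeMillis();
      }

      @Override
      public List<SourceRecord> poll() throws InterruptedException {
          // Cap each poll() at ~5 seconds instead of sleeping until the next run.
          long deadline = System.currentTimeMillis() + 5_000L;
          while (!stopping.get()
                  && System.currentTimeMillis() < nextRunMs
                  && System.currentTimeMillis() < deadline) {
              Thread.sleep(100); // short slices, so a stop flag set by another thread is seen quickly
          }
          if (stopping.get() || System.currentTimeMillis() < nextRunMs) {
              // Returning null hands control back to the worker, which can then
              // deliver stop() even when the signal arrives on this same thread.
              return null;
          }
          nextRunMs = System.currentTimeMillis() + RUN_INTERVAL_MS;
          return fetchBatch();
      }

      // Hypothetical helper: a real task would query the source and build records here.
      private List<SourceRecord> fetchBatch() {
          return Collections.emptyList();
      }

      @Override
      public void stop() {
          stopping.set(true);
      }
  }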


Kafka connect Graceful stop of task failed

2023-08-16 Thread Robson Hermes
Hello

I'm using Kafka Connect 7.4.0 to read data from Postgres views and write to
other Postgres tables, so I'm using the JDBC source and sink connectors.
All works good, but whenever I stop the source connectors via the rest api:

DEL http://kafka-connect:8083/connectors/connector_name_here

The connector stops fine, but not the task:


Graceful stop of connector (connector-name-here) succeeded.

Graceful stop of task (task-name-here) failed.


It only happens with the *source* connector tasks. The sink connector
and its tasks shut down gracefully and fine.

The timeout for task shutdown has been increased, but didn't help:

task.shutdown.graceful.timeout.ms=6



The connectors are running once per day (during the night) to load a
lot of data, and the error happens when I try to delete the connectors
in the middle of the day. That is, they are not actually
executing/loading any data; the load has finished already.

offset.flush.interval.ms=1 in development and integration environments.

 offset.flush.interval.ms=6 in production and uat.


The rest of the config is pretty much the default.

What could be the issue?

The errors of the graceful stop of the tasks are triggering our alert
system, so trying to get rid of those.


Thanks a lot

Robson
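
For reference, the "DEL" shorthand above is an HTTP DELETE against the worker
REST API; the equivalent curl invocation (connector name is a placeholder) is:

  curl -X DELETE http://kafka-connect:8083/connectors/connector_name_here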


Re: Kafka Connect Rest Extension Question

2023-07-31 Thread Greg Harris
Hello Yang Hyung Wook,

In your post I do not see anything obviously wrong, so you may need to
do some more debugging.

1. Are you using the same jar for both the classpath and plugin.path
tests? If not, do they both contain the service loader manifest file?
You can test this with
https://docs.oracle.com/javase/tutorial/deployment/jar/view.html
2. Do you see either of these errors
https://github.com/apache/kafka/blob/3.5.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java#L267-L269
for filesystem-specific problems?
3. This log line
https://github.com/apache/kafka/blob/3.5.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java#L275
should be printed multiple times when you're using plugin.path. Are
you seeing a log including a directory containing your jar file, the
jar file itself, or only other locations?
4. If there is a dependency-related error, it will appear with this
error log: 
https://github.com/apache/kafka/blob/3.5.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java#L431

If the above doesn't help, can you please provide your redacted worker
startup logs after
https://github.com/apache/kafka/blob/3.5.0/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java#L120
and before the worker starts printing configurations?

Thanks,
Greg Harris
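
As a quick reference for item 1: the service loader manifest is a file inside
the jar named after the interface, whose contents are the fully qualified name
of your implementation class (the implementation name below is a placeholder):

  META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension

containing a single line such as:

  com.example.MyConnectRestExtension

jar tf your-extension.jar should list that services entry; if it is missing,
the ServiceLoader-based discovery has nothing to find.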

On Mon, Jul 31, 2023 at 5:38 AM 양형욱  wrote:
>
> https://stackoverflow.com/questions/76797743/how-can-i-solve-connectrestextension-error
>
> There is an issue with this link where the ConnectRestExtension 
> implementation is not registered. I've done everything the kip official 
> documentation says, but can you tell me why it doesn't work?
>
> 양형욱 Yang Hyung Wook
> Global Platform Dev
>
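As a quick way to carry out the check in item 1 of Greg's list, a ServiceLoader probe run with the extension jar on the classpath will show whether the service manifest resolves. This is a minimal sketch, not part of the original exchange; RestExtensionCheck is a hypothetical name:

import java.util.ServiceLoader;

import org.apache.kafka.connect.rest.ConnectRestExtension;

public class RestExtensionCheck {
    public static void main(String[] args) {
        // ServiceLoader reads META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension
        // from every jar on the classpath and instantiates the classes listed there.
        ServiceLoader<ConnectRestExtension> loader =
                ServiceLoader.load(ConnectRestExtension.class);
        boolean found = false;
        for (ConnectRestExtension extension : loader) {
            System.out.println("Discovered: " + extension.getClass().getName());
            found = true;
        }
        if (!found) {
            System.out.println("No ConnectRestExtension found; check the service manifest in the jar");
        }
    }
}

If this prints the implementation class when run with `java -cp my-extension.jar:...`, the manifest is fine and the problem is on the plugin.path side.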


Kafka Connect Rest Extension Question

2023-07-31 Thread 양형욱
https://stackoverflow.com/questions/76797743/how-can-i-solve-connectrestextension-error
 

The link above describes an issue where the ConnectRestExtension implementation
is not registered. I've done everything the official KIP documentation says,
but can you tell me why it doesn't work?

양형욱 Yang Hyung Wook
Global Platform Dev



Re: Kafka Connect exactly-once semantic and very large transactions

2023-06-09 Thread Vojtech Juranek
Hi Chris,
thanks for your response!

Yes, we are also looking at other means of enabling exactly-once semantics
for existing data (e.g. using an incremental snapshot, which snapshots the data
incrementally and in smaller chunks), but first we would like to fully
understand all the implications and consequences for Kafka. So if someone wants
to provide more insights or recommendations not mentioned in your response,
they would still be very welcome.

Thanks!
Vojta

On Thursday, 8 June 2023 15:58:37 CEST Chris Egerton wrote:
> Hi Vojta,
> 
> From my limited understanding of the Debezium snapshot process, I believe
> that you're correct that producing the entire snapshot in a transaction is
> the way to provide exactly-once semantics during that phase. If there's a
> way to recover in-progress snapshots and skip over already-produced
> records, then that could be a suitable alternative.
> 
> You're correct that a large transaction timeout may be required to
> accommodate this case (we even try to call this out in the error message
> that users see on transaction timeouts [1]). I'm not very familiar with
> broker logic but with my limited understanding, your assessment of the
> impact of delayed log compaction also seems valid.
> 
> The only other issue that comes to my mind is that latency will be higher
> for downstream consumers since they won't be able to read any records until
> the entire transaction is complete, assuming they're using the
> read_committed isolation level. But given that this is the snapshotting
> phase and you're presumably moving historical data instead of real-time
> updates to your database, this should hopefully be acceptable for most
> users.
> 
> I'd be interested to hear what someone more familiar with client and broker
> internals has to say! Going to be following this thread.
> 
> [1] -
> https://github.com/apache/kafka/blob/513e1c641d63c5e15144f9fcdafa1b56c5e5ba09/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java#L357
> 
> Cheers,
> 
> Chris
> 
> On Thu, Jun 8, 2023 at 7:53 AM Vojtech Juranek  wrote:
> > Hi,
> > I'm investigating possibilities of exactly-once semantics for Debezium [1]
> > Kafka Connect source connectors, which implement change data capture for
> > various databases. Debezium has two phases, an initial snapshot phase and a
> > streaming phase. The initial snapshot phase loads existing data from the
> > database and sends it to Kafka; the subsequent streaming phase captures any
> > changes to the data.
> >
> > Exactly-once delivery seems to work really well during the streaming phase.
> > Now, I'm investigating how to ensure exactly-once delivery for the initial
> > snapshot phase. If the snapshot fails (e.g. due to a DB connection drop or a
> > worker node crash), we force a new snapshot after the restart, as the data
> > may change during the restart and the snapshot has to reflect the state of
> > the data at the time when it was executed. However, re-taking the snapshot
> > produces duplicate records in the related Kafka topics.
> >
> > Probably the easiest solution to this issue is to run the whole snapshot in
> > a single Kafka transaction. This may result in a huge transaction containing
> > millions of records, in some cases even billions of records. As these
> > records cannot be consumed until the transaction is committed, and the logs
> > therefore cannot be compacted, this would potentially result in a huge
> > increase in Kafka log size. Also, as this is a time-consuming process for
> > large DBs, it would very likely result in transaction timeouts (unless the
> > timeout is set to a very large value).
> >
> > Is my understanding of the impact of very large transactions correct? Are
> > there any other drawbacks I'm missing (e.g. can it also result in some
> > memory issues or something similar)?
> >
> > Thanks in advance!
> > Vojta
> >
> > [1] https://debezium.io/
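To make the timeout concern above concrete, here is a minimal sketch of a transactional producer sized for a long snapshot. The bootstrap address, topic, and transactional.id are placeholders, and the broker-side cap transaction.max.timeout.ms would have to be raised to at least the client-side transaction.timeout.ms used here:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SnapshotTransactionSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "snapshot-producer"); // placeholder
        // The transaction must outlive the whole snapshot, hence the long timeout.
        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 3_600_000); // 1 hour

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            try {
                // In a real snapshot this loop would emit one record per row.
                producer.send(new ProducerRecord<>("snapshot-topic", "key", "row"));
                // Nothing becomes visible to read_committed consumers before this call.
                producer.commitTransaction();
            } catch (Exception e) {
                // On failure the whole snapshot is aborted and can be re-taken
                // without leaving duplicates behind.
                producer.abortTransaction();
                throw e;
            }
        }
    }
}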





Re: Kafka Connect exactly-once semantic and very large transactions

2023-06-08 Thread Chris Egerton
Hi Vojta,

From my limited understanding of the Debezium snapshot process, I believe
that you're correct that producing the entire snapshot in a transaction is
the way to provide exactly-once semantics during that phase. If there's a
way to recover in-progress snapshots and skip over already-produced
records, then that could be a suitable alternative.

You're correct that a large transaction timeout may be required to
accommodate this case (we even try to call this out in the error message
that users see on transaction timeouts [1]). I'm not very familiar with
broker logic but with my limited understanding, your assessment of the
impact of delayed log compaction also seems valid.

The only other issue that comes to my mind is that latency will be higher
for downstream consumers since they won't be able to read any records until
the entire transaction is complete, assuming they're using the
read_committed isolation level. But given that this is the snapshotting
phase and you're presumably moving historical data instead of real-time
updates to your database, this should hopefully be acceptable for most
users.

I'd be interested to hear what someone more familiar with client and broker
internals has to say! Going to be following this thread.

[1] -
https://github.com/apache/kafka/blob/513e1c641d63c5e15144f9fcdafa1b56c5e5ba09/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java#L357

Cheers,

Chris

On Thu, Jun 8, 2023 at 7:53 AM Vojtech Juranek  wrote:

> Hi,
> I'm investigating possibilities of exactly-once semantics for Debezium [1]
> Kafka Connect source connectors, which implement change data capture for
> various databases. Debezium has two phases, an initial snapshot phase and a
> streaming phase. The initial snapshot phase loads existing data from the
> database and sends it to Kafka; the subsequent streaming phase captures any
> changes to the data.
>
> Exactly-once delivery seems to work really well during the streaming phase.
> Now, I'm investigating how to ensure exactly-once delivery for the initial
> snapshot phase. If the snapshot fails (e.g. due to a DB connection drop or a
> worker node crash), we force a new snapshot after the restart, as the data
> may change during the restart and the snapshot has to reflect the state of
> the data at the time when it was executed. However, re-taking the snapshot
> produces duplicate records in the related Kafka topics.
>
> Probably the easiest solution to this issue is to run the whole snapshot in
> a single Kafka transaction. This may result in a huge transaction containing
> millions of records, in some cases even billions of records. As these
> records cannot be consumed until the transaction is committed, and the logs
> therefore cannot be compacted, this would potentially result in a huge
> increase in Kafka log size. Also, as this is a time-consuming process for
> large DBs, it would very likely result in transaction timeouts (unless the
> timeout is set to a very large value).
>
> Is my understanding of the impact of very large transactions correct? Are
> there any other drawbacks I'm missing (e.g. can it also result in some
> memory issues or something similar)?
>
> Thanks in advance!
> Vojta
>
> [1] https://debezium.io/
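Chris's latency point can be seen from the consumer side: with read_committed isolation, the fetch position cannot advance past the first open transaction, so a snapshot-sized transaction stalls downstream reads until commit. A minimal sketch, with placeholder bootstrap address, group id, and topic:

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadCommittedConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "snapshot-readers");        // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Records from open transactions stay invisible until they are committed.
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("snapshot-topic")); // placeholder topic
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(r -> System.out.println(r.value()));
        }
    }
}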


Kafka Connect exactly-once semantic and very large transactions

2023-06-08 Thread Vojtech Juranek
Hi,
I'm investigating possibilities of exactly-once semantics for Debezium [1]
Kafka Connect source connectors, which implement change data capture for
various databases. Debezium has two phases, an initial snapshot phase and a
streaming phase. The initial snapshot phase loads existing data from the
database and sends it to Kafka; the subsequent streaming phase captures any
changes to the data.

Exactly-once delivery seems to work really well during the streaming phase.
Now, I'm investigating how to ensure exactly-once delivery for the initial
snapshot phase. If the snapshot fails (e.g. due to a DB connection drop or a
worker node crash), we force a new snapshot after the restart, as the data
may change during the restart and the snapshot has to reflect the state of
the data at the time when it was executed. However, re-taking the snapshot
produces duplicate records in the related Kafka topics.

Probably the easiest solution to this issue is to run the whole snapshot in
a single Kafka transaction. This may result in a huge transaction containing
millions of records, in some cases even billions of records. As these
records cannot be consumed until the transaction is committed, and the logs
therefore cannot be compacted, this would potentially result in a huge
increase in Kafka log size. Also, as this is a time-consuming process for
large DBs, it would very likely result in transaction timeouts (unless the
timeout is set to a very large value).

Is my understanding of the impact of very large transactions correct? Are
there any other drawbacks I'm missing (e.g. can it also result in some
memory issues or something similar)?

Thanks in advance!
Vojta

[1] https://debezium.io/



Kafka connect process listens to an unknown port

2023-05-24 Thread Jorge Martin Cristobal
Hi all,

I'm testing Apache Kafka Connect for a project and I found that the main
process listens on two different ports: the one providing the REST API, 8083
by default, and a different unprivileged port that changes its number on each
restart. For instance, this is a fragment of the output from the netstat command:
tcp 0 0 0.0.0.0:8083 0.0.0.0:* LISTEN 28646/java
tcp 0 0 0.0.0.0:42859 0.0.0.0:* LISTEN 28646/java <--THIS

What's the purpose of that port? Is there any public definition of that
interface? I haven't found any documentation regarding that port number.
Kind regards,

Jorge M.


Re: Kafka connect process listens to an unknown port

2023-05-19 Thread Greg Harris
Hey Jorge,

I looked into it, and can reproduce the second LISTEN port in a
vanilla Kafka Connect cluster without any connectors running.

Using jstack, I see that there are two threads that appear to be
waiting in the corresponding accept methods:

"RMI TCP Accept-0" #15 daemon prio=5 os_prio=31 cpu=0.37ms
elapsed=790.61s tid=0x0001370f5a00 nid=0x6d03 runnable
[0x00017255e000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.Net.accept(java.base@17.0.4.1/Native Method)

"qtp1530870688-33-acceptor-0@45f75f75-http_8083@43826ec{HTTP/1.1,
(http/1.1)}{0.0.0.0:8083}" #33 prio=3 os_prio=31 cpu=0.17ms
elapsed=789.45s tid=0x00012712e400 nid=0x8707 runnable
[0x0001737ca000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.Net.accept(java.base@17.0.4.1/Native Method)

The latter appears to be the 8083 port which is serving the REST API,
and is expected.
The former appears to be a Java Remote Method Invocation socket on
"port 0" which ends up selecting a random open port on each startup.
I do see that RMI is often used to export JMX metrics, and it appears
that kafka-run-class.sh (which is used when starting Kafka Connect) by
default enables JMX metrics.
I was able to disable the RMI port by setting the
KAFKA_JMX_OPTS="-Dkey=value" environment variable before running
bin/connect-distributed.sh. This isn't a recommendation for you, but it
proves that the default KAFKA_JMX_OPTS settings from kafka-run-class.sh
are relevant.

Thanks for the question, I learned something new!
Greg Harris

On Fri, May 19, 2023 at 4:45 AM Jorge Martin Cristobal
 wrote:
>
> Hi all,
>
> I'm testing Apache Kafka Connect for a project and I found that the main
> process listens on two different ports: the one providing the REST API, 8083
> by default, and a different unprivileged port that changes its number on each
> restart. For instance, this is a fragment of the output from the netstat command:
> tcp 0 0 0.0.0.0:8083 0.0.0.0:* LISTEN 28646/java
> tcp 0 0 0.0.0.0:42859 0.0.0.0:* LISTEN 28646/java <--THIS
>
> What's the purpose of that port? Is there any public definition of that
> interface? I haven't found any documentation regarding that port number.
> Kind regards,
>
> Jorge M.
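For completeness: once remote JMX is pinned to a fixed port (for example by exporting KAFKA_JMX_OPTS with com.sun.management.jmxremote.port set before starting the worker), the mystery port stops being anonymous and can be probed with a plain JMX client. A sketch, assuming remote JMX has been explicitly enabled on port 9999 on localhost:

import javax.management.MBeanServerConnection;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class JmxProbe {
    public static void main(String[] args) throws Exception {
        // Standard RMI-based JMX service URL; 9999 is an assumed fixed port.
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection connection = connector.getMBeanServerConnection();
            System.out.println("MBeans visible over RMI: " + connection.getMBeanCount());
        }
    }
}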


Kafka connect process listens to an unknown port

2023-05-19 Thread Jorge Martin Cristobal
Hi all,

I'm testing Apache Kafka Connect for a project and I found that the main
process listens on two different ports: the one providing the REST API, 8083
by default, and a different unprivileged port that changes its number on each
restart. For instance, this is a fragment of the output from the netstat command:
tcp 0 0 0.0.0.0:8083 0.0.0.0:* LISTEN 28646/java
tcp 0 0 0.0.0.0:42859 0.0.0.0:* LISTEN 28646/java <--THIS

What's the purpose of that port? Is there any public definition of that
interface? I haven't found any documentation regarding that port number.
Kind regards,

Jorge M.


Kafka Connect Startup Hook

2023-03-20 Thread Jan Baudisch (extern)
Hello,

can someone please give me a hint on how to execute two lines of code on Kafka
Connect startup, like:

final JaegerTracer tracer = Configuration.fromEnv().getTracer();
GlobalTracer.register(tracer);

I implemented this using a custom (fake) connector, but there is a lot of
overhead, because you also need a Task, a Config, etc.

Is there some simpler way, some kind of hook?

Thanks in advance,
Jan



Re: AW: Kafka Connect Startup Hook

2023-03-20 Thread Hutson, Paul
unsubscribe

On 20/03/2023, 10:30, "Jan Baudisch (extern)" <jan.baudisch.ext...@bdess.com> wrote: [...]


AW: Kafka Connect Startup Hook

2023-03-20 Thread Jan Baudisch (extern)
Hello Jakub,

thank you for your quick answer. We solved it by implementing a ConfigProvider,
as described here:

https://docs.confluent.io/kafka-connectors/self-managed/userguide.html#configprovider-interface

import java.util.Map;
import java.util.Set;

import io.jaegertracing.Configuration;
import io.jaegertracing.internal.JaegerTracer;
import io.opentracing.util.GlobalTracer;
import org.apache.kafka.common.config.ConfigData;
import org.apache.kafka.common.config.provider.ConfigProvider;

public class TracingConfigProvider implements ConfigProvider {

    @Override
    public void configure(Map<String, ?> map) {
        // Runs once at worker startup, which is the whole point of this provider.
        final JaegerTracer tracer = Configuration.fromEnv().getTracer();
        GlobalTracer.registerIfAbsent(tracer);
    }

    // The provider is never used to resolve config values, so get() returns null.
    @Override
    public ConfigData get(String s) {
        return null;
    }

    @Override
    public ConfigData get(String s, Set<String> set) {
        return null;
    }

    @Override
    public void close() {}
}


And setting these environment variables in Kafka Connect (following the
Confluent container images' convention of mapping CONNECT_* variables to the
worker properties config.providers and config.providers.tracing.class):

- CONNECT_CONFIG_PROVIDERS=tracing
- CONNECT_CONFIG_PROVIDERS_TRACING_CLASS=org.example.TracingConfigProvider

Best regards,
Jan



From: Jakub Scholz 
Date: Monday, 20 March 2023 at 10:23
To: users@kafka.apache.org 
Subject: Re: Kafka Connect Startup Hook
In Strimzi, we use a Java agent to register the tracer (
https://github.com/strimzi/strimzi-kafka-operator/tree/main/tracing-agent/
if you wanna check the source code).

Jakub

On Mon, Mar 20, 2023 at 9:18 AM Jan Baudisch (extern) <
jan.baudisch.ext...@bdess.com> wrote:

> Hello,
>
> can someone please give me a hint on how to execute two lines of code on
> Kafka Connect startup, like:
>
> final JaegerTracer tracer = Configuration.fromEnv().getTracer();
> GlobalTracer.register(tracer);
>
> I implemented this using a custom (fake) connector, but there is a lot of
> overhead, because you also need a Task, a Config, etc.
>
> Is there some simpler way, some kind of hook?
>
> Thanks in advance,
> Jan
>
>
>


Re: Kafka Connect Startup Hook

2023-03-20 Thread Jakub Scholz
In Strimzi, we use a Java agent to register the tracer (
https://github.com/strimzi/strimzi-kafka-operator/tree/main/tracing-agent/
if you wanna check the source code).

Jakub

On Mon, Mar 20, 2023 at 9:18 AM Jan Baudisch (extern) <
jan.baudisch.ext...@bdess.com> wrote:

> Hello,
>
> can someone please give me a hint on how to execute two lines of code on
> Kafka Connect startup, like:
>
> final JaegerTracer tracer = Configuration.fromEnv().getTracer();
> GlobalTracer.register(tracer);
>
> I implemented this using a custom (fake) connector, but there is a lot of
> overhead, because you also need a Task, a Config, etc.
>
> Is there some simpler way, some kind of hook?
>
> Thanks in advance,
> Jan
>
>
>
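A minimal sketch of the Java-agent approach Jakub mentions, assuming the Jaeger client and OpenTracing util libraries are on the classpath; the agent jar needs a Premain-Class manifest entry and is attached with -javaagent:

import io.jaegertracing.Configuration;
import io.opentracing.util.GlobalTracer;

public class TracingAgent {
    // Invoked by the JVM before main() when the jar is attached via
    // -javaagent:tracing-agent.jar and the manifest declares
    // Premain-Class: TracingAgent
    public static void premain(String agentArgs) {
        GlobalTracer.registerIfAbsent(Configuration.fromEnv().getTracer());
    }
}

This keeps the registration out of connector code entirely, which is why it needs no fake connector, Task, or Config class.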


Kafka Connect Startup Hook

2023-03-20 Thread Jan Baudisch (extern)
Hello,

can someone please give me a hint on how to execute two lines of code on Kafka
Connect startup, like:

final JaegerTracer tracer = Configuration.fromEnv().getTracer();
GlobalTracer.register(tracer);

I implemented this using a custom (fake) connector, but there is a lot of
overhead, because you also need a Task, a Config, etc.

Is there some simpler way, some kind of hook?

Thanks in advance,
Jan




Re: Exactly once kafka connect query

2023-03-16 Thread Chris Egerton
Hi Nitty,

I understand your concerns about preserving intellectual property. Perhaps
to avoid these altogether, instead of a call, you can provide a
reproduction of your issues that is acceptable to share with the public? If
I'm able to successfully diagnose the problem, I can share a summary on the
mailing list, and if things are still unclear, I can join a call to discuss
the publicly-visible example and what seems to be the issue.

Cheers,

Chris

On Thu, Mar 16, 2023 at 5:54 AM NITTY BENNY  wrote:

> Hi Xiaoxia,
>
> I am not able to see the attachments you shared with me. I don't understand
> the problem you are talking about.
> What do you want me to look at?
>
> Thanks,
> Nitty
>
> On Thu, Mar 16, 2023 at 1:54 AM 小侠 <747359...@qq.com.invalid> wrote:
>
> > Hi Nitty,
> > I'm so sorry, I forgot the signature.
> > Looking forward to your reply.
> >
> >
> > Thank you,
> > Xiaoxia
> >
> >
> >
> >
> > -- Original Message ------
> > From: "users" ;
> > Sent: Wednesday, 15 March 2023, 6:38 PM
> > To: "users";
> > Subject: Re: Exactly once kafka connect query
> >
> > Hi Chris,
> >
> > We won't be able to share the source code since it is proprietary Amdocs
> > code.
> >
> > If you have time for a call, I can show you the code and
> > reproduction scenario over the call. I strongly believe you can find the
> > issue with that.
> >
> > Thanks,
> > Nitty
> >
> > On Tue, Mar 14, 2023 at 3:04 PM Chris Egerton 
> > wrote:
> >
> > > Hi Nitty,
> > >
> > > Sorry, I should have clarified. The reason I'm thinking about shutdown
> > here
> > > is that, when exactly-once support is enabled on a Kafka Connect
> cluster
> > > and a new set of task configurations is generated for a connector, the
> > > Connect framework makes an effort to shut down all the old task
> instances
> > > for that connector, and then fences out the transactional producers for
> > all
> > > of those instances. I was thinking that this may lead to the producer
> > > exceptions you are seeing but, after double-checking this assumption,
> > that
> > > does not appear to be the case.
> > >
> > > Would it be possible to share the source code for your connector and a
> > > reproduction scenario for what you're seeing? That may be easier than
> > > coordinating a call.
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> > > On Tue, Mar 14, 2023 at 6:15 AM NITTY BENNY 
> > wrote:
> > >
> > > > Hi Chris,
> > > >
> > > > Is there any possibility to have a call with you? This is actually
> > > > blocking our delivery, and I want to get this sorted out.
> > > >
> > > > Thanks,
> > > > Nitty
> > > >
> > > > On Mon, Mar 13, 2023 at 8:18 PM NITTY BENNY 
> > > wrote:
> > > >
> > > > > Hi Chris,
> > > > >
> > > > > I really don't understand why a graceful shutdown will happen during
> > > > > a commit operation? Am I understanding something wrong here? I see
> > > > > this happens when I have a batch of 2 valid records and in the
> second
> > > > > batch the record is invalid. In that case I want to commit the
> valid
> > > > > records. So I called commit and sent an empty list for the current
> > > batch
> > > > to
> > > > > poll() and then when the next file comes in and poll sees new
> > records,
> > > I
> > > > > see InvalidProducerEpochException.
> > > > > Please advise me.
> > > > >
> > > > > Thanks,
> > > > > Nitty
> > > > >
> > > > > On Mon, Mar 13, 2023 at 5:33 PM NITTY BENNY 
> > > > wrote:
> > > > >
> > > > >> Hi Chris,
> > > > >>
> > > > >> The difference is in the Task Classes, no difference for value/key
> > > >> converters.
> > > > >>
> > > > >> I don’t see log messages for graceful shutdown. I am not clear on
> > what
> > > > >> you mean by shutting down the task.
> > > > >>
> > > > >> I called the commit operation for the successful records. Should I
> > > > >> perf

Re: Exactly once kafka connect query

2023-03-16 Thread NITTY BENNY
Hi Xiaoxia,

I am not able to see the attachments you shared with me. I don't understand
the problem you are talking about.
What do you want me to look at?

Thanks,
Nitty

On Thu, Mar 16, 2023 at 1:54 AM 小侠 <747359...@qq.com.invalid> wrote:

> Hi Nitty,
> I'm so sorry, I forgot the signature.
> Looking forward to your reply.
>
>
> Thank you,
> Xiaoxia
>
>
>
>
> -- Original Message --
> From: "users" ;
> Sent: Wednesday, 15 March 2023, 6:38 PM
> To: "users";
> Subject: Re: Exactly once kafka connect query
>
> Hi Chris,
>
> We won't be able to share the source code since it is proprietary Amdocs
> code.
>
> If you have time for a call, I can show you the code and
> reproduction scenario over the call. I strongly believe you can find the
> issue with that.
>
> Thanks,
> Nitty
>
> On Tue, Mar 14, 2023 at 3:04 PM Chris Egerton 
> wrote:
>
> > Hi Nitty,
> >
> > Sorry, I should have clarified. The reason I'm thinking about shutdown
> here
> > is that, when exactly-once support is enabled on a Kafka Connect cluster
> > and a new set of task configurations is generated for a connector, the
> > Connect framework makes an effort to shut down all the old task instances
> > for that connector, and then fences out the transactional producers for
> all
> > of those instances. I was thinking that this may lead to the producer
> > exceptions you are seeing but, after double-checking this assumption,
> that
> > does not appear to be the case.
> >
> > Would it be possible to share the source code for your connector and a
> > reproduction scenario for what you're seeing? That may be easier than
> > coordinating a call.
> >
> > Cheers,
> >
> > Chris
> >
> > On Tue, Mar 14, 2023 at 6:15 AM NITTY BENNY 
> wrote:
> >
> > > Hi Chris,
> > >
> > > Is there any possibility to have a call with you? This is actually
> > > blocking our delivery, and I want to get this sorted out.
> > >
> > > Thanks,
> > > Nitty
> > >
> > > On Mon, Mar 13, 2023 at 8:18 PM NITTY BENNY 
> > wrote:
> > >
> > > > Hi Chris,
> > > >
> > > > I really don't understand why a graceful shutdown will happen during
> > > > a commit operation? Am I understanding something wrong here? I see
> > > > this happens when I have a batch of 2 valid records and in the second
> > > > batch the record is invalid. In that case I want to commit the valid
> > > > records. So I called commit and sent an empty list for the current
> > batch
> > > to
> > > > poll() and then when the next file comes in and poll sees new
> records,
> > I
> > > > see InvalidProducerEpochException.
> > > > Please advise me.
> > > >
> > > > Thanks,
> > > > Nitty
> > > >
> > > > On Mon, Mar 13, 2023 at 5:33 PM NITTY BENNY 
> > > wrote:
> > > >
> > > >> Hi Chris,
> > > >>
> > > >> The difference is in the Task Classes, no difference for value/key
> > > >> converters.
> > > >>
> > > >> I don’t see log messages for graceful shutdown. I am not clear on
> what
> > > >> you mean by shutting down the task.
> > > >>
> > > >> I called the commit operation for the successful records. Should I
> > > >> perform any other steps if I have an invalid record?
> > > >> Please advise.
> > > >>
> > > >> Thanks,
> > > >> Nitty
> > > >>
> > > >> On Mon, Mar 13, 2023 at 3:42 PM Chris Egerton
>  > >
> > > >> wrote:
> > > >>
> > > >>> Hi Nitty,
> > > >>>
> > > >>> Thanks again for all the details here, especially the log messages.
> > > >>>
> > >>> > The below-mentioned issue is happening for the JSON connector only.
> > >>> > Is there any difference between the asn1, binary, csv and json
> > >>> > connectors?
> > > >>>
> > > >>> Can you clarify if the difference here is in the Connector/Task
> > > classens,
> > > >>> or if it's in the key/value converters that are configured for the
> > > >>> connector? The key/value converters are configured using the
> > > >>> &

Re: Exactly once kafka connect query

2023-03-15 Thread NITTY BENNY
Hi Chris,

We won't be able to share the source code since it is proprietary Amdocs
code.

If you have time for a call, I can show you the code and
reproduction scenario over the call. I strongly believe you can find the
issue with that.

Thanks,
Nitty

On Tue, Mar 14, 2023 at 3:04 PM Chris Egerton 
wrote:

> Hi Nitty,
>
> Sorry, I should have clarified. The reason I'm thinking about shutdown here
> is that, when exactly-once support is enabled on a Kafka Connect cluster
> and a new set of task configurations is generated for a connector, the
> Connect framework makes an effort to shut down all the old task instances
> for that connector, and then fences out the transactional producers for all
> of those instances. I was thinking that this may lead to the producer
> exceptions you are seeing but, after double-checking this assumption, that
> does not appear to be the case.
>
> Would it be possible to share the source code for your connector and a
> reproduction scenario for what you're seeing? That may be easier than
> coordinating a call.
>
> Cheers,
>
> Chris
>
> On Tue, Mar 14, 2023 at 6:15 AM NITTY BENNY  wrote:
>
> > Hi Chris,
> >
> > Is there any possibility to have a call with you? This is actually
> > blocking our delivery, and I want to get this sorted out.
> >
> > Thanks,
> > Nitty
> >
> > On Mon, Mar 13, 2023 at 8:18 PM NITTY BENNY 
> wrote:
> >
> > > Hi Chris,
> > >
> > > I really don't understand why a graceful shutdown will happen during a
> > > commit operation? Am I understanding something wrong here? I see
> > > this happens when I have a batch of 2 valid records and in the second
> > > batch the record is invalid. In that case I want to commit the valid
> > > records. So I called commit and sent an empty list for the current
> batch
> > to
> > > poll() and then when the next file comes in and poll sees new records,
> I
> > > see InvalidProducerEpochException.
> > > Please advise me.
> > >
> > > Thanks,
> > > Nitty
> > >
> > > On Mon, Mar 13, 2023 at 5:33 PM NITTY BENNY 
> > wrote:
> > >
> > >> Hi Chris,
> > >>
> > >> The difference is in the Task Classes, no difference for value/key
> > >> converters.
> > >>
> > >> I don’t see log messages for graceful shutdown. I am not clear on what
> > >> you mean by shutting down the task.
> > >>
> > >> I called the commit operation for the successful records. Should I
> > >> perform any other steps if I have an invalid record?
> > >> Please advise.
> > >>
> > >> Thanks,
> > >> Nitty
> > >>
> > >> On Mon, Mar 13, 2023 at 3:42 PM Chris Egerton  >
> > >> wrote:
> > >>
> > >>> Hi Nitty,
> > >>>
> > >>> Thanks again for all the details here, especially the log messages.
> > >>>
> > >>> > The below-mentioned issue is happening for the JSON connector only.
> > >>> > Is there any difference between the asn1, binary, csv and json
> > >>> > connectors?
> > >>>
> > >>> Can you clarify if the difference here is in the Connector/Task
> > classens,
> > >>> or if it's in the key/value converters that are configured for the
> > >>> connector? The key/value converters are configured using the
> > >>> "key.converter" and "value.converter" property and, if problems arise
> > >>> with
> > >>> them, the task will fail and, if it has a non-empty ongoing
> > transaction,
> > >>> that transaction will be automatically aborted since we close the
> > task's
> > >>> Kafka producer when it fails (or shuts down gracefully).
> > >>>
> > >>> With regards to these log messages:
> > >>>
> > >>> > org.apache.kafka.common.errors.ProducerFencedException: There is a
> > >>> newer
> > >>> producer with the same transactionalId which fences the current one.
> > >>>
> > >>> It looks like your tasks aren't shutting down gracefully in time,
> which
> > >>> causes them to be fenced out by the Connect framework later on. Do
> you
> > >>> see
> > >>> messages like "Graceful stop of task  failed" in the
> logs
> > >>> for
> > >>> your Connect worker?
> > >>>
> > >>> Cheers

Re: Exactly once kafka connect query

2023-03-14 Thread Chris Egerton
Hi Nitty,

Sorry, I should have clarified. The reason I'm thinking about shutdown here
is that, when exactly-once support is enabled on a Kafka Connect cluster
and a new set of task configurations is generated for a connector, the
Connect framework makes an effort to shut down all the old task instances
for that connector, and then fences out the transactional producers for all
of those instances. I was thinking that this may lead to the producer
exceptions you are seeing but, after double-checking this assumption, that
does not appear to be the case.

Would it be possible to share the source code for your connector and a
reproduction scenario for what you're seeing? That may be easier than
coordinating a call.

Cheers,

Chris
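For readers following the thread, here is a minimal sketch of the pattern under discussion: committing the valid prefix of a batch via connector-defined transaction boundaries when a bad record is hit. It assumes exactly-once support is enabled and the connector declares connector-defined boundaries (Kafka 3.3+); FileBatchSourceTask and its helpers are hypothetical names, not the Amdocs code:

import java.util.List;

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.TransactionContext;

public abstract class FileBatchSourceTask extends SourceTask {

    // Hypothetical helpers supplied by the concrete task implementation.
    protected abstract List<SourceRecord> parseNextBatch() throws BadRecordException;
    protected abstract void advanceToNextFile();

    protected static class BadRecordException extends Exception {
        final List<SourceRecord> validPrefix; // records parsed before the bad one
        BadRecordException(List<SourceRecord> validPrefix) {
            this.validPrefix = validPrefix;
        }
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        // Non-null only when exactly-once support is enabled and the connector
        // declares connector-defined transaction boundaries.
        TransactionContext transaction = context.transactionContext();
        try {
            return parseNextBatch();
        } catch (BadRecordException e) {
            if (transaction != null && !e.validPrefix.isEmpty()) {
                // Commit the open transaction as soon as the last valid record
                // has been dispatched; never abort an empty transaction.
                transaction.commitTransaction(e.validPrefix.get(e.validPrefix.size() - 1));
            }
            advanceToNextFile();
            return e.validPrefix;
        }
    }
}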

On Tue, Mar 14, 2023 at 6:15 AM NITTY BENNY  wrote:

> Hi Chris,
>
> Is there any possibility to have a call with you? This is actually blocking
> our delivery, and I want to get this sorted out.
>
> Thanks,
> Nitty
>
> On Mon, Mar 13, 2023 at 8:18 PM NITTY BENNY  wrote:
>
> > Hi Chris,
> >
> > I really don't understand why a graceful shutdown will happen during a
> > commit operation? Am I understanding something wrong here? I see
> > this happens when I have a batch of 2 valid records and in the second
> > batch the record is invalid. In that case I want to commit the valid
> > records. So I called commit and sent an empty list for the current batch
> to
> > poll() and then when the next file comes in and poll sees new records, I
> > see InvalidProducerEpochException.
> > Please advise me.
> >
> > Thanks,
> > Nitty
> >
> > On Mon, Mar 13, 2023 at 5:33 PM NITTY BENNY 
> wrote:
> >
> >> Hi Chris,
> >>
> >> The difference is in the Task Classes, no difference for value/key
> >> converters.
> >>
> >> I don’t see log messages for graceful shutdown. I am not clear on what
> >> you mean by shutting down the task.
> >>
> >> I called the commit operation for the successful records. Should I
> >> perform any other steps if I have an invalid record?
> >> Please advise.
> >>
> >> Thanks,
> >> Nitty
> >>
> >> On Mon, Mar 13, 2023 at 3:42 PM Chris Egerton 
> >> wrote:
> >>
> >>> Hi Nitty,
> >>>
> >>> Thanks again for all the details here, especially the log messages.
> >>>
> >>> > The below-mentioned issue is happening for the JSON connector only.
> >>> > Is there any difference between the asn1, binary, csv and json
> >>> > connectors?
> >>>
> >>> Can you clarify if the difference here is in the Connector/Task
> classens,
> >>> or if it's in the key/value converters that are configured for the
> >>> connector? The key/value converters are configured using the
> >>> "key.converter" and "value.converter" property and, if problems arise
> >>> with
> >>> them, the task will fail and, if it has a non-empty ongoing
> transaction,
> >>> that transaction will be automatically aborted since we close the
> task's
> >>> Kafka producer when it fails (or shuts down gracefully).
> >>>
> >>> With regards to these log messages:
> >>>
> >>> > org.apache.kafka.common.errors.ProducerFencedException: There is a
> >>> newer
> >>> producer with the same transactionalId which fences the current one.
> >>>
> >>> It looks like your tasks aren't shutting down gracefully in time, which
> >>> causes them to be fenced out by the Connect framework later on. Do you
> >>> see
> >>> messages like "Graceful stop of task  failed" in the logs
> >>> for
> >>> your Connect worker?
> >>>
> >>> Cheers,
> >>>
> >>> Chris
> >>>
> >>> On Mon, Mar 13, 2023 at 10:58 AM NITTY BENNY 
> >>> wrote:
> >>>
> >>> > Hi Chris,
> >>> >
> >>> > As you said, the below message is coming when I call an abort if
> there
> >>> is
> >>> > an invalid record, then for the next transaction I can see the below
> >>> > message and then the connector will be stopped.
> >>> > 2023-03-13 14:28:26,043 INFO [json-sftp-source-connector|task-0]
> >>> Aborting
> >>> > transaction for batch as requested by connector
> >>> > (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask)
> >>> > [task-thread-json-sftp-source-connector-0]
> >>

Re: Exactly once kafka connect query

2023-03-14 Thread NITTY BENNY
producer batches due to fatal error
>>> > >> (org.apache.kafka.clients.producer.internals.Sender)
>>> > >> [kafka-producer-network-thread |
>>> > >> connector-producer-json-sftp-source-connector-0]
>>> > >> org.apache.kafka.common.errors.ProducerFencedException: There is a
>>> newer
>>> > >> producer with the same transactionalId which fences the current one.
>>> > >> 2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0]
>>> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Failed
>>> to
>>> > >> flush offsets to storage:
>>> > >> (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask)
>>> > >> [kafka-producer-network-thread |
>>> > >> connector-producer-json-sftp-source-connector-0]
>>> > >> org.apache.kafka.common.errors.ProducerFencedException: There is a
>>> newer
>>> > >> producer with the same transactionalId which fences the current one.
>>> > >> 2023-03-12 11:32:45,224 ERROR [json-sftp-source-connector|task-0]
>>> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} failed
>>> to
>>> > send
>>> > >> record to streams-input:
>>> > >> (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask)
>>> > >> [kafka-producer-network-thread |
>>> > >> connector-producer-json-sftp-source-connector-0]
>>> > >> org.apache.kafka.common.errors.ProducerFencedException: There is a
>>> newer
>>> > >> producer with the same transactionalId which fences the current one.
>>> > >> 2023-03-12 11:32:45,222 ERROR
>>> > [json-sftp-source-connector|task-0|offsets]
>>> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Failed
>>> to
>>> > >> commit producer transaction
>>> > >> (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask)
>>> > >> [task-thread-json-sftp-source-connector-0]
>>> > >> org.apache.kafka.common.errors.ProducerFencedException: There is a
>>> newer
>>> > >> producer with the same transactionalId which fences the current one.
>>> > >> 2023-03-12 11:32:45,225 ERROR [json-sftp-source-connector|task-0]
>>> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Task
>>> threw
>>> > an
>>> > >> uncaught and unrecoverable exception. Task is being killed and will
>>> not
>>> > >> recover until manually restarted
>>> > >> (org.apache.kafka.connect.runtime.WorkerTask)
>>> > >> [task-thread-json-sftp-source-connector-0]
>>> > >>
>>> > >> Do you know why it is showing an abort state even if I call commit?
>>> > >>
>>> > >> I tested one more scenario. When I call the commit, I saw the below:
>>> > >>
>>> >
>>> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
>>> > >> producerId=11, producerEpoch=2, txnTimeoutMs=6, state=*Ongoing*,
>>> > >> pendingState=None, topicPartitions=HashSet(streams-input-2),
>>> > >> txnStartTimestamp=1678620463834,
>>> txnLastUpdateTimestamp=1678620463834)
>>> > >> Then, before changing the states to Abort, I dropped the next file;
>>> > >> then I don't see any issues. The previous transaction as well as the
>>> > >> current transaction are committed.
>>> > >>
>>> > >> Thank you for your support.
>>> > >>
>>> > >> Thanks,
>>> > >> Nitty
>>> > >>
>>> > >> On Fri, Mar 10, 2023 at 8:04 PM Chris Egerton
>>> 
>>> > >> wrote:
>>> > >>
>>> > >>> Hi Nitty,
>>> > >>>
>>> > >>> > I called commitTransaction when I reach the first error record,
>>> but
>>> > >>> commit is not happening for me. Kafka connect tries to abort the
>>> > >>> transaction automatically
>>> > >>>
>>> > >>> This is really interesting--are you certain that your task never
>>> > invoked
>>> > >>> TransactionContext::abortTransaction in this case? I'm 

Re: Exactly once kafka connect query

2023-03-13 Thread NITTY BENNY
nalId which fences the current one.
>> > >> 2023-03-12 11:32:45,224 ERROR [json-sftp-source-connector|task-0]
>> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} failed
>> to
>> > send
>> > >> record to streams-input:
>> > >> (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask)
>> > >> [kafka-producer-network-thread |
>> > >> connector-producer-json-sftp-source-connector-0]
>> > >> org.apache.kafka.common.errors.ProducerFencedException: There is a
>> newer
>> > >> producer with the same transactionalId which fences the current one.
>> > >> 2023-03-12 11:32:45,222 ERROR
>> > [json-sftp-source-connector|task-0|offsets]
>> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Failed
>> to
>> > >> commit producer transaction
>> > >> (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask)
>> > >> [task-thread-json-sftp-source-connector-0]
>> > >> org.apache.kafka.common.errors.ProducerFencedException: There is a
>> newer
>> > >> producer with the same transactionalId which fences the current one.
>> > >> 2023-03-12 11:32:45,225 ERROR [json-sftp-source-connector|task-0]
>> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Task
>> threw
>> > an
>> > >> uncaught and unrecoverable exception. Task is being killed and will
>> not
>> > >> recover until manually restarted
>> > >> (org.apache.kafka.connect.runtime.WorkerTask)
>> > >> [task-thread-json-sftp-source-connector-0]
>> > >>
>> > >> Do you know why it is showing an abort state even if I call commit?
>> > >>
>> > >> I tested one more scenario. When I call the commit, I saw the below:
>> > >>
>> >
>> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
>> > >> producerId=11, producerEpoch=2, txnTimeoutMs=6, state=*Ongoing*,
>> > >> pendingState=None, topicPartitions=HashSet(streams-input-2),
>> > >> txnStartTimestamp=1678620463834,
>> txnLastUpdateTimestamp=1678620463834)
>> > >> Then, before changing the states to Abort, I dropped the next file;
>> > >> then I don't see any issues. The previous transaction as well as the
>> > >> current transaction are committed.
>> > >>
>> > >> Thank you for your support.
>> > >>
>> > >> Thanks,
>> > >> Nitty
>> > >>
>> > >> On Fri, Mar 10, 2023 at 8:04 PM Chris Egerton
>> 
>> > >> wrote:
>> > >>
>> > >>> Hi Nitty,
>> > >>>
>> > >>> > I called commitTransaction when I reach the first error record,
>> but
>> > >>> commit is not happening for me. Kafka connect tries to abort the
>> > >>> transaction automatically
>> > >>>
>> > >>> This is really interesting--are you certain that your task never
>> > invoked
>> > >>> TransactionContext::abortTransaction in this case? I'm looking over
>> the
>> > >>> code base and it seems fairly clear that the only thing that could
>> > >>> trigger
>> > >>> a call to KafkaProducer::abortTransaction is a request by the task
>> to
>> > >>> abort
>> > >>> a transaction (either for a next batch, or for a specific record).
>> It
>> > may
>> > >>> help to run the connector in a debugger and/or look for "Aborting
>> > >>> transaction for batch as requested by connector" or "Aborting
>> > transaction
>> > >>> for record on topic  as requested by connector" log
>> > >>> messages (which will be emitted at INFO level by
>> > >>> the org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask
>> class
>> > if
>> > >>> the task is requesting an abort).
>> > >>>
>> > >>> Regardless, I'll work on a fix for the bug with aborting empty
>> > >>> transactions. Thanks for helping uncover that one!
>> > >>>
>> > >>> Cheers,
>> > >>>
>> > >>> Chris
>> > >>>
>> > >>> On Thu, Mar 9

Re: Exactly once kafka connect query

2023-03-13 Thread NITTY BENNY
463834, txnLastUpdateTimestamp=1678620526119)
> > >>
> >
> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
> > >> producerId=11, producerEpoch=3, txnTimeoutMs=6,
> > state=*CompleteAbort*,
> > >> pendingState=None, topicPartitions=HashSet(),
> > >> txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620526121)
> > >>
> > >> Later for the next transaction, when it returns the first batch, below
> > >> are the logs I can see.
> > >>
> > >>  Transiting to abortable error state due to
> > >> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer
> > >> attempted to produce with an old epoch.
> > >> (org.apache.kafka.clients.producer.internals.TransactionManager)
> > >> [kafka-producer-network-thread |
> > >> connector-producer-json-sftp-source-connector-0]
> > >> 2023-03-12 11:32:45,220 ERROR [json-sftp-source-connector|task-0]
> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} failed to
> > send
> > >> record to streams-input:
> > >> (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask)
> > >> [kafka-producer-network-thread |
> > >> connector-producer-json-sftp-source-connector-0]
> > >> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer
> > >> attempted to produce with an old epoch.
> > >> 2023-03-12 11:32:45,222 INFO [json-sftp-source-connector|task-0]
> > >> [Producer clientId=connector-producer-json-sftp-source-connector-0,
> > >> transactionalId=connect-cluster-json-sftp-source-connector-0]
> > Transiting to
> > >> fatal error state due to
> > >> org.apache.kafka.common.errors.ProducerFencedException: There is a
> newer
> > >> producer with the same transactionalId which fences the current one.
> > >> (org.apache.kafka.clients.producer.internals.TransactionManager)
> > >> [kafka-producer-network-thread |
> > >> connector-producer-json-sftp-source-connector-0]
> > >> 2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0]
> > >> [Producer clientId=connector-producer-json-sftp-source-connector-0,
> > >> transactionalId=connect-cluster-json-sftp-source-connector-0] Aborting
> > >> producer batches due to fatal error
> > >> (org.apache.kafka.clients.producer.internals.Sender)
> > >> [kafka-producer-network-thread |
> > >> connector-producer-json-sftp-source-connector-0]
> > >> org.apache.kafka.common.errors.ProducerFencedException: There is a
> newer
> > >> producer with the same transactionalId which fences the current one.
> > >> 2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0]
> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Failed to
> > >> flush offsets to storage:
> > >> (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask)
> > >> [kafka-producer-network-thread |
> > >> connector-producer-json-sftp-source-connector-0]
> > >> org.apache.kafka.common.errors.ProducerFencedException: There is a
> newer
> > >> producer with the same transactionalId which fences the current one.
> > >> 2023-03-12 11:32:45,224 ERROR [json-sftp-source-connector|task-0]
> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} failed to
> > send
> > >> record to streams-input:
> > >> (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask)
> > >> [kafka-producer-network-thread |
> > >> connector-producer-json-sftp-source-connector-0]
> > >> org.apache.kafka.common.errors.ProducerFencedException: There is a
> newer
> > >> producer with the same transactionalId which fences the current one.
> > >> 2023-03-12 11:32:45,222 ERROR
> > [json-sftp-source-connector|task-0|offsets]
> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Failed to
> > >> commit producer transaction
> > >> (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask)
> > >> [task-thread-json-sftp-source-connector-0]
> > >> org.apache.kafka.common.errors.ProducerFencedException: There is a
> newer
> > >> producer with the same transactionalId which fences the current one.
> > >> 2023-03-12 11:32:45,225 ERROR [json-sftp-source-connector|task-0]
> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connect

Re: Exactly once kafka connect query

2023-03-13 Thread Chris Egerton
-thread |
> >> connector-producer-json-sftp-source-connector-0]
> >> 2023-03-12 11:32:45,220 ERROR [json-sftp-source-connector|task-0]
> >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} failed to
> send
> >> record to streams-input:
> >> (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask)
> >> [kafka-producer-network-thread |
> >> connector-producer-json-sftp-source-connector-0]
> >> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer
> >> attempted to produce with an old epoch.
> >> 2023-03-12 11:32:45,222 INFO [json-sftp-source-connector|task-0]
> >> [Producer clientId=connector-producer-json-sftp-source-connector-0,
> >> transactionalId=connect-cluster-json-sftp-source-connector-0]
> Transiting to
> >> fatal error state due to
> >> org.apache.kafka.common.errors.ProducerFencedException: There is a newer
> >> producer with the same transactionalId which fences the current one.
> >> (org.apache.kafka.clients.producer.internals.TransactionManager)
> >> [kafka-producer-network-thread |
> >> connector-producer-json-sftp-source-connector-0]
> >> 2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0]
> >> [Producer clientId=connector-producer-json-sftp-source-connector-0,
> >> transactionalId=connect-cluster-json-sftp-source-connector-0] Aborting
> >> producer batches due to fatal error
> >> (org.apache.kafka.clients.producer.internals.Sender)
> >> [kafka-producer-network-thread |
> >> connector-producer-json-sftp-source-connector-0]
> >> org.apache.kafka.common.errors.ProducerFencedException: There is a newer
> >> producer with the same transactionalId which fences the current one.
> >> 2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0]
> >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Failed to
> >> flush offsets to storage:
> >> (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask)
> >> [kafka-producer-network-thread |
> >> connector-producer-json-sftp-source-connector-0]
> >> org.apache.kafka.common.errors.ProducerFencedException: There is a newer
> >> producer with the same transactionalId which fences the current one.
> >> 2023-03-12 11:32:45,224 ERROR [json-sftp-source-connector|task-0]
> >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} failed to
> send
> >> record to streams-input:
> >> (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask)
> >> [kafka-producer-network-thread |
> >> connector-producer-json-sftp-source-connector-0]
> >> org.apache.kafka.common.errors.ProducerFencedException: There is a newer
> >> producer with the same transactionalId which fences the current one.
> >> 2023-03-12 11:32:45,222 ERROR
> [json-sftp-source-connector|task-0|offsets]
> >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Failed to
> >> commit producer transaction
> >> (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask)
> >> [task-thread-json-sftp-source-connector-0]
> >> org.apache.kafka.common.errors.ProducerFencedException: There is a newer
> >> producer with the same transactionalId which fences the current one.
> >> 2023-03-12 11:32:45,225 ERROR [json-sftp-source-connector|task-0]
> >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Task threw
> an
> >> uncaught and unrecoverable exception. Task is being killed and will not
> >> recover until manually restarted
> >> (org.apache.kafka.connect.runtime.WorkerTask)
> >> [task-thread-json-sftp-source-connector-0]
> >>
> >> Do you know why it is showing an abort state even if I call commit?
> >>
> >> I tested one more scenario. When I call the commit, I saw the below:
> >>
> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
> >> producerId=11, producerEpoch=2, txnTimeoutMs=6, state=*Ongoing*,
> >> pendingState=None, topicPartitions=HashSet(streams-input-2),
> >> txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620463834)
> >> Then, before changing the states to Abort, I dropped the next file; then
> >> I don't see any issues. The previous transaction as well as the current
> >> transaction are committed.
> >>
> >> Thank you for your support.
> >>
> >> Thanks,
> >> Nitty
> >>
> >> On Fri, Mar 10, 2023 at 8:04 PM Chris Egerton 
> >> wrote:
> >>
> &g
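As background for the ProducerFencedException appearing throughout these logs, here is a self-contained sketch of transactional fencing: the second initTransactions() call with the same transactional.id bumps the producer epoch and fences the first instance. Broker address and transactional.id are placeholders:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

public class FencingDemo {
    private static KafkaProducer<String, String> newProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Same transactional.id for both instances; this is what triggers fencing.
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-txn-id");
        return new KafkaProducer<>(props);
    }

    public static void main(String[] args) {
        KafkaProducer<String, String> first = newProducer();
        first.initTransactions();          // epoch N
        first.beginTransaction();
        first.send(new ProducerRecord<>("demo-topic", "k", "v"));

        KafkaProducer<String, String> second = newProducer();
        second.initTransactions();         // bumps the epoch, fencing `first`

        try {
            first.commitTransaction();     // now fails: older epoch
        } catch (ProducerFencedException e) {
            System.out.println("first producer fenced: " + e.getMessage());
            first.close();                 // a fenced producer can only be closed
        }
        second.close();
    }
}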

Re: Exactly once kafka connect query

2023-03-13 Thread NITTY BENNY
|
>> connector-producer-json-sftp-source-connector-0]
>> org.apache.kafka.common.errors.ProducerFencedException: There is a newer
>> producer with the same transactionalId which fences the current one.
>> 2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0]
>> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Failed to
>> flush offsets to storage:
>> (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask)
>> [kafka-producer-network-thread |
>> connector-producer-json-sftp-source-connector-0]
>> org.apache.kafka.common.errors.ProducerFencedException: There is a newer
>> producer with the same transactionalId which fences the current one.
>> 2023-03-12 11:32:45,224 ERROR [json-sftp-source-connector|task-0]
>> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} failed to send
>> record to streams-input:
>> (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask)
>> [kafka-producer-network-thread |
>> connector-producer-json-sftp-source-connector-0]
>> org.apache.kafka.common.errors.ProducerFencedException: There is a newer
>> producer with the same transactionalId which fences the current one.
>> 2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0|offsets]
>> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Failed to
>> commit producer transaction
>> (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask)
>> [task-thread-json-sftp-source-connector-0]
>> org.apache.kafka.common.errors.ProducerFencedException: There is a newer
>> producer with the same transactionalId which fences the current one.
>> 2023-03-12 11:32:45,225 ERROR [json-sftp-source-connector|task-0]
>> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Task threw an
>> uncaught and unrecoverable exception. Task is being killed and will not
>> recover until manually restarted
>> (org.apache.kafka.connect.runtime.WorkerTask)
>> [task-thread-json-sftp-source-connector-0]
>>
>> Do you know why it is showing an abort state even if I call commit?
>>
>> I tested one more scenario. When I call the commit, I saw the below:
>> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
>> producerId=11, producerEpoch=2, txnTimeoutMs=6, state=*Ongoing*,
>> pendingState=None, topicPartitions=HashSet(streams-input-2),
>> txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620463834)
>> Then, before changing the states to Abort, I dropped the next file; then I
>> don't see any issues. The previous transaction as well as the current
>> transaction are committed.
>>
>> Thank you for your support.
>>
>> Thanks,
>> Nitty
>>
>> On Fri, Mar 10, 2023 at 8:04 PM Chris Egerton 
>> wrote:
>>
>>> Hi Nitty,
>>>
>>> > I called commitTransaction when I reach the first error record, but
>>> commit is not happening for me. Kafka connect tries to abort the
>>> transaction automatically
>>>
>>> This is really interesting--are you certain that your task never invoked
>>> TransactionContext::abortTransaction in this case? I'm looking over the
>>> code base and it seems fairly clear that the only thing that could
>>> trigger
>>> a call to KafkaProducer::abortTransaction is a request by the task to
>>> abort
>>> a transaction (either for a next batch, or for a specific record). It may
>>> help to run the connector in a debugger and/or look for "Aborting
>>> transaction for batch as requested by connector" or "Aborting transaction
>>> for record on topic  as requested by connector" log
>>> messages (which will be emitted at INFO level by
>>> the org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask class if
>>> the task is requesting an abort).
>>>
>>> Regardless, I'll work on a fix for the bug with aborting empty
>>> transactions. Thanks for helping uncover that one!
>>>
>>> Cheers,
>>>
>>> Chris
>>>
>>> On Thu, Mar 9, 2023 at 6:36 PM NITTY BENNY  wrote:
>>>
>>> > Hi Chris,
>>> >
>>> > We have a use case to commit previous successful records and stop the
>>> > processing of the current file and move on with the next file. To
>>> achieve
>>> > that I called commitTransaction when I reach the first error record,
>>> but
>>> > commit is not happening for me. Kafka connect tries to abort the
>>> > transaction automatically, I checked the _transaction_state topic and
>>> > states m

Re: Exactly once kafka connect query

2023-03-13 Thread NITTY BENNY
I tested one more scenario. When I call the commit, I saw the below
connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
producerId=11, producerEpoch=2, txnTimeoutMs=6, state=*Ongoing*,
pendingState=None, topicPartitions=HashSet(streams-input-2),
txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620463834)
Then, before the state changed to Abort, I dropped the next file, and I
don't see any issues. The previous transaction
as well as the current transaction are committed.
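
For reference, one way to watch these state transitions is to consume the
broker's transaction log topic directly; a sketch, assuming a recent broker
(the full internal topic name is __transaction_state, and the formatter class
is a broker internal whose name may change between versions):

  bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic __transaction_state --from-beginning \
    --consumer-property exclude.internal.topics=false \
    --formatter "kafka.coordinator.transaction.TransactionLog\$TransactionLogMessageFormatter"

Each record prints a TransactionMetadata entry like the one above.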

Thank you for your support.

Thanks,
Nitty

On Fri, Mar 10, 2023 at 8:04 PM Chris Egerton 
wrote:

> Hi Nitty,
>
> > I called commitTransaction when I reach the first error record, but
> commit is not happening for me. Kafka connect tries to abort the
> transaction automatically
>
> This is really interesting--are you certain that your task never invoked
> TransactionContext::abortTransaction in this case? I'm looking over the
> code base and it seems fairly clear that the only thing that could trigger
> a call to KafkaProducer::abortTransaction is a request by the task to abort
> a transaction (either for a next batch, or for a specific record). It may
> help to run the connector in a debugger and/or look for "Aborting
> transaction for batch as requested by connector" or "Aborting transaction
> for record on topic  as requested by connector" log
> messages (which will be emitted at INFO level by
> the org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask class if
> the task is requesting an abort).
>
> Regardless, I'll work on a fix for the bug with aborting empty
> transactions. Thanks for helping uncover that one!
>
> Cheers,
>
> Chris
>
> On Thu, Mar 9, 2023 at 6:36 PM NITTY BENNY  wrote:
>
> > Hi Chris,
> >
> > We have a use case to commit previous successful records and stop the
> > processing of the current file and move on with the next file. To achieve
> > that I called commitTransaction when I reach the first error record, but
> > commit is not happening for me. Kafka connect tries to abort the
> > transaction automatically, I checked the _transaction_state topic and
> > states marked as PrepareAbort and CompleteAbort. Do you know why kafka
> > connect automatically invokes abort instead of the implicit commit I
> > called?
> > Then as a result, when I try to parse the next file - say ABC, I saw
> > the logs "Aborting incomplete transaction" and ERROR: "Failed to send
> > record to topic", and we lost the first batch of records from the
> > current transaction in the file ABC.
> >
> > Is it possible that there's a case where an abort is being requested
> while
> > the current transaction is empty (i.e., the task hasn't returned any
> > records from SourceTask::poll since the last transaction was
> > committed/aborted)? --- Yes, that case is possible for us. There is a
> > case where the first record itself is an error record.
> >
> > Thanks,
> > Nitty
> >
> > On Thu, Mar 9, 2023 at 3:48 PM Chris Egerton 
> > wrote:
> >
> > > Hi Nitty,
> > >
> > > Thanks for the code examples and the detailed explanations, this is
> > really
> > > helpful!
> > >
> > > > Say if I have a file with 5 records and batch size is 2, and in my
> 3rd
> > > batch I have one error record then in that batch, I don't have a valid
> > > record to call commit or abort. But I want to commit all the previous
> > > batches that were successfully parsed. How do I do that?
> > >
> > > An important thing to keep in mind with the TransactionContext API is
> > that
> > > all records that a task returns from SourceTask::poll are implicitly
> > > included in a transaction. Invoking
> SourceTaskContext::transactionContext
> > > doesn't alter this or cause transactions to start being used;
> everything
> > is
> > > already in a transaction, and the Connect runtime automatically begins
> > > transactions for any records it sees from the task if it hasn't already
> > > begun one. It's also valid to return a null or empty list of records
> from
> > > SourceTask::poll. So in this case, you can invoke
> > > transactionContext.commitTransaction() (the no-args variant) and return
> > an
> > > empty batch from SourceTask::poll, which will cause the transaction
> > > containing the 4 valid records that were returned in the last 2 batches
> > to
> > > be committed.
> > >
> > > FWIW, I would be a little cautious about this approach. Many times it's
> > > better to fail fast on invalid data.

Re: Exactly once kafka connect query

2023-03-10 Thread Chris Egerton
Hi Nitty,

> I called commitTransaction when I reach the first error record, but
commit is not happening for me. Kafka connect tries to abort the
transaction automatically

This is really interesting--are you certain that your task never invoked
TransactionContext::abortTransaction in this case? I'm looking over the
code base and it seems fairly clear that the only thing that could trigger
a call to KafkaProducer::abortTransaction is a request by the task to abort
a transaction (either for a next batch, or for a specific record). It may
help to run the connector in a debugger and/or look for "Aborting
transaction for batch as requested by connector" or "Aborting transaction
for record on topic  as requested by connector" log
messages (which will be emitted at INFO level by
the org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask class if
the task is requesting an abort).
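
If your logging configuration raises Connect's level above INFO, a sketch of
an explicit override to keep those messages visible (shown in the key: value
style used elsewhere in this archive; a plain log4j.properties file uses =
instead of :):

  log4j.logger.org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask: INFO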

Regardless, I'll work on a fix for the bug with aborting empty
transactions. Thanks for helping uncover that one!

Cheers,

Chris

On Thu, Mar 9, 2023 at 6:36 PM NITTY BENNY  wrote:

> Hi Chris,
>
> We have a use case to commit previous successful records and stop the
> processing of the current file and move on with the next file. To achieve
> that I called commitTransaction when I reach the first error record, but
> commit is not happening for me. Kafka connect tries to abort the
> transaction automatically, I checked the _transaction_state topic and
> states marked as PrepareAbort and CompleteAbort. Do you know why kafka
> connect automatically invokes abort instead of the implicit commit I
> called?
> Then as a result, when I try to parse the next file - say ABC, I saw the
> logs "Aborting incomplete transaction" and ERROR: "Failed to send record to
> topic", and we lost the first batch of records from the current transaction
> in the file ABC.
>
> Is it possible that there's a case where an abort is being requested while
> the current transaction is empty (i.e., the task hasn't returned any
> records from SourceTask::poll since the last transaction was
> committed/aborted)? --- Yes, that case is possible for us. There is a case
> where the first record itself is an error record.
>
> Thanks,
> Nitty
>
> On Thu, Mar 9, 2023 at 3:48 PM Chris Egerton 
> wrote:
>
> > Hi Nitty,
> >
> > Thanks for the code examples and the detailed explanations, this is
> really
> > helpful!
> >
> > > Say if I have a file with 5 records and batch size is 2, and in my 3rd
> > batch I have one error record then in that batch, I don't have a valid
> > record to call commit or abort. But I want to commit all the previous
> > batches that were successfully parsed. How do I do that?
> >
> > An important thing to keep in mind with the TransactionContext API is
> that
> > all records that a task returns from SourceTask::poll are implicitly
> > included in a transaction. Invoking SourceTaskContext::transactionContext
> > doesn't alter this or cause transactions to start being used; everything
> is
> > already in a transaction, and the Connect runtime automatically begins
> > transactions for any records it sees from the task if it hasn't already
> > begun one. It's also valid to return a null or empty list of records from
> > SourceTask::poll. So in this case, you can invoke
> > transactionContext.commitTransaction() (the no-args variant) and return
> an
> > empty batch from SourceTask::poll, which will cause the transaction
> > containing the 4 valid records that were returned in the last 2 batches
> to
> > be committed.
> >
> > FWIW, I would be a little cautious about this approach. Many times it's
> > better to fail fast on invalid data; it might be worth it to at least
> allow
> > users to configure whether the connector fails on invalid data, or
> silently
> > skips over it (which is what happens when transactions are aborted).
> >
> > > Why is abort not working without adding the last record to the list?
> >
> > Is it possible that there's a case where an abort is being requested
> while
> > the current transaction is empty (i.e., the task hasn't returned any
> > records from SourceTask::poll since the last transaction was
> > committed/aborted)? I think this may be a bug in the Connect framework
> > where we don't check to see if a transaction is already open when a task
> > requests that a transaction be aborted, which can cause tasks to fail
> (see
> > https://issues.apache.org/jira/browse/KAFKA-14799 for more details).
> >
> > Cheers,
> >
> > Chris
> >
> >
> > On Wed, Mar 8, 2023 at 6:44 PM NITTY BENNY  wrote:

Re: Exactly once kafka connect query

2023-03-09 Thread NITTY BENNY
Hi Chris,

We have a use case to commit previous successful records and stop the
processing of the current file and move on with the next file. To achieve
that I called commitTransaction when I reach the first error record, but
commit is not happening for me. Kafka connect tries to abort the
transaction automatically, I checked the _transaction_state topic and
states marked as PrepareAbort and CompleteAbort. Do you know why kafka
connect automatically invokes abort instead of the implicit commit I called?
Then as a result, when I try to parse the next file - say ABC, I saw the
logs "Aborting incomplete transaction" and ERROR: "Failed to send record to
topic", and we lost the first batch of records from the current transaction
in the file ABC.

Is it possible that there's a case where an abort is being requested while
the current transaction is empty (i.e., the task hasn't returned any
records from SourceTask::poll since the last transaction was
committed/aborted)? --- Yes, that case is possible for us. There is a case
where the first record itself is an error record.

Thanks,
Nitty

On Thu, Mar 9, 2023 at 3:48 PM Chris Egerton 
wrote:

> Hi Nitty,
>
> Thanks for the code examples and the detailed explanations, this is really
> helpful!
>
> > Say if I have a file with 5 records and batch size is 2, and in my 3rd
> batch I have one error record then in that batch, I don't have a valid
> record to call commit or abort. But I want to commit all the previous
> batches that were successfully parsed. How do I do that?
>
> An important thing to keep in mind with the TransactionContext API is that
> all records that a task returns from SourceTask::poll are implicitly
> included in a transaction. Invoking SourceTaskContext::transactionContext
> doesn't alter this or cause transactions to start being used; everything is
> already in a transaction, and the Connect runtime automatically begins
> transactions for any records it sees from the task if it hasn't already
> begun one. It's also valid to return a null or empty list of records from
> SourceTask::poll. So in this case, you can invoke
> transactionContext.commitTransaction() (the no-args variant) and return an
> empty batch from SourceTask::poll, which will cause the transaction
> containing the 4 valid records that were returned in the last 2 batches to
> be committed.
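
A minimal sketch of the pattern described in that paragraph, assuming
transaction.boundary=connector is in effect. Only the SourceTask and
TransactionContext calls are real API; readNextRecord(), isErrorRecord(), and
convert() are hypothetical stand-ins for the connector's own logic:

  import java.util.ArrayList;
  import java.util.Collections;
  import java.util.List;
  import java.util.Map;
  import org.apache.kafka.connect.source.SourceRecord;
  import org.apache.kafka.connect.source.SourceTask;
  import org.apache.kafka.connect.source.TransactionContext;

  public abstract class FileSourceTask extends SourceTask {
      private TransactionContext transactionContext;
      private int batchSize;

      @Override
      public void start(Map<String, String> props) {
          // Null unless the connector defines its own transaction boundaries.
          transactionContext = context.transactionContext();
          batchSize = 2;
      }

      @Override
      public List<SourceRecord> poll() {
          List<SourceRecord> batch = new ArrayList<>();
          while (batch.size() < batchSize) {
              Object raw = readNextRecord();           // hypothetical reader
              if (raw == null) {
                  break;                               // end of file
              }
              if (isErrorRecord(raw)) {                // hypothetical check
                  // Commit the records returned from earlier poll() calls;
                  // the runtime completes the open transaction even though
                  // this batch is empty.
                  transactionContext.commitTransaction();
                  return Collections.emptyList();
              }
              batch.add(convert(raw));                 // hypothetical converter
          }
          return batch;
      }

      protected abstract Object readNextRecord();
      protected abstract boolean isErrorRecord(Object raw);
      protected abstract SourceRecord convert(Object raw);
  }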
>
> FWIW, I would be a little cautious about this approach. Many times it's
> better to fail fast on invalid data; it might be worth it to at least allow
> users to configure whether the connector fails on invalid data, or silently
> skips over it (which is what happens when transactions are aborted).
>
> > Why is abort not working without adding the last record to the list?
>
> Is it possible that there's a case where an abort is being requested while
> the current transaction is empty (i.e., the task hasn't returned any
> records from SourceTask::poll since the last transaction was
> committed/aborted)? I think this may be a bug in the Connect framework
> where we don't check to see if a transaction is already open when a task
> requests that a transaction be aborted, which can cause tasks to fail (see
> https://issues.apache.org/jira/browse/KAFKA-14799 for more details).
>
> Cheers,
>
> Chris
>
>
> On Wed, Mar 8, 2023 at 6:44 PM NITTY BENNY  wrote:
>
> > Hi Chris,
> >
> > I am not sure if you are able to see the images I shared with you.
> > Copying the code snippet below,
> >
> >   if (expectedRecordCount >= 0) {
> >     int missingCount = expectedRecordCount - (int) this.recordOffset() - 1;
> >     if (missingCount > 0) {
> >       if (transactionContext != null) {
> >         isMissedRecords = true;
> >       } else {
> >         throw new DataException(String.format(
> >             "Missing %d records (expecting %d, actual %d)",
> >             missingCount, expectedRecordCount, this.recordOffset()));
> >       }
> >     } else if (missingCount < 0) {
> >       if (transactionContext != null) {
> >         isMissedRecords = true;
> >       } else {
> >         throw new DataException(String.format(
> >             "Too many records (expecting %d, actual %d)",
> >             expectedRecordCount, this.recordOffset()));
> >       }
> >     }
> >   }
> >   addLastRecord(records, null, value);
> > }
> >
> > // asn1 or binary abort
> > if (config.parseErrorThreshold != null
> >     && parseErrorCount >= config.parseErrorThreshold
> >     && lastbatch && transactionContext != null) {
> >   transactionContext.abortTransaction();
> > }


Re: Exactly once kafka connect query

2023-03-08 Thread NITTY BENNY
Hi Chris,
Sorry for the typo in my previous email.

Regarding point 2, *the task returns a batch of records from
SourceTask::poll (and, if using the per-record API provided by the
TransactionContext class, includes at least one record that should trigger a
transaction commit/abort in that batch)*
What if I am using the API without passing a record? We have 2 types of use
cases, one where on encountering an error record, we want to commit
previous successful batches and disregard the failed record and upcoming
batches. In this case we created the transactionContext just before reading
the file (the file is our transaction boundary). Say if I have a file with 5
records and batch size is 2, and in my 3rd batch I have one error record,
then in that batch I don't have a valid record to call commit or abort. But
I want to commit all the previous batches that were successfully parsed.
How do I do that?

Second use case is where I want to abort a transaction if the record count
doesn't match.
Code snippet:
[image: image.png]
There are no error records in this case. If you see, I added the
transactionContext check to implement exactly-once; without the transaction
context it was just throwing the exception without calling the
addLastRecord() method, and in the catch block it just logs the message and
returns the list of records without the last record to poll(). To make it
work, I called the method addLastRecord() in this case, so it is not
throwing the exception and the list has the last record as well. Then I
called the abort, and everything got aborted. Why is abort not working
without adding the last record to the list?
[image: image.png]

Code to call abort.




Thanks,
Nitty

On Wed, Mar 8, 2023 at 4:26 PM Chris Egerton 
wrote:

> Hi Nitty,
>
> I'm a little confused about what you mean by this part:
>
> > transaction is not getting completed because it is not committing the
> > transaction offset.
>
> The only conditions required for a transaction to be completed when a
> connector is defining its own transaction boundaries are:
>
> 1. The task requests a transaction commit/abort from the TransactionContext
> 2. The task returns a batch of records from SourceTask::poll (and, if using
> the per-record API provided by the TransactionContext class, includes at
> least one record that should trigger a transaction commit/abort in that
> batch)
>
> The Connect runtime should automatically commit source offsets to Kafka
> whenever a transaction is completed, either by commit or abort. This is
> because transactions should only be aborted for data that should never be
> re-read by the connector; if there is a validation error that should be
> handled by reconfiguring the connector, then the task should throw an
> exception instead of aborting the transaction.
>
> If possible, do you think you could provide a brief code snippet
> illustrating what your task is doing that's causing issues?
>
> Cheers,
>
> Chris (not Chrise )
>
> On Tue, Mar 7, 2023 at 10:17 AM NITTY BENNY  wrote:
>
> > Hi Chrise,
> >
> > Thanks for sharing the details.
> >
> > Regarding the use case, For Asn1 source connector we have a use case to
> > validate number of records in the file with the number of records in the
> > header. So currently, if validation fails we are not sending the last
> > record to the topic. But after introducing exactly once with connector
> > transaction boundary, I can see that if I call an abort when the
> > validation fails, the transaction is not getting completed because it is
> > not committing the transaction offset. I saw that the transaction state
> > changed to CompleteAbort. So for my next transaction I am getting
> > InvalidProducerEpochException and then the task stopped after that. I
> > tried calling the abort after sending the last record to the topic; then
> > the transaction got completed.
> >
> > I don't know if I am doing anything wrong here.
> >
> > Please advise.
> > Thanks,
> > Nitty
> >
> > On Tue 7 Mar 2023 at 2:21 p.m., Chris Egerton 
> > wrote:
> >
> > > Hi Nitty,
> > >
> > > We've recently added some documentation on implementing exactly-once
> > source
> > > connectors here:
> > >
> >
> https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors
> > > .
> > > To quote a relevant passage from those docs:
> > >
> > > > In order for a source connector to take advantage of this support, it
> > > must be able to provide meaningful source offsets for each record that
> it
> > > emits, and resume consumption from the external system at the exact
> > > position corresponding to any of those offsets without dropping or
> > > duplicating messages.

Re: Exactly once kafka connect query

2023-03-08 Thread Chris Egerton
Hi Nitty,

I'm a little confused about what you mean by this part:

> transaction is not getting completed because it is not committing the
> transaction offset.

The only conditions required for a transaction to be completed when a
connector is defining its own transaction boundaries are:

1. The task requests a transaction commit/abort from the TransactionContext
2. The task returns a batch of records from SourceTask::poll (and, if using
the per-record API provided by the TransactionContext class, includes at
least one record that should trigger a transaction commit/abort in that
batch)

The Connect runtime should automatically commit source offsets to Kafka
whenever a transaction is completed, either by commit or abort. This is
because transactions should only be aborted for data that should never be
re-read by the connector; if there is a validation error that should be
handled by reconfiguring the connector, then the task should throw an
exception instead of aborting the transaction.
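
For condition 2's per-record variant, a minimal sketch; only the
TransactionContext calls are real API, the surrounding names are
hypothetical. Note that the record handed to commitTransaction(record) or
abortTransaction(record) must be part of the batch that poll() returns,
since the runtime only acts on trigger records it actually receives:

  import java.util.List;
  import org.apache.kafka.connect.source.SourceRecord;
  import org.apache.kafka.connect.source.TransactionContext;

  final class PerRecordBoundarySketch {
      static List<SourceRecord> finishFile(TransactionContext ctx,
                                           List<SourceRecord> batch,
                                           SourceRecord lastRecord,
                                           boolean recordCountMismatch) {
          if (recordCountMismatch) {
              ctx.abortTransaction(lastRecord);   // abort after this record
          } else {
              ctx.commitTransaction(lastRecord);  // commit after this record
          }
          batch.add(lastRecord);  // the trigger record must be returned
          return batch;
      }
  }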

If possible, do you think you could provide a brief code snippet
illustrating what your task is doing that's causing issues?

Cheers,

Chris (not Chrise )

On Tue, Mar 7, 2023 at 10:17 AM NITTY BENNY  wrote:

> Hi Chrise,
>
> Thanks for sharing the details.
>
> Regarding the use case, For Asn1 source connector we have a use case to
> validate number of records in the file with the number of records in the
> header. So currently, if validation fails we are not sending the last
> record to the topic. But after introducing exactly once with connector
> transaction boundary, I can see that if I call an abort when the validation
> fails, the transaction is not getting completed because it is not committing
> the transaction offset. I saw that the transaction state changed to
> CompleteAbort. So for my next transaction I am getting
> InvalidProducerEpochException and then the task stopped after that. I tried
> calling the abort after sending the last record to the topic; then the
> transaction got completed.
>
> I don't know if I am doing anything wrong here.
>
> Please advise.
> Thanks,
> Nitty
>
> On Tue 7 Mar 2023 at 2:21 p.m., Chris Egerton 
> wrote:
>
> > Hi Nitty,
> >
> > We've recently added some documentation on implementing exactly-once
> source
> > connectors here:
> >
> https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors
> > .
> > To quote a relevant passage from those docs:
> >
> > > In order for a source connector to take advantage of this support, it
> > must be able to provide meaningful source offsets for each record that it
> > emits, and resume consumption from the external system at the exact
> > position corresponding to any of those offsets without dropping or
> > duplicating messages.
> >
> > So, as long as your source connector is able to use the Kafka Connect
> > framework's offsets API correctly, it shouldn't be necessary to make any
> > other code changes to the connector.
> >
> > To enable exactly-once support for source connectors on your Connect
> > cluster, see the docs section here:
> > https://kafka.apache.org/documentation/#connect_exactlyoncesource
> >
> > With regard to transactions, a transactional producer is always created
> > automatically for your connector by the Connect runtime when exactly-once
> > support is enabled on the worker. The only reason to set
> > "transaction.boundary" to "connector" is if your connector would like to
> > explicitly define its own transaction boundaries. In this case, it sounds
> > like that may be what you want; I just want to make sure to call out that in
> > either case, you should not be directly instantiating a producer in your
> > connector code, but let the Kafka Connect runtime do that for you, and
> just
> > worry about returning the right records from SourceTask::poll (and
> possibly
> > defining custom transactions using the TransactionContext API).
> >
> > With respect to your question about committing or aborting in certain
> > circumstances, it'd be useful to know more about your use case, since it
> > may not be necessary to define custom transaction boundaries in your
> > connector at all.
> >
> > Cheers,
> >
> > Chris
> >
> >
> >
> > On Tue, Mar 7, 2023 at 7:21 AM NITTY BENNY  wrote:
> >
> > > Hi Team,
> > >
> > > Adding on top of this, I tried creating a TransactionContext object and
> > > calling the commitTransaction and abortTransaction methods in source
> > > connectors.
> > > But the main problem I saw is that if there is any error while parsing
> > > the record, connect is calling an abort, but we have a use case to call
> > > commit in some cases.

Fwd: Exactly once kafka connect query

2023-03-07 Thread NITTY BENNY
Hi Chrise,

Thanks for sharing the details.

Regarding the use case, For Asn1 source connector we have a use case to
validate number of records in the file with the number of records in the
header. So currently, if validation fails we are not sending the last
record to the topic. But after introducing exactly once with connector
transaction boundary, I can see that if I call an abort when the validation
fails, the transaction is not getting completed because it is not committing
the transaction offset. I saw that the transaction state changed to
CompleteAbort. So for my next transaction I am getting
InvalidProducerEpochException and then the task stopped after that. I tried
calling the abort after sending the last record to the topic; then the
transaction got completed.

I don't know if I am doing anything wrong here.

Please advise.
Thanks,
Nitty

On Tue 7 Mar 2023 at 2:21 p.m., Chris Egerton 
wrote:

> Hi Nitty,
>
> We've recently added some documentation on implementing exactly-once source
> connectors here:
> https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors
> .
> To quote a relevant passage from those docs:
>
> > In order for a source connector to take advantage of this support, it
> must be able to provide meaningful source offsets for each record that it
> emits, and resume consumption from the external system at the exact
> position corresponding to any of those offsets without dropping or
> duplicating messages.
>
> So, as long as your source connector is able to use the Kafka Connect
> framework's offsets API correctly, it shouldn't be necessary to make any
> other code changes to the connector.
>
> To enable exactly-once support for source connectors on your Connect
> cluster, see the docs section here:
> https://kafka.apache.org/documentation/#connect_exactlyoncesource
>
> With regard to transactions, a transactional producer is always created
> automatically for your connector by the Connect runtime when exactly-once
> support is enabled on the worker. The only reason to set
> "transaction.boundary" to "connector" is if your connector would like to
> explicitly define its own transaction boundaries. In this case, it sounds
> like that may be what you want; I just want to make sure to call out that in
> either case, you should not be directly instantiating a producer in your
> connector code, but let the Kafka Connect runtime do that for you, and just
> worry about returning the right records from SourceTask::poll (and possibly
> defining custom transactions using the TransactionContext API).
>
> With respect to your question about committing or aborting in certain
> circumstances, it'd be useful to know more about your use case, since it
> may not be necessary to define custom transaction boundaries in your
> connector at all.
>
> Cheers,
>
> Chris
>
>
>
> On Tue, Mar 7, 2023 at 7:21 AM NITTY BENNY  wrote:
>
> > Hi Team,
> >
> > Adding on top of this, I tried creating a TransactionContext object and
> > calling the commitTransaction and abortTransaction methods in source
> > connectors.
> > But the main problem I saw is that if there is any error while parsing
> > the record, connect is calling an abort, but we have a use case to call
> > commit in some cases. Is it a valid use case in terms of Kafka Connect?
> >
> > Another Question - Should I use a transactional producer instead of
> > creating an object of TransactionContext? Below is the connector
> > configuration I am using.
> >
> >   exactly.once.support: "required"
> >   transaction.boundary: "connector"
> >
> > Could you please help me here?
> >
> > Thanks,
> > Nitty
> >
> > On Tue, Mar 7, 2023 at 12:29 AM NITTY BENNY 
> wrote:
> >
> > > Hi Team,
> > > I am trying to implement exactly once behavior in our source connector.
> > Is
> > > there any sample source connector implementation available to have a
> look
> > > at?
> > > Regards,
> > > Nitty
> > >
> >
>


Re: Exactly once kafka connect query

2023-03-07 Thread Chris Egerton
Hi Nitty,

We've recently added some documentation on implementing exactly-once source
connectors here:
https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors.
To quote a relevant passage from those docs:

> In order for a source connector to take advantage of this support, it
must be able to provide meaningful source offsets for each record that it
emits, and resume consumption from the external system at the exact
position corresponding to any of those offsets without dropping or
duplicating messages.

So, as long as your source connector is able to use the Kafka Connect
framework's offsets API correctly, it shouldn't be necessary to make any
other code changes to the connector.

To enable exactly-once support for source connectors on your Connect
cluster, see the docs section here:
https://kafka.apache.org/documentation/#connect_exactlyoncesource
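
For reference, on a distributed worker this boils down to a single property;
a sketch - per those docs, "preparing" is the intermediate value used while
rolling an already-running cluster:

  exactly.once.source.support=enabled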

With regard to transactions, a transactional producer is always created
automatically for your connector by the Connect runtime when exactly-once
support is enabled on the worker. The only reason to set
"transaction.boundary" to "connector" is if your connector would like to
explicitly define its own transaction boundaries. In this case, it sounds
like that may be what you want; I just want to make sure to call out that in
either case, you should not be directly instantiating a producer in your
connector code, but let the Kafka Connect runtime do that for you, and just
worry about returning the right records from SourceTask::poll (and possibly
defining custom transactions using the TransactionContext API).

With respect to your question about committing or aborting in certain
circumstances, it'd be useful to know more about your use case, since it
may not be necessary to define custom transaction boundaries in your
connector at all.

Cheers,

Chris



On Tue, Mar 7, 2023 at 7:21 AM NITTY BENNY  wrote:

> Hi Team,
>
> Adding on top of this, I tried creating a TransactionContext object and
> calling the commitTransaction and abortTransaction methods in source
> connectors.
> But the main problem I saw is that if there is any error while parsing the
> record, connect is calling an abort, but we have a use case to call commit
> in some cases. Is it a valid use case in terms of Kafka Connect?
>
> Another Question - Should I use a transactional producer instead of
> creating an object of TransactionContext? Below is the connector
> configuration I am using.
>
>   exactly.once.support: "required"
>   transaction.boundary: "connector"
>
> Could you please help me here?
>
> Thanks,
> Nitty
>
> On Tue, Mar 7, 2023 at 12:29 AM NITTY BENNY  wrote:
>
> > Hi Team,
> > I am trying to implement exactly once behavior in our source connector.
> Is
> > there any sample source connector implementation available to have a look
> > at?
> > Regards,
> > Nitty
> >
>


Fwd: Exactly once kafka connect query

2023-03-07 Thread NITTY BENNY
Hi Team,

Adding on top of this, I tried creating a TransactionContext object and
calling the commitTransaction and abortTransaction methods in source
connectors.
But the main problem I saw is that if there is any error while parsing the
record, connect is calling an abort, but we have a use case to call commit
in some cases. Is it a valid use case in terms of Kafka Connect?

Another Question - Should I use a transactional producer instead of
creating an object of TransactionContext? Below is the connector
configuration I am using.

  exactly.once.support: "required"
  transaction.boundary: "connector"

Could you please help me here?

Thanks,
Nitty

On Tue, Mar 7, 2023 at 12:29 AM NITTY BENNY  wrote:

> Hi Team,
> I am trying to implement exactly once behavior in our source connector. Is
> there any sample source connector implementation available to have a look
> at?
> Regards,
> Nitty
>


Exactly once kafka connect query

2023-03-06 Thread NITTY BENNY
Hi Team,
I am trying to implement exactly once behavior in our source connector. Is
there any sample source connector implementation available to have a look
at?
Regards,
Nitty


Kafka Connect issue | Debezium POC

2023-02-17 Thread HariBabu kuruva
Hi All,

I am working on Debezium POC.

We have a ZooKeeper, a Kafka broker, and a Kafka Connect service.

As per the logs the Debezium connector is working fine. But Kafka topics
are not created automatically (auto topic creation is enabled), except a few
default topics, e.g. app_webapp.persona, app_webapp.role.

In the connector logs I could see the INFO messages below. Please help me
understand if there is any issue at the Kafka end.


[2023-02-16 20:48:15,290] INFO [Consumer clientId=consumer-connect-cluster-3, groupId=connect-cluster] Group coordinator vmclxcfgascln01.corp.equinix.com:9092 (id: 2147483646 rack: null) *is unavailable or invalid due to cause: coordinator unavailable. isDisconnected:* false. Rediscovery will be attempted. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:942)
[2023-02-16 20:48:15,290] INFO [Consumer clientId=consumer-connect-cluster-3, groupId=connect-cluster] Requesting disconnect from last known coordinator vmclxcfgascln01.corp.equinix.com:9092 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:955)
[2023-02-16 20:48:15,392] INFO [Consumer clientId=consumer-connect-cluster-2, groupId=connect-cluster] Discovered group coordinator vmclxcfgascln01.corp.equinix.com:9092 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:879)


-- 

Thanks and Regards,
 Hari
Mobile:9790756568


Re: Kafka Connect ClusterConfigState.inconsistentConnectors() not handled by distributed Worker?

2023-02-16 Thread Frank Grimes
Ah, it definitely seems like KIP-710 will address the issue we've been bitten 
by most. We'll eagerly await the kafka-3.5.0 release and then see if enabling
'dedicated.mode.enable.internal.rest' is possible with Strimzi.
Thanks for the help and patience! :-)


Re: Kafka Connect ClusterConfigState.inconsistentConnectors() not handled by distributed Worker?

2023-02-15 Thread Greg Harris
Frank,

> I don't think forcing the API users to introduce the nonce is desirable.

I agree. That is why the nonce is a workaround, and not a proper solution.
It's something to alleviate the symptoms in the short-term until a bugfix &
upgrade can fix it.

> Have you had any ideas on how this can be implemented within Kafka
Connect itself so that it works as expected for all users?

I have not looked into solutions in enough depth to recommend one. If I
had, the PR would be open :)

> We tried adding tasks to trigger a propagation of the task configs
> (increased from 36 to 40 tasks) however that did not unblock it. So
> triggering this code path did not seem to work:
> https://github.com/apache/kafka/blob/8cb0a5e9d3441962896b79163d141607e94d9b54/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1569-L1572
You may be affected by _another_ bug which is preventing tasks from being
reconfigured which is specific to MM2:
https://issues.apache.org/jira/browse/KAFKA-10586
You can see evidence for this in the DistributedHerder ERROR log "Request
to leader to reconfigure connector tasks failed". A fix for this is
in-flight already.
It appears that Strimzi is using the kafka-mirror-maker.sh script, which is
affected:
https://github.com/strimzi/strimzi-kafka-operator/blob/97b48461d724a9c59505a9ad31b3d184476a83d7/docker-images/kafka-based/kafka/scripts/kafka_mirror_maker_run.sh#L121

> Are there any other (not overly-verbose) classes you recommend we enable
DEBUG logging on

I think you've covered the interesting ones. You can also look and see if
the Mirror*Connector classes are behaving themselves, but it doesn't appear
that the reconfiguration code path has any relevant logs.

> Also, would exposing the inconsistent connectors (assuming they're being
> identified as such by Kafka Connect when this happens) through an API call
> also make sense so that this can be detected/monitored more easily?

Unless we have evidence that the config topic being in inconsistent state
(B) is a common problem, I don't think adding monitorability for it has a
high enough ROI to be implemented.
If you feel strongly about it, then you can consider opening a KIP to
describe the public interface changes and how those interfaces would be
used for monitoring.

Inconsistent state (A) however, seems very common. I've seen it in
production myself, it's implicated in KAFKA-9228, KAFKA-10586, and is
clearly causing disturbance to real users.
Fixing the conditions which lead to state (A) is what I'm most interested
in seeing, and should be prioritized first because it's what is going to
have the highest ROI.

Right now you can find connectors in inconsistent state (A) with the
following:
* You can hand-inspect the task configs with the `GET
/{connector}/tasks-config` endpoint since 2.8.0 (see the curl sketch after
this list). This does not work for dedicated MM2 (right now) for precisely
the same reason that KAFKA-10586 occurs: the REST API isn't running.
* For mechanical alerting, you can read the config topic and track the
offset for the most recent connector config and compare it with the offset
for the most recent task config. This depends on internal implementation
though, and isn't supported across releases.
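
A sketch of that first, hand-inspection option (host and connector name are
placeholders):

  curl -s http://localhost:8083/connectors/my-connector/tasks-config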

I think a way of detecting state (A) via the REST API would be a valuable
addition that could get accepted, if someone is willing to do the legwork
to design, propose and implement it.
It would be valuable even without any bugs present, as the connectors have
to transit through state (A) on each reconfiguration. We can look into this
after getting some tactical fixes in place to avoid the long-term state (A).

Thanks,
Greg Harris



On Wed, Feb 15, 2023 at 9:34 AM Frank Grimes
 wrote:

> So we've just hit this issue again, just with the MM2 connector and trying
> to add a new mirrored topic. We're running MirrorMaker 2 in Strimzi, i.e.
> "connector.class":
> "org.apache.kafka.connect.mirror.MirrorSourceConnector". We have 6 worker
> nodes. We changed the config to add a new mirror topic, i.e. append a new
> topic to the MirrorSourceConnector's "topics" config. The MM2 config topic
> reflects the change, as does viewing the config using Kowl UI. However, no
> tasks run to mirror the newly-added topic. We also do not see any updates on
> the MM2 status topic for the mirroring of that newly-added topic.
> We tried adding tasks to trigger a propagation of the task configs
> (increased from 36 to 40 tasks), however that did not unblock it. So
> triggering this code path did not seem to work:
> https://github.com/apache/kafka/blob/8cb0a5e9d3441962896b79163d141607e94d9b54/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1569-L1572
> Only restarting the workers seemed to unblock the propagation of the new
> task config for the new mirrored topic.
> Hopefully this can help us narrow things down a bit...
> In the meantime we've since enabled the following DEBUG logging in
> production to try to get more hints the next time this happens.

Re: Kafka Connect ClusterConfigState.inconsistentConnectors() not handled by distributed Worker?

2023-02-15 Thread Frank Grimes
So we've just hit this issue again, just with the MM2 connector and trying to
add a new mirrored topic. We're running MirrorMaker 2 in Strimzi, i.e.
"connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector". We
have 6 worker nodes. We changed the config to add a new mirror topic, i.e.
append a new topic to the MirrorSourceConnector's "topics" config. The MM2
config topic reflects the change, as does viewing the config using Kowl UI.
However, no tasks run to mirror the newly-added topic. We also do not see any
updates on the MM2 status topic for the mirroring of that newly-added topic.
We tried adding tasks to trigger a propagation of the task configs (increased 
from 36 to 40 tasks), however that did not unblock it. So triggering this code
path did not seem to work: 
https://github.com/apache/kafka/blob/8cb0a5e9d3441962896b79163d141607e94d9b54/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1569-L1572
Only restarting the workers seemed to unblock the propagation of the new task 
config for the new mirrored topic.
Hopefully this can help us narrow things down a bit...
In the meantime we've since enabled the following DEBUG logging in production 
to try to get more hints the next time this happens:  
log4j.logger.org.apache.kafka.connect.storage.KafkaConfigBackingStore: DEBUG
log4j.logger.org.apache.kafka.connect.runtime.distributed.DistributedHerder: DEBUG
Perhaps that will show us if it's at all related to MM2 config topic compaction
and/or connectors in an inconsistent state.
Are there any other (not overly-verbose) classes you recommend we enable DEBUG 
logging on?
Also, would exposing the inconsistent connectors (assuming they're being
identified as such by Kafka Connect when this happens) through an API call also
make sense so that this can be detected/monitored more easily?
Thanks!



Re: Kafka Connect ClusterConfigState.inconsistentConnectors() not handled by distributed Worker?

2023-02-14 Thread Frank Grimes
I don't think forcing the API users to introduce the nonce is desirable. For
us, it would mean reaching out to the Strimzi project to try to get that
implemented on their side, which I would imagine would be a proposal that
would meet some resistance.
Have you had any ideas on how this can be implemented within Kafka Connect 
itself so that it works as expected for all users?
Thanks!


Re: Kafka Connect ClusterConfigState.inconsistentConnectors() not handled by distributed Worker?

2023-02-09 Thread Greg Harris
Frank,

The configs are being compared after ConfigProviders have been resolved.
This is happening both as a Connector config (by
ClusterConfigState::connectorConfig) and as task configs (by
ClusterConfigState::taskConfig).
This means that two configurations that have different direct contents (the
path to a secret changed) can resolve to the same value if both paths
produce the same value after resolving the config provider.
This also means that if you change the secret on disk and re-submit the
config, the new secret will be resolved in each of the ClusterConfigState
calls, and also end up looking equivalent.
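
For illustration, a hypothetical pair of direct configs (FileConfigProvider
syntax, assuming a provider registered in the worker as "file"; the paths,
key, and value here are made up):

  # Generation 1 connector config, direct contents:
  connection.password=${file:/etc/secrets/db-v1.properties:password}
  # Generation 2 connector config, direct contents:
  connection.password=${file:/etc/secrets/db-v2.properties:password}

If both files happen to contain password=hunter2, both generations resolve
to connection.password=hunter2, so the post-resolution comparison considers
them equal and no new task configs are written.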

> Would capturing a new generation value within the config itself on every
submitted change be a possible fix/workaround?

This is the workaround I proposed earlier in this conversation for external
users to force updates: add a nonce to their connector configuration.
I don't think it's reasonable for the framework to do this unconditionally,
so maybe we need to find an alternative if we want to fix this for everyone
by default.
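
As a concrete sketch of that nonce workaround, using the same config shape
as a normal connector submission (the "update.nonce" key, connector name,
and values are placeholders, and this relies on the connector passing
unrecognized keys through to its task configs, which not all connectors do):

{
  "name": "my-connector",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "tasks.max": "36",
    "update.nonce": "2023-02-09T12:00:00Z"
  }
}

Bumping "update.nonce" on every submission makes the resolved task configs
differ, so the herder writes a new generation to the config topic.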

Greg

On Thu, Feb 9, 2023 at 8:26 AM Frank Grimes 
wrote:

>  I'm still having trouble understanding how the configs could match in the
> code you highlighted when we change connector and/or task config values
> when no keys are being pruned by the connector implementations in question.
> Would capturing a new generation value within the config itself on every
> submitted change be a possible fix/workaround? The possible slightly
> negative consequence of that change would be that re-submitting the same
> config, which would effectively be a no-op in the current implementation,
> would now force task reconfigures/restarts?
> On Wednesday, February 8, 2023, 12:47:19 PM EST, Greg Harris
>  wrote:
> > This is the condition which is causing the issue:
> > https://github.com/apache/kafka/blob/6e2b86597d9cd7c8b2019cffb895522deb63c93a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1918-L1931
> > The DistributedHerder is comparing the generation 1 and generation 2
> > configurations, and erroneously believes that they are equal, when in fact
> > the underlying secrets have changed.
> > This prevents the DistributedHerder from writing generation 2 task configs
> > to the KafkaConfigBackingStore entirely, leaving it in state (A) without
> > realizing it.
>
>


Re: Kafka Connect ClusterConfigState.inconsistentConnectors() not handled by distributed Worker?

2023-02-09 Thread Frank Grimes
 I'm still having trouble understanding how the configs could match in the code 
you highlighted when we change connector and/or task config values when no keys 
are being pruned by the connector implementations in question. Would capturing a 
new generation value within the config itself on every submitted change be a 
possible fix/workaround? The possible slightly negative consequence of that 
change would be that re-submitting the same config, which would effectively be a 
no-op in the current implementation, would now force task reconfigures/restarts?
On Wednesday, February 8, 2023, 12:47:19 PM EST, Greg Harris 
 wrote:
> This is the condition which is causing the issue: 
> https://github.com/apache/kafka/blob/6e2b86597d9cd7c8b2019cffb895522deb63c93a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1918-L1931
> The DistributedHerder is comparing the generation 1 and generation 2 
> configurations, and erroneously believes that they are equal, when in fact 
> the underlying secrets have changed.
> This prevents the DistributedHerder from writing generation 2 task configs 
> to the KafkaConfigBackingStore entirely, leaving it in state (A) without 
> realizing it.

  

Re: Kafka Connect ClusterConfigState.inconsistentConnectors() not handled by distributed Worker?

2023-02-08 Thread Greg Harris
Frank,

> I'm operating on the assumption that the connectors in question get stuck
in an inconsistent state
> Another thought... if an API exists to list all connectors in such a
state, then at least some monitoring/alerting could be put in place, right?

There are two different inconsistencies relevant to this discussion.
Inconsistent state (A) is when a new connector config (generation 2) has
been written to the config topic, but a full set of old task configs
(generation 1) exists in the config topic.
Inconsistent state (B) is when a new connector config (generation 2) has
been written to the config topic, and there is an incomplete set of new
task configs (generation 2) in the config topic.
Since this is a reconfiguration, it could also be a mix of generation 1 and
generation 2 task configs in the config topic.

Inconsistent state (A) is a normal part of updating a connector config; the
cluster writes the connector config to the config topic durably, and then
begins to try to asynchronously regenerate task configs.
Inconsistent state (B) is not normal, and happens when a worker is unable
to atomically write a full set of task configurations to the config topic.
The inconsistentConnectors method will report connectors in state (B), but
will not report connectors in state (A).
In state (A), you will see tasks running stale configurations. In state (B)
you will see no tasks running, as the framework will prevent starting tasks
that do not have a consistent set of task configs.
As you're not seeing the no-tasks symptom, I would put state (B) out of
mind and assume that the KafkaConfigBackingStore will give you atomic read
and write semantics for a full set of task configs at once.
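
To make the two states concrete, here is a sketch of the config topic
contents (record keys abbreviated to the "connector-", "task-", and
"commit-" prefixes used by KafkaConfigBackingStore; payloads elided):

  State (A): connector-myconn -> connector config (generation 2)
             task-myconn-0    -> task config (generation 1)
             task-myconn-1    -> task config (generation 1)
             commit-myconn    -> commit record for 2 tasks (generation 1)

  State (B): connector-myconn -> connector config (generation 2)
             task-myconn-0    -> task config (generation 2)
             (task-myconn-1 and/or the generation 2 commit record missing)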

> I see on KafkaConfigBackingStore.putTaskConfigs that if the JavaDoc is to
be believed, a ConnectException is thrown "if the task configurations do
not resolve inconsistencies found in the existing root and task
configurations."

It looks like ConnectExceptions are thrown after a failure to read or write
to the config topic, which is a pretty broad class of errors.
But if any such error occurs, the code cannot be guaranteed that a full set
of task configs + task commit message was written, and the topic may be in
state (A) or state (B).
This method is called specifically to resolve state (A) or state (B), and
the exception is just indicating that whatever inconsistency was present
before the call may still be there.

> Am I right that there is only one retry on that exception in
DistributedHerder.reconfigureConnectorTasksWithRetry?

No, the implementation of `reconfigureConnectorTasksWithRetry` calls itself
in its error callback, and calls reconfigureConnector to retry the
operation. This is a recursive loop, but it isn't obvious because it's
inside a callback.
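
A minimal sketch of that shape (simplified Java; not the actual
DistributedHerder source, which also applies backoff and deadline handling):

  interface Callback<T> {
      void onCompletion(Throwable error, T result);
  }

  class HerderSketch {
      void reconfigureConnectorTasksWithRetry(String connName) {
          // reconfigureConnector reports success or failure through the callback.
          reconfigureConnector(connName, (error, result) -> {
              if (error != null) {
                  // Retry by re-invoking ourselves from the error callback; the
                  // loop is recursive rather than iterative, so it is easy to miss.
                  reconfigureConnectorTasksWithRetry(connName);
              }
          });
      }

      void reconfigureConnector(String connName, Callback<Void> callback) {
          // Generate task configs, compare with the config topic contents,
          // write them if they differ, then invoke callback.onCompletion(...).
      }
  }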

This is the condition which is causing the issue:
https://github.com/apache/kafka/blob/6e2b86597d9cd7c8b2019cffb895522deb63c93a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1918-L1931
The DistributedHerder is comparing the generation 1 and generation 2
configurations, and erroneously believes that they are equal, when in fact
the underlying secrets have changed.
This prevents the DistributedHerder from writing generation 2 task configs
to the KafkaConfigBackingStore entirely, leaving it in state (A) without
realizing it.

Thanks for working on this issue,
Greg

On Wed, Feb 8, 2023 at 8:38 AM Frank Grimes 
wrote:

> Another thought... if an API exists to list all connectors in such a
> state, then at least some monitoring/alerting could be put in place, right?


Re: Kafka Connect ClusterConfigState.inconsistentConnectors() not handled by distributed Worker?

2023-02-08 Thread Frank Grimes
Another thought... if an API exists to list all connectors in such a state, 
then at least some monitoring/alerting could be put in place, right?

Re: Kafka Connect ClusterConfigState.inconsistentConnectors() not handled by distributed Worker?

2023-02-08 Thread Frank Grimes
 So I've been looking into the codebase to familiarize myself with it. I'm 
operating on the assumption that the connectors in question get stuck in an 
inconsistent state which causes them to prune the new task configs from those 
which are "broadcast" to the workers. I see on 
KafkaConfigBackingStore.putTaskConfigs that if the JavaDoc is to be believed, a 
ConnectException is thrown "if the task configurations do not resolve 
inconsistencies found in the existing root and task configurations." Am I right 
that there is only one retry on that exception in 
DistributedHerder.reconfigureConnectorTasksWithRetry? Could more retry attempts 
help avoid the bug? Or once it's inconsistent in the config topic, is there no 
chance it'll auto-resolve without a resizing?
Thanks!

CVE-2023-25194: Apache Kafka: Possible RCE/Denial of service attack via SASL JAAS JndiLoginModule configuration using Kafka Connect

2023-02-07 Thread Manikumar
Severity: important

Description:

A possible security vulnerability has been identified in Apache Kafka
Connect. This requires access to a Kafka Connect worker,
and the ability to create/modify connectors on it with an arbitrary
Kafka client SASL JAAS config and a SASL-based security protocol,
which has been possible on Kafka Connect clusters since Apache Kafka
2.3.0. When configuring the connector via the Kafka Connect REST API,
an authenticated operator can set the `sasl.jaas.config` property for any
of the connector's Kafka clients to
"com.sun.security.auth.module.JndiLoginModule",
which can be done via the `producer.override.sasl.jaas.config`,
`consumer.override.sasl.jaas.config`, or
`admin.override.sasl.jaas.config` properties.

This will allow the server to connect to the attacker's LDAP server
and deserialize the LDAP response, which the attacker can use to
execute Java deserialization gadget chains on the Kafka Connect
server. Attackers can cause unrestricted deserialization of untrusted
data, or achieve remote code execution, when there are gadgets in the
classpath.

Since Apache Kafka 3.0.0, users are allowed to specify these properties
in connector configurations for Kafka Connect clusters running with
out-of-the-box configurations. Before Apache Kafka 3.0.0, users may not
specify these properties unless the Kafka Connect cluster has been reconfigured
with a connector client override policy that permits them.

Since Apache Kafka 3.4.0, we have added a system property
("-Dorg.apache.kafka.disallowed.login.modules") to disable the use of
problematic login modules in SASL JAAS configurations. Also, by default,
"com.sun.security.auth.module.JndiLoginModule" is disabled
in Apache Kafka 3.4.0.
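
For example, the property takes a comma-separated list of login module class
names and can be passed via KAFKA_OPTS when starting a worker (the exact
module list below is illustrative):

  export KAFKA_OPTS="-Dorg.apache.kafka.disallowed.login.modules=com.sun.security.auth.module.JndiLoginModule,com.sun.security.auth.module.LdapLoginModule"
  bin/connect-distributed.sh config/connect-distributed.properties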

We advise Kafka Connect users to validate connector configurations
and only allow trusted JNDI configurations. Also, examine connector
dependencies for vulnerable versions and either upgrade the
connectors, upgrade the specific dependency, or remove the
connectors as remediation options. Finally, in addition to leveraging the
"org.apache.kafka.disallowed.login.modules" system property, Kafka Connect users
can also implement their own connector client config override policy, which can
be used to control which Kafka client properties can be overridden directly
in a connector config and which cannot.

Credit:

Apache Kafka would like to thank Jari Jääskelä
(https://hackerone.com/reports/1529790)
and 4ra1n and Y4tacker (they found vulnerabilities in other Apache projects;
after discussion between the PMCs of the two projects, it was confirmed that
the vulnerability was in Kafka, and they then reported it to us)


References:

https://kafka.apache.org/cve-list
https://kafka.apache.org/
https://www.cve.org/CVERecord?id=CVE-2023-25194


Re: Kafka Connect ClusterConfigState.inconsistentConnectors() not handled by distributed Worker?

2023-02-06 Thread Greg Harris
Frank,

I don't think that the fix necessarily needs to follow the #12450 PR; we
can choose to start from scratch now that we know more about the issue.
If that PR is useful as a starting point, we can also include it; that is
up to you.

Greg

On Mon, Feb 6, 2023 at 10:21 AM Frank Grimes
 wrote:

>
> Hi Greg,
> I actually just found the following comment on this PR for
> https://issues.apache.org/jira/browse/KAFKA-13809:
> https://github.com/apache/kafka/pull/12450
> > we get the same behavior (KAFKA-9228 notwithstanding) by passing the
> > original properties through to tasks transparently
> It seems we're not the first to notice that the issue isn't limited to
> connectors which selectively propagate properties to the task configs.
> FWIW, the kafka-connect-s3 connector also does not seem to prune any
> configs from the tasks:
> https://github.com/confluentinc/kafka-connect-storage-cloud/blob/d21662e78286d79b938e7b9affa418b863a6299f/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnector.java#L57-L77
> Is the patch provided on https://github.com/apache/kafka/pull/7823 still
> the right approach?
> Thanks!
>
>
>
>


Re: Kafka Connect ClusterConfigState.inconsistentConnectors() not handled by distributed Worker?

2023-02-06 Thread Frank Grimes
 
Hi Greg,
I actually just found the following comment on this PR for 
https://issues.apache.org/jira/browse/KAFKA-13809: 
https://github.com/apache/kafka/pull/12450
> we get the same behavior (KAFKA-9228 notwithstanding) by passing the original
> properties through to tasks transparently
It seems we're not the first to notice that the issue isn't limited to 
connectors which selectively propagate properties to the task configs. FWIW, 
the kafka-connect-s3 connector also does not seem to prune any configs from 
the tasks: 
https://github.com/confluentinc/kafka-connect-storage-cloud/blob/d21662e78286d79b938e7b9affa418b863a6299f/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnector.java#L57-L77
Is the patch provided on https://github.com/apache/kafka/pull/7823 still the 
right approach?
Thanks!





Re: Kafka Connect ClusterConfigState.inconsistentConnectors() not handled by distributed Worker?

2023-02-06 Thread Greg Harris
Frank,

I think you're right that the KAFKA-9228 ticket doesn't capture every
possible reconfiguration that might result in a dropped restart.
The ticket calls out the FileStream connectors, which generate their
configurations by dropping unknown config values, which is relatively
uncommon.
This means that even changes to non-externalized configurations may not
trigger a restart.

We now know that dropped restarts can happen to non-FileStream connectors
with externalized config values, but a fix for one should also fix the
other.
If you're interested in contributing a fix, we would welcome the
contribution. Otherwise, I'll look into this and see what we can do about
it.
Please keep in mind the known workarounds for this bug that can improve the
behavior before a fix lands.

Thanks!
Greg

On Mon, Feb 6, 2023 at 8:50 AM Frank Grimes 
wrote:

>  Hi Greg,
> The "long-term inconsistency" we have observed is not with no tasks at
> all, but instead with all the previously running tasks remaining in a
> running state but with a previous config.
> If I'm understanding the original bug report correctly, the scope of the
> problem was thought to only affect the following built-in connectors:
> FileStreamSourceConnector and the FileStreamSinkConnector
> see
> https://issues.apache.org/jira/browse/KAFKA-9228?focusedCommentId=16993990&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16993990
> However, we are seeing this issue with a number of 3rd-party connectors
> not provided as part of the Kafka project as well, e.g.:
> - Confluent's kafka-connect-s3 connector
>   (https://github.com/confluentinc/kafka-connect-storage-cloud)
> - Aerospike's connector
>   (https://docs.aerospike.com/connect/kafka/to-asdb/from-kafka-to-asdb-overview)
> We're wondering if it would be possible to re-evaluate the impact of this
> bug and look at addressing it either with the pre-existing PR (
> https://github.com/apache/kafka/pull/7823) or a new one.
> Thanks!
> On Friday, February 3, 2023, 04:29:38 PM EST, Greg Harris
>  wrote:
>
>  Frank,
>
> I realized I didn't respond to the title directly, sorry about that.
> The reason that `ClusterConfigState::inconsistentConnectors` is not used,
> is that the effect of an inconsistent connector is applied via
> `ClusterConfigState::tasks`.
> If a connector is inconsistent, then the tasks method will not return any
> task configurations.
> This will cause the outer logic to believe that there are 0 tasks defined,
> and so any connector which does request a task reconfiguration will write
> any task configs that are generated by the connector.
>
> And a task reconfiguration occurs on each connector start, and each time a
> connector requests a reconfiguration.
> If a reconfiguration failed (which is how the connector became
> inconsistent) then it will be retried.
> If the worker that had the reconfiguration fail then leaves the cluster,
> then the rebalance algorithm will start the connector somewhere else, which
> will trigger another task reconfiguration.
>
> Given the above, there does not appear to be any way to have long-term
> inconsistent connectors without a reconfiguration consistently failing.
> If you are seeing the symptoms of long-term inconsistency (no tasks at all
> for a connector) then I'd be very interested in a reproduction case for
> that.
>
> Thanks!
> Greg Harris
>
> On Fri, Feb 3, 2023 at 1:05 PM Greg Harris  wrote:
>
> > Frank,
> >
> > The inconsistentConnectors method is related to an extremely specific
> > inconsistency that can happen when a worker writes some task
> > configurations, and then disconnects without writing a following "commit
> > tasks record" to the config topic.
> > This is a hold-over from the early days of connect from before Kafka's
> > transactions support, and is mostly an implementation detail.
> > See the `KafkaConfigBackingStore::putTaskConfigs` and
> > `KafkaConfigBackingStore::processTasksCommitRecord` for the relevant
> > code.
> > It is not expected that this method is in regular use, and is primarily
> > for diagnostic purposes.
> >
> > What the Strimzi issue seems to describe (and probably the issue you are
> > facing) occurs at a higher level, when a worker is deciding whether to
> > write new task configs at all.
> > The relevant code is here:
> >
> > https://github.com/apache/kafka/blob/6e2b86597d9cd7c8b2019cffb895522deb63c93a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1918-L1931
> > In that snippet, new task configs generated by the connector are only
> > written to the config topic if they differ from the current contents of
> > the config topic. And this comparison is done on the post-transformation
> > configurations, after ConfigProviders have been resolved.

Re: Kafka Connect ClusterConfigState.inconsistentConnectors() not handled by distributed Worker?

2023-02-06 Thread Frank Grimes
 Hi Greg,
The "long-term inconsistency" we have observed is not with no tasks at all, but 
instead with all the previously running tasks remaining in a running state but 
with a previous config.
If I'm understanding the original bug report correctly, the scope of the 
problem was thought to only affect the following built-in connectors: 
FileStreamSourceConnector and the FileStreamSinkConnector
see 
https://issues.apache.org/jira/browse/KAFKA-9228?focusedCommentId=16993990&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16993990
However, we are seeing this issue with a number of 3rd-party connectors not 
provided as part of the Kafka project as well, e.g.:
- Confluent's kafka-connect-s3 connector 
  (https://github.com/confluentinc/kafka-connect-storage-cloud)
- Aerospike's connector 
  (https://docs.aerospike.com/connect/kafka/to-asdb/from-kafka-to-asdb-overview)
We're wondering if it would be possible to re-evaluate the impact of this bug 
and look at addressing it either with the pre-existing PR 
(https://github.com/apache/kafka/pull/7823) or a new one.
Thanks!
On Friday, February 3, 2023, 04:29:38 PM EST, Greg Harris 
 wrote:  
 
 Frank,

I realized I didn't respond to the title directly, sorry about that.
The reason that `ClusterConfigState::inconsistentConnectors` is not used,
is that the effect of an inconsistent connector is applied via
`ClusterConfigState::tasks`.
If a connector is inconsistent, then the tasks method will not return any
task configurations.
This will cause the outer logic to believe that there are 0 tasks defined,
and so any connector which does request a task reconfiguration will write
any task configs that are generated by the connector.

And a task reconfiguration occurs on each connector start, and each time a
connector requests a reconfiguration.
If a reconfiguration failed (which is how the connector became
inconsistent) then it will be retried.
If the worker that had the reconfiguration fail then leaves the cluster,
then the rebalance algorithm will start the connector somewhere else, which
will trigger another task reconfiguration.

Given the above, there does not appear to be any way to have long-term
inconsistent connectors without a reconfiguration consistently failing.
If you are seeing the symptoms of long-term inconsistency (no tasks at all
for a connector) then I'd be very interested in a reproduction case for
that.

Thanks!
Greg Harris

On Fri, Feb 3, 2023 at 1:05 PM Greg Harris  wrote:

> Frank,
>
> The inconsistentConnectors method is related to an extremely specific
> inconsistency that can happen when a worker writes some task
> configurations, and then disconnects without writing a following "commit
> tasks record" to the config topic.
> This is a hold-over from the early days of connect from before Kafka's
> transactions support, and is mostly an implementation detail.
> See the `KafkaConfigBackingStore::putTaskConfigs` and
> `KafkaConfigBackingStore::processTasksCommitRecord` for the relevant code.
> It is not expected that this method is in regular use, and is primarily
> for diagnostic purposes.
>
> What the Strimzi issue seems to describe (and probably the issue you are
> facing) occurs at a higher level, when a worker is deciding whether to
> write new task configs at all.
> The relevant code is here:
> https://github.com/apache/kafka/blob/6e2b86597d9cd7c8b2019cffb895522deb63c93a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1918-L1931
> In that snippet, new task configs generated by the connector are only
> written to the config topic if they differ from the current contents of the
> config topic. And this comparison is done on the post-transformation
> configurations, after ConfigProviders have been resolved.
> And critical for this bug, that resolution is done twice in quick
> succession, when the old and new configuration could evaluate to the same
> final result.
> The code snippet also shows why your workaround works: the other condition
> for writing all of the task configs to the config topic is that the number
> of configurations has changed.
>
> I believe this bug is captured in
> https://issues.apache.org/jira/browse/KAFKA-9228 but it has not
> progressed in some time.
> There is a potentially lower-impact workaround that involves adding a
> nonce to your connector configuration that changes each time you apply a
> new configuration to the connector, which most connectors will pass
> directly to their tasks.
> But this unfortunately does not work in general, as connectors could
> exclude the nonce when generating task configurations.
>
> I hope this gives some more insight to the behavior you're seeing.
>
> Thanks,
> Greg Harris
>
> On Fri, Feb 3, 2023 at 7:36 AM Frank Grimes
>  wrote:
>
>> Hi, we're

Re: Kafka Connect ClusterConfigState.inconsistentConnectors() not handled by distributed Worker?

2023-02-03 Thread Greg Harris
Frank,

I realized I didn't respond to the title directly, sorry about that.
The reason that `ClusterConfigState::inconsistentConnectors` is not used,
is that the effect of an inconsistent connector is applied via
`ClusterConfigState::tasks`.
If a connector is inconsistent, then the tasks method will not return any
task configurations.
This will cause the outer logic to believe that there are 0 tasks defined,
and so any connector which does request a task reconfiguration will write
any task configs that are generated by the connector.

And a task reconfiguration occurs on each connector start, and each time a
connector requests a reconfiguration.
If a reconfiguration failed (which is how the connector became
inconsistent) then it will be retried.
If the worker that had the reconfiguration fail then leaves the cluster,
then the rebalance algorithm will start the connector somewhere else, which
will trigger another task reconfiguration.

Given the above, there does not appear to be any way to have long-term
inconsistent connectors without a reconfiguration consistently failing.
If you are seeing the symptoms of long-term inconsistency (no tasks at all
for a connector) then I'd be very interested in a reproduction case for
that.

Thanks!
Greg Harris

On Fri, Feb 3, 2023 at 1:05 PM Greg Harris  wrote:

> Frank,
>
> The inconsistentConnectors method is related to an extremely specific
> inconsistency that can happen when a worker writes some task
> configurations, and then disconnects without writing a following "commit
> tasks record" to the config topic.
> This is a hold-over from the early days of connect from before Kafka's
> transactions support, and is mostly an implementation detail.
> See the `KafkaConfigBackingStore::putTaskConfigs` and
> `KafkaConfigBackingStore::processTasksCommitRecord` for the relevant code.
> It is not expected that this method is in regular use, and is primarily
> for diagnostic purposes.
>
> What the Strimzi issue seems to describe (and probably the issue you are
> facing) occurs at a higher level, when a worker is deciding whether to
> write new task configs at all.
> The relevant code is here:
> https://github.com/apache/kafka/blob/6e2b86597d9cd7c8b2019cffb895522deb63c93a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1918-L1931
> In that snippet, new task configs generated by the connector are only
> written to the config topic if they differ from the current contents of the
> config topic. And this comparison is done on the post-transformation
> configurations, after ConfigProviders have been resolved.
> And critical for this bug, that resolution is done twice in quick
> succession, when the old and new configuration could evaluate to the same
> final result.
> The code snippet also shows why your workaround works: the other condition
> for writing all of the task configs to the config topic is that the number
> of configurations has changed.
>
> I believe this bug is captured in
> https://issues.apache.org/jira/browse/KAFKA-9228 but it has not
> progressed in some time.
> There is a potentially lower-impact workaround that involves adding a
> nonce to your connector configuration that changes each time you apply a
> new configuration to the connector, which most connectors will pass
> directly to their tasks.
> But this unfortunately does not work in general, as connectors could
> exclude the nonce when generating task configurations.
>
> I hope this gives some more insight to the behavior you're seeing.
>
> Thanks,
> Greg Harris
>
> On Fri, Feb 3, 2023 at 7:36 AM Frank Grimes
>  wrote:
>
>> Hi, we're investigating an issue where occasionally config changes don't
>> propagate to connectors/tasks.
>>
>> When this occurs, the only way to ensure that the configuration takes
>> effect is to resize the number of tasks back down to 1 and then resize back
>> up to the original number of tasks.
>> In searching for others who have been bitten by this scenario we found
>> the following thread on the Strimzi discussions pages:
>> https://github.com/strimzi/strimzi-kafka-operator/discussions/7738
>> Both the symptoms and workaround described there match what we've seen.
>> We've been doing some digging into the Kafka Connect codebase to better
>> understand how config.storage.topic is consumed.
>> In the interest of brevity I won't repeat that entire thread of
>> discussion here.
>> However, I was wondering if anyone knows whether the JavaDoc suggestion
>> on ClusterConfigState.inconsistentConnectors() is actually implemented in
>> the clustered Worker code, i.e. "When a worker detects a connector in this
>> state, it should request that the connector regenerate its task
>> configurations."
>> The reason I ask is because I couldn't find any references to that API
>> call anywhere but in the KafkaConfigBackingStoreTest unit test cases.
>> Thanks!
>>
>


Re: Kafka Connect ClusterConfigState.inconsistentConnectors() not handled by distributed Worker?

2023-02-03 Thread Greg Harris
Frank,

The inconsistentConnectors method is related to an extremely specific
inconsistency that can happen when a worker writes some task
configurations, and then disconnects without writing a following "commit
tasks record" to the config topic.
This is a hold-over from the early days of connect from before Kafka's
transactions support, and is mostly an implementation detail.
See the `KafkaConfigBackingStore::putTaskConfigs` and
`KafkaConfigBackingStore::processTasksCommitRecord` for the relevant code.
It is not expected that this method is in regular use, and is primarily for
diagnostic purposes.

What the Strimzi issue seems to describe (and probably the issue you are
facing) occurs at a higher level, when a worker is deciding whether to
write new task configs at all.
The relevant code is here:
https://github.com/apache/kafka/blob/6e2b86597d9cd7c8b2019cffb895522deb63c93a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1918-L1931
In that snippet, new task configs generated by the connector are only
written to the config topic if they differ from the current contents of the
config topic. And this comparison is done on the post-transformation
configurations, after ConfigProviders have been resolved.
And critical for this bug, that resolution is done twice in quick
succession, when the old and new configuration could evaluate to the same
final result.
The code snippet also shows why your workaround works: the other condition
for writing all of the task configs to the config topic is that the number
of configurations has changed.

I believe this bug is captured in
https://issues.apache.org/jira/browse/KAFKA-9228 but it has not
progressed in some time.
There is a potentially lower-impact workaround that involves adding a nonce
to your connector configuration that changes each time you apply a new
configuration to the connector, which most connectors will pass directly to
their tasks.
But this unfortunately does not work in general, as connectors could
exclude the nonce when generating task configurations.

I hope this gives some more insight to the behavior you're seeing.

Thanks,
Greg Harris

On Fri, Feb 3, 2023 at 7:36 AM Frank Grimes 
wrote:

> Hi, we're investigating an issue where occasionally config changes don't
> propagate to connectors/tasks.
>
> When this occurs, the only way to ensure that the configuration takes
> effect is to resize the number of tasks back down to 1 and then resize back
> up to the original number of tasks.
> In searching for others who have been bitten by this scenario we found the
> following thread on the Strimzi discussions pages:
> https://github.com/strimzi/strimzi-kafka-operator/discussions/7738
> Both the symptoms and workaround described there match what we've seen.
> We've been doing some digging into the Kafka Connect codebase to better
> understand how config.storage.topic is consumed.
> In the interest of brevity I won't repeat that entire thread of discussion
> here.
> However, I was wondering if anyone knows whether the JavaDoc suggestion on
> ClusterConfigState.inconsistentConnectors() is actually implemented in the
> clustered Worker code, i.e. "When a worker detects a connector in this
> state, it should request that the connector regenerate its task
> configurations."
> The reason I ask is because I couldn't find any references to that API
> call anywhere but in the KafkaConfigBackingStoreTest unit test cases.
> Thanks!
>


Kafka Connect ClusterConfigState.inconsistentConnectors() not handled by distributed Worker?

2023-02-03 Thread Frank Grimes
Hi, we're investigating an issue where occasionally config changes don't 
propagate to connectors/tasks.

When this occurs, the only way to ensure that the configuration takes effect is 
to resize the number of tasks back down to 1 and then resize back up to the 
original number of tasks.
In searching for others who have been bitten by this scenario we found the 
following thread on the Strimzi discussions pages: 
https://github.com/strimzi/strimzi-kafka-operator/discussions/7738
Both the symptoms and workaround described there match what we've seen. We've 
been doing some digging into the Kafka Connect codebase to better understand 
how config.storage.topic is consumed.
In the interest of brevity I won't repeat that entire thread of discussion here.
However, I was wondering if anyone knows whether the JavaDoc suggestion on 
ClusterConfigState.inconsistentConnectors() is actually implemented in the 
clustered Worker code, i.e. "When a worker detects a connector in this state, it 
should request that the connector regenerate its task configurations."
The reason I ask is because I couldn't find any references to that API call 
anywhere but in the KafkaConfigBackingStoreTest unit test cases.
Thanks!


Kafka connect SMT does not work for primary key columns

2022-12-11 Thread Fuxin Hao
2022-11-25 06:57:01,510 WARN   ||  Write of 2 records failed,
remainingRetries=0   [io.confluent.connect.jdbc.sink.JdbcSinkTask]
java.sql.BatchUpdateException: Batch entry 0 INSERT INTO
"pk_created_at" ("created_at") VALUES (1669359291990398) ON CONFLICT
("created_at") DO NOTHING was aborted: ERROR: column "created_at" is
of type timestamp without time zone but expression is of type bigint
  Hint: You will need to rewrite or cast the expression.
  Position: 52  Call getNextException to see other errors in the batch.
at 
org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:165)
at 
org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:871)
at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:910)
at 
org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1638)
at 
io.confluent.connect.jdbc.sink.BufferedRecords.executeUpdates(BufferedRecords.java:196)
at 
io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:186)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:80)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.postgresql.util.PSQLException: ERROR: column
"created_at" is of type timestamp without time zone but expression is
of type bigint
  Hint: You will need to rewrite or cast the expression.
  Position: 52
at 
org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2675)
at 
org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2365)
at 
org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:355)
at 
org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:315)
at 
org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:868)
... 17 more

The error makes it seem like the sink connector was still trying to insert
created_at as the numeric value 1669359291990398, but I verified that the
messages in the Kafka topic have been transformed into strings. It worked
if created_at was not a primary key.

I just don't know why the SMT does not work for primary key columns. How can
I fix it? Could someone help? Much appreciated.

my SMT:
https://github.com/FX-HAO/kafka-connect-debezium-tranforms/blob/master/src/main/java/com/github/haofuxin/kafka/connect/DebeziumTimestampConverter.java

my sink configuration:

{
"name": "test-sinker",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics.regex": "test.public.pk_created_at",
"table.name.format": "${topic}",
"connection.url":
"jdbc:postgresql://target:5432/test?stringtype=unspecified=postgres=postgres",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enabled": true,
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enabled": true,
"transforms": "dropPrefix",
"transforms.dropPrefix.type":
"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropPrefix.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.dropPrefix.replacement": "$3",
"auto.create": "false",
"insert.mode": "upsert",
"pk.mode": "record_key",
"delete.enabled": true
}
}


Active <> Active MirrorMaker2 setup via dedicated Kafka Connect cluster

2022-11-23 Thread Sriram Ganesh
Hi,

I am trying to set up active <> active MM2 via a Kafka Connect distributed
cluster. It seems not to be possible because of limitations like the
*bootstrap.servers* property.
Also, as per this KIP
https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0#KIP382:MirrorMaker2.0-RunningMirrorMakerinaConnectcluster,
it is not possible to have a sink connector, which means mirroring can only
run in one direction.

I have already tried other approaches, and wanted to try this out using a
Kafka Connect distributed cluster setup.

Please kindly let me know if I am doing anything wrong here, and also shed
some light on how to set up active <> active MM2 via a Kafka Connect cluster
if it is possible. I really appreciate any help you can provide.
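
For reference, my current understanding is that a single direction is
expressed as a MirrorSourceConnector config submitted to the Connect cluster
attached to the target Kafka cluster; a sketch with placeholder aliases,
hosts, and topic list:

{
  "name": "mm2-a-to-b",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "source.cluster.alias": "A",
    "target.cluster.alias": "B",
    "source.cluster.bootstrap.servers": "kafka-a:9092",
    "target.cluster.bootstrap.servers": "kafka-b:9092",
    "topics": ".*"
  }
}

The reverse direction (B -> A) would then need a second MirrorSourceConnector
on a Connect cluster attached to cluster A, since mirroring runs only as a
source connector.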


-- 
*Sriram G*
*Tech*


Re: Entire Kafka Connect cluster stuck because of a stuck sink connector

2022-10-12 Thread Chris Egerton
Hi,

What version of Kafka Connect are you running? This sounds like a bug that
was fixed a few releases ago.

Cheers,

Chris

On Wed, Oct 12, 2022, 21:27 Hemanth Savasere 
wrote:

> We have stumbled upon an issue on a running cluster with multiple
> source/sink connectors:
>
>1. One of our connectors was a JDBC sink connector connected to an SQL
>Server database (using the oracle JDBC driver).
>2. It turns out that the DB instance had a problem causing all queries
>to be stuck forever, which in turn made the start method of the
> connector
>hang forever.
>3. After some time, the entire Kafka Connect cluster was unavailable and
>the REST API was not responding giving
> {"error_code":500,"message":"Request
>timed out"} for most requests.
>4. Pausing (just before the deletion of the consumer group) or deleting
>the problematic connector allowed the cluster to run normally again.
>
> We could reproduce the same issue by adding Thread.sleep(30) in the
> start method or in the put method of the ConnectorTask.
>
> Wanted to know if there's any wiki/documentation that mentions how to
> handle this issue. My approach would be to throw a timeout exception after
> waiting for a particular time period and make the connector fail fast.
>
> --
> Thanks & Regards,
> Hemanth
>
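
A sketch of the fail-fast idea described above (simplified and illustrative;
the class, method names, and 60-second timeout are not from any actual
Connect API):

  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
  import java.util.concurrent.Future;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.TimeoutException;

  final class GuardedStart {
      // Run a potentially-blocking start() on another thread and bound the
      // wait, so one stuck connector cannot wedge the calling thread forever.
      static void startWithTimeout(Runnable start, long timeoutSeconds) throws Exception {
          ExecutorService executor = Executors.newSingleThreadExecutor();
          Future<?> future = executor.submit(start);
          try {
              future.get(timeoutSeconds, TimeUnit.SECONDS);
          } catch (TimeoutException e) {
              future.cancel(true); // interrupt the stuck start()
              throw new IllegalStateException(
                      "start() exceeded " + timeoutSeconds + "s; failing fast", e);
          } finally {
              executor.shutdownNow();
          }
      }
      // Usage: GuardedStart.startWithTimeout(() -> task.start(props), 60);
  }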

