Hey Burton,

Thanks for your question and bug report.

The exception you included does not indicate that your connectors are
overloaded. The primary way of diagnosing an overloaded connector is the
consumer lag metric, and if you're seeing acceptable lag, that should
indicate that your connectors are capable of handling the load.
I would say that your workload is _unhealthy_ though, because it should be
able to operate without throwing this particular exception. I found one
previous report of this exception [1] but no action was taken. Instead, the
recommendation was to change the task implementation to perform offset
loading only during open().

It looks like this depends on the task implementation: If the consumer
rebalances and the task loses its assignment, and the task later calls
SinkTaskContext#offset() with the now-revoked partition, it would cause
this exception.
I'm not familiar with the Snowflake task, but upon a cursory inspection, it
looks like it buffers records [2] across poll() calls, and may call
SinkTaskContext#offset() in a later poll [3]. They appear to have a close()
method that could be used to prevent SinkTaskContext#offset() from being
called, but I'm not sure why it isn't effective.
You should contact the Snowflake Connector maintainers and know that they
are exposed to this exception.

I'll re-open this issue on the framework side to see if we can find a
solution to fix this for other connectors.

Thanks,
Greg

[1] https://issues.apache.org/jira/browse/KAFKA-10370
[2]
https://github.com/snowflakedb/snowflake-kafka-connector/blob/b08ee569de6b943f8f624cd4421f7a6c7a10d532/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java#L380-L383
[3]
https://github.com/snowflakedb/snowflake-kafka-connector/blob/b08ee569de6b943f8f624cd4421f7a6c7a10d532/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java#L908
[4]
https://github.com/snowflakedb/snowflake-kafka-connector/blob/b08ee569de6b943f8f624cd4421f7a6c7a10d532/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java#L442

On Wed, Jul 3, 2024 at 12:25 PM Burton Williams <burton.b.willi...@gmail.com>
wrote:

> Hi,
>
> I have 100+ sink connectors running with 100+ topics each with roughly 3
> partitions per topic. There are running on K8s on 10 pods with 6 cpus and
> 32Gig mem. The connector in question is Snowflake's sink connector v2.2.0.
> This worked in the mini batch mode SNOWPIPE, but once i switched over to
> SNOWPIPE_STREAMING, it no longer works. Tasks are failing with the
> exception:
>
> State:      FAILED
>
> Worker ID:  10.136.83.73:8080
>
> Trace:      java.lang.IllegalStateException: No current assignment for
> partition bigpicture.bulk_change-0
>     at
>
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:369)
>
>     at
>
> org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:386)
>     at
>
> org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1637)
>
>     at
>
> org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:642)
>
>     at
>
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:327)
>
>     at
>
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:238)
>
>     at
>
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:207)
>
>     at
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:229)
>
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:284)
>
>     at
>
> org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
>
>     at
>
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>
>     at
>
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>
>     at
>
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>
>     at java.base/java.lang.Thread.run(Thread.java:829)
>
>
>
> My questions are:
>
>    1. Are these connectors are overloaded? Can kafka connect handle this
>    level of load?
>    2. say it can, which i've seen it do, could it be that this is caused by
>    underlying rebalancing? If so what would you recommend I do to mitigate?
>
>
> Thanks
>
> -BW
>

Reply via email to