Hey Burton,

Thanks for your question and bug report.
The exception you included does not indicate that your connectors are overloaded. The primary way of diagnosing an overloaded connector is the consumer lag metric, and if you're seeing acceptable lag, that should indicate that your connectors are capable of handling the load. I would say that your workload is _unhealthy_ though, because it should be able to operate without throwing this particular exception.

I found one previous report of this exception [1], but no action was taken on the framework side; instead, the recommendation was to change the task implementation to perform offset loading only during open(). So whether you hit this depends on the task implementation: if the consumer rebalances and the task loses its assignment, and the task later calls SinkTaskContext#offset() with the now-revoked partition, the worker's subsequent seek would throw exactly this exception. (I've appended a rough sketch of this pattern below your quoted message.)

I'm not familiar with the Snowflake task, but on a cursory inspection it looks like it buffers records across poll() calls [2] and may call SinkTaskContext#offset() in a later poll [3]. They appear to have a close() method [4] that could be used to prevent SinkTaskContext#offset() from being called for revoked partitions, but I'm not sure why it isn't effective here. You should contact the Snowflake connector maintainers and let them know that they are exposed to this exception. I'll re-open the issue on the framework side to see if we can fix this for other connectors as well.

Thanks,
Greg

[1] https://issues.apache.org/jira/browse/KAFKA-10370
[2] https://github.com/snowflakedb/snowflake-kafka-connector/blob/b08ee569de6b943f8f624cd4421f7a6c7a10d532/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java#L380-L383
[3] https://github.com/snowflakedb/snowflake-kafka-connector/blob/b08ee569de6b943f8f624cd4421f7a6c7a10d532/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java#L908
[4] https://github.com/snowflakedb/snowflake-kafka-connector/blob/b08ee569de6b943f8f624cd4421f7a6c7a10d532/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java#L442

On Wed, Jul 3, 2024 at 12:25 PM Burton Williams <[email protected]> wrote:

> Hi,
>
> I have 100+ sink connectors running with 100+ topics, each with roughly 3
> partitions per topic. They are running on K8s on 10 pods with 6 CPUs and
> 32 GB mem. The connector in question is Snowflake's sink connector v2.2.0.
> This worked in the mini-batch mode SNOWPIPE, but once I switched over to
> SNOWPIPE_STREAMING, it no longer works.
> Tasks are failing with the exception:
>
> State: FAILED
>
> Worker ID: 10.136.83.73:8080
>
> Trace: java.lang.IllegalStateException: No current assignment for partition bigpicture.bulk_change-0
>   at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:369)
>   at org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:386)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1637)
>   at org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:642)
>   at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:327)
>   at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:238)
>   at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:207)
>   at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:229)
>   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:284)
>   at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
>   at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>   at java.base/java.lang.Thread.run(Thread.java:829)
>
> My questions are:
>
> 1. Are these connectors overloaded? Can Kafka Connect handle this level of load?
> 2. Say it can, which I've seen it do, could it be that this is caused by underlying rebalancing? If so, what would you recommend I do to mitigate it?
>
> Thanks
>
> -BW
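P.S. Here is the rough sketch I mentioned above. This is _not_ the Snowflake connector's actual code; the class and field names below are made up purely to illustrate the pattern: a task that buffers records across put() calls and later asks the framework to rewind via SinkTaskContext#offset(). If a rebalance revokes a partition in between, the unguarded call produces the "No current assignment for partition" failure in your trace; tracking assignments via open()/close() and only rewinding partitions that are still assigned (or doing the offset loading in open(), as recommended in [1]) avoids it.

// Hypothetical sketch only -- not the Snowflake connector's code.
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class BufferingSinkTask extends SinkTask {

    // Records buffered across put() calls, keyed by partition.
    private final Map<TopicPartition, List<SinkRecord>> buffers = new HashMap<>();

    // Partitions the framework currently has assigned to this task.
    private final Set<TopicPartition> assigned = new HashSet<>();

    @Override
    public void open(Collection<TopicPartition> partitions) {
        // These partitions are assigned right now, so this is the safe place to
        // load offsets from the target system and call context.offset() for them.
        assigned.addAll(partitions);
    }

    @Override
    public void close(Collection<TopicPartition> partitions) {
        // A rebalance revoked these partitions: drop their buffers and stop
        // treating them as rewindable.
        assigned.removeAll(partitions);
        partitions.forEach(buffers::remove);
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        for (SinkRecord r : records) {
            TopicPartition tp = new TopicPartition(r.topic(), r.kafkaPartition());
            buffers.computeIfAbsent(tp, k -> new ArrayList<>()).add(r);
        }
        // Possibly several put() calls after the records were buffered, the task
        // decides some buffered data must be re-read from Kafka.
        rewindBufferedPartitions();
    }

    private void rewindBufferedPartitions() {
        for (TopicPartition tp : buffers.keySet()) {
            long restartOffset = 0L; // placeholder: a real task derives this from the target system
            // UNSAFE: calling context.offset(tp, restartOffset) unconditionally.
            // If a rebalance revoked tp after its records were buffered, the worker's
            // next rewind/seek throws
            // java.lang.IllegalStateException: No current assignment for partition ...
            if (assigned.contains(tp)) { // guard: only rewind partitions we still own
                context.offset(tp, restartOffset);
            }
        }
    }

    @Override
    public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        // Deliver the buffered records to the external system here.
    }

    @Override
    public void start(Map<String, String> props) { }

    @Override
    public void stop() { }

    @Override
    public String version() {
        return "sketch";
    }
}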
