[ https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ning Zhang updated KAFKA-10370: ------------------------------- Description: In WorkerSinkTask.java, when we want the consumer to start consuming from certain offsets, rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295] is used to carry the offsets from external world (e.g. implementation of SinkTask). In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first call "rewind()" to (1) read the offsets from WorkerSinkTaskContext, (2) consumer.seek(tp, offset) to rewind the consumer. when running (2), we saw the following IllegalStateException: {code:java} java.lang.IllegalStateException: No current assignment for partition mytopic-1 at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251) at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276) at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135) {code} As suggested in https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594, the resolution that has been initially verified is to use *consumer.assign* with *consumer.seek* , instead of *consumer.subscribe*. was: In WorkerSinkTask.java, when we want the consumer to start consuming from certain offsets, rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295] is used to carry the offsets from external world (e.g. implementation of SinkTask). In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first call "rewind()" to (1) read the offsets from WorkerSinkTaskContext, (2) consumer.seek(tp, offset) to rewind the consumer. when running (2), we saw the following IllegalStateException: {code:java} java.lang.IllegalStateException: No current assignment for partition mytopic-1 at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251) at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276) at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135) {code} As suggested in https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594, the resolution is to use *consumer.assign* with *consumer.seek* , instead of *consumer.subscribe* > WorkerSinkTask: IllegalStateException cased by consumer.seek(tp, offsets) > when (tp, offsets) are supplied by WorkerSinkTaskContext > ---------------------------------------------------------------------------------------------------------------------------------- > > Key: KAFKA-10370 > URL: https://issues.apache.org/jira/browse/KAFKA-10370 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect > Affects Versions: 2.5.0 > Reporter: Ning Zhang > Assignee: Ning Zhang > Priority: Major > Fix For: 2.6.0 > > > In WorkerSinkTask.java, when we want the consumer to start consuming from > certain offsets, rather than from the last committed offset, > [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295] > is used to carry the offsets from external world (e.g. implementation of > SinkTask). > In the [poll() > method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], > it first call "rewind()" to (1) read the offsets from WorkerSinkTaskContext, > (2) consumer.seek(tp, offset) to rewind the consumer. > when running (2), we saw the following IllegalStateException: > {code:java} > java.lang.IllegalStateException: No current assignment for partition mytopic-1 > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251) > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276) > at > org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135) > {code} > As suggested in > https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594, > the resolution that has been initially verified is to use *consumer.assign* > with *consumer.seek* , instead of *consumer.subscribe*. -- This message was sent by Atlassian Jira (v8.3.4#803005)