[ https://issues.apache.org/jira/browse/KAFKA-6608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16473239#comment-16473239 ]

ASF GitHub Bot commented on KAFKA-6608:
---------------------------------------

ConcurrencyPractitioner closed pull request #4643: [KAFKA-6608] Add timeout parameter to methods which fetches and reset…
URL: https://github.com/apache/kafka/pull/4643
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/build.gradle b/build.gradle
index e090f527414..87ffc97bf36 100644
--- a/build.gradle
+++ b/build.gradle
@@ -144,8 +144,8 @@ subprojects {
   if (!JavaVersion.current().isJava9Compatible())
     apply plugin: 'findbugs'
 
-  sourceCompatibility = 1.7
-  targetCompatibility = 1.7
+  sourceCompatibility = 1.8
+  targetCompatibility = 1.8
 
   compileJava {
     options.encoding = 'UTF-8'
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index 0e27e1f0383..a26af5f5247 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -22,6 +22,7 @@
 import org.apache.kafka.common.TopicPartition;
 
 import java.io.Closeable;
+import java.time.Duration;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -90,6 +91,11 @@
      */
     public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
 
+    /**
+     * @see KafkaConsumer#commitSync(Map, Duration)
+     */
+    public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets,
+                           final Duration duration);
     /**
      * @see KafkaConsumer#commitAsync()
      */
@@ -124,12 +130,22 @@
      * @see KafkaConsumer#position(TopicPartition)
      */
     public long position(TopicPartition partition);
+    
+    /**
+     * @see KafkaConsumer#position(TopicPartition, Duration)
+     */
+    public long position(TopicPartition partition, final Duration duration);
 
     /**
      * @see KafkaConsumer#committed(TopicPartition)
      */
     public OffsetAndMetadata committed(TopicPartition partition);
 
+    /**
+     * @see KafkaConsumer#committed(TopicPartition, Duration)
+     */
+    public OffsetAndMetadata committed(TopicPartition partition, final Duration duration);
+
     /**
      * @see KafkaConsumer#metrics()
      */
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 81137f3c8dd..aa57b6afc1c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -37,6 +37,7 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
@@ -55,6 +56,7 @@
 import org.slf4j.Logger;
 
 import java.net.InetSocketAddress;
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.ConcurrentModificationException;
@@ -827,6 +829,10 @@ private KafkaConsumer(ConsumerConfig config,
         this.assignors = assignors;
     }
 
+    public long requestTimeoutMs() {
+        return requestTimeoutMs;
+    }
+
     /**
      * Get the set of partitions currently assigned to this consumer. If subscription happened by directly assigning
      * partitions using {@link #assign(Collection)} then this will simply return the same partitions that
@@ -1259,6 +1265,50 @@ public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
         }
     }
 
+    /**
+    * Commit the specified offsets for the specified list of topics and partitions.
+    * <p>
+    * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
+    * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
+    * should not be used. The committed offset should be the next message your application will consume,
+    * i.e. lastProcessedMessageOffset + 1.
+    * <p>
+    * This is a synchronous commit and will block until either the commit succeeds, an unrecoverable error is
+    * encountered (in which case it is thrown to the caller), or the given duration expires.
+    * <p>
+    * Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)}
+    * (or similar) are guaranteed to have their callbacks invoked prior to completion of this method.
+    *
+    * @param offsets A map of offsets by partition with associated metadata
+    * @param duration The maximum amount of time to block
+    * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried.
+    *             This can only occur if you are using automatic group management with {@link #subscribe(Collection)},
+    *             or if there is an active group with the same groupId which is using group management.
+    * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
+    *             function is called
+    * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
+    *             this function is called
+    * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+    * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
+    *             configured groupId. See the exception for more details
+    * @throws java.lang.IllegalArgumentException if the committed offset is negative
+    * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata
+    *             is too large or if the topic does not exist).
+    * @throws TimeoutException if the commit does not complete within the given duration
+    */
+    @Override
+    public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration duration) {
+        acquireAndEnsureOpen();
+        final long totalWaitTime = duration.toMillis();
+        try {
+            if (!coordinator.commitOffsetsSync(new HashMap<>(offsets), totalWaitTime)) {
+                throw new TimeoutException("Committing offsets synchronously took too long.");
+            }
+        } finally {
+            release();
+        }
+    }
+
     /**
      * Commit offsets returned on the last {@link #poll(long) poll()} for all the subscribed list of topics and partition.
      * Same as {@link #commitAsync(OffsetCommitCallback) commitAsync(null)}
@@ -1427,7 +1477,6 @@ public long position(TopicPartition partition) {
             while (offset == null) {
                 // batch update fetch positions for any partitions without a valid position
                 updateFetchPositions();
-                client.poll(retryBackoffMs);
                 offset = this.subscriptions.position(partition);
             }
             return offset;
@@ -1436,6 +1485,60 @@ public long position(TopicPartition partition) {
         }
     }
 
+    /**
+     * Get the offset of the <i>next record</i> that will be fetched (if a record with that offset exists).
+     * This method may issue a remote call to the server if there is no current position for the given partition.
+     * <p>
+     * This call will block until either the position could be determined or an unrecoverable error is
+     * encountered (in which case it is thrown to the caller). However, if the position cannot be
+     * retrieved within the given duration, a TimeoutException is thrown.
+     *
+     * @param partition The partition to get the position for
+     * @param duration  The maximum amount of time the method may block
+     * @return The current position of the consumer (that is, the offset of the next record to be fetched)
+     * @throws IllegalArgumentException if the provided TopicPartition is not assigned to this consumer
+     * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if no offset is currently defined for
+     *             the partition
+     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
+     *             function is called
+     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
+     *             this function is called
+     * @throws org.apache.kafka.common.errors.TimeoutException if the position cannot be retrieved within the given duration
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
+     *             configured groupId. See the exception for more details
+     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
+     */
+    public long position(TopicPartition partition, final Duration duration) {
+        final long timeout = duration.toMillis();
+        acquireAndEnsureOpen();
+        try {
+            if (!this.subscriptions.isAssigned(partition))
+                throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
+            Long offset = this.subscriptions.position(partition);
+            final long startMs = time.milliseconds();
+            long finishMs = time.milliseconds();
+            while (offset == null && finishMs - startMs < timeout) {
+                // batch update fetch positions for any partitions without a valid position
+                updateFetchPositions(time.milliseconds(), timeout - (time.milliseconds() - startMs));
+                finishMs = time.milliseconds();
+                final long remainingTime = Math.max(0, timeout - (finishMs - startMs));
+
+                if (remainingTime > 0) {
+                    client.poll(remainingTime);
+                    offset = this.subscriptions.position(partition);
+                    finishMs = time.milliseconds();
+                } else {
+                    break;
+                }
+            }
+            if (offset == null) throw new TimeoutException("Timed out while fetching the position for partition " + partition);
+            return offset;
+        } finally {
+            release();
+        }
+    }
+
     /**
      * Get the last committed offset for the given partition (whether the commit happened by this process or
      * another). This offset will be used as the position for the consumer in the event of a failure.
@@ -1464,6 +1567,39 @@ public OffsetAndMetadata committed(TopicPartition partition) {
         }
     }
 
+    /**
+     * Get the last committed offset for the given partition (whether the commit happened by this process or
+     * another). This offset will be used as the position for the consumer in the event of a failure.
+     * <p>
+     * This call will block to do a remote call to get the latest committed offsets from the server.
+     *
+     * @param partition The partition to check
+     * @param duration  The maximum amount of time to block
+     * @return The last committed offset and metadata or null if there was no prior commit
+     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
+     *             function is called
+     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
+     *             this function is called
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
+     *             configured groupId. See the exception for more details
+     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
+     */
+    @Override
+    public OffsetAndMetadata committed(TopicPartition partition, final Duration duration) {
+        acquireAndEnsureOpen();
+        final long totalWaitTime = duration.toMillis();
+        try {
+            Map<TopicPartition, OffsetAndMetadata> offsets = coordinator.fetchCommittedOffsets(Collections.singleton(partition),
+                                                                                               time.milliseconds(),
+                                                                                               totalWaitTime,
+                                                                                               time);
+            return offsets.get(partition);
+        } finally {
+            release();
+        }
+    }
+
     /**
      * Get the metrics kept by the consumer
      */
@@ -1799,6 +1935,29 @@ private boolean updateFetchPositions() {
         return false;
     }
 
+    /**
+     * Set the fetch position to the committed position (if there is one)
+     * or reset it using the offset reset policy the user has configured,
+     * within a given time limit.
+     *
+     * @param start        the time at which the operation begins
+     * @param timeoutMs    the maximum duration of the method
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+     * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
+     *             defined
+     * @return true if all assigned partitions have a position
+     */
+    private boolean updateFetchPositions(long start, long timeoutMs) {
+        if (subscriptions.hasAllFetchPositions())
+            return true;
+
+        coordinator.refreshCommittedOffsetsIfNeeded(start, timeoutMs);
+        subscriptions.resetMissingPositions();
+        fetcher.resetOffsetsIfNeeded();
+
+        return false;
+    }
+
     /**
      * Acquire the light lock and ensure that the consumer hasn't been closed.
      * @throws IllegalStateException If the consumer has been closed
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index ceb7024b97b..f1edd89a042 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -25,6 +25,7 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -244,6 +245,11 @@ public synchronized void commitSync() {
         commitSync(this.subscriptions.allConsumed());
     }
 
+    @Override
+    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, final Duration duration) {
+        commitSync(offsets);
+    }
+
     @Override
     public synchronized void seek(TopicPartition partition, long offset) {
         ensureNotClosed();
@@ -259,6 +265,11 @@ public synchronized OffsetAndMetadata committed(TopicPartition partition) {
         return new OffsetAndMetadata(0);
     }
 
+    @Override
+    public OffsetAndMetadata committed(TopicPartition partition, final Duration duration) {
+        return committed(partition);
+    }
+
     @Override
     public synchronized long position(TopicPartition partition) {
         ensureNotClosed();
@@ -272,6 +283,11 @@ public synchronized long position(TopicPartition partition) {
         return offset;
     }
 
+    @Override
+    public synchronized long position(TopicPartition partition, final Duration duration) {
+        return position(partition);
+    }
+
     @Override
     public synchronized void seekToBeginning(Collection<TopicPartition> partitions) {
         ensureNotClosed();
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index dd4bb7038f3..a1c1f694ee7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -211,7 +211,7 @@ public synchronized void ensureCoordinatorReady() {
      * @param timeoutMs Maximum time to wait to discover the coordinator
      * @return true If coordinator discovery and initial connection succeeded, false otherwise
      */
-    protected synchronized boolean ensureCoordinatorReady(long startTimeMs, long timeoutMs) {
+    public synchronized boolean ensureCoordinatorReady(long startTimeMs, long timeoutMs) {
         long remainingMs = timeoutMs;
 
         while (coordinatorUnknown()) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 3c99c966d54..215775dc7b0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -31,6 +31,7 @@
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Measurable;
@@ -452,6 +453,27 @@ public void refreshCommittedOffsetsIfNeeded() {
         }
     }
 
+    /**
+     * Refresh the committed offsets for the provided partitions.
+     *
+     * @param startMs   The time at which the operation starts
+     * @param timeout   The maximum allowable duration of the method
+     * @throws TimeoutException if the committed offsets cannot be retrieved within the given amount of time
+     */
+    public void refreshCommittedOffsetsIfNeeded(long startMs, long timeout) {
+        Set<TopicPartition> missingFetchPositions = subscriptions.missingFetchPositions();
+        Map<TopicPartition, OffsetAndMetadata> offsets = fetchCommittedOffsets(missingFetchPositions, startMs, timeout, time);
+        if (offsets == null) {
+            throw new TimeoutException("Offsets could not be retrieved within the given duration");
+        }
+        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
+            TopicPartition tp = entry.getKey();
+            long offset = entry.getValue().offset();
+            log.debug("Setting offset for partition {} to the committed offset {}", tp, offset);
+            this.subscriptions.seek(tp, offset);
+        }
+    }
+
     /**
      * Fetch the current committed offsets from the coordinator for a set of partitions.
      * @param partitions The partitions to fetch offsets for
@@ -477,6 +499,39 @@ public void refreshCommittedOffsetsIfNeeded() {
             time.sleep(retryBackoffMs);
         }
     }
+
+    /**
+     * Fetch the current committed offsets from the coordinator for a set of partitions
+     * within a given amount of time.
+     * @param partitions The partitions to fetch offsets for
+     * @param startMs    The start time of the operation
+     * @param timeoutMs  The maximum duration of the method
+     * @param time       The Time instance used to measure elapsed time in milliseconds
+     * @throws TimeoutException if the offsets cannot be retrieved within the given amount of time
+     * @return A map from partition to the committed offset, or null if the offsets could not be fetched in time
+     */
+    public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition> partitions,
+                                                                        long startMs, long timeoutMs, Time time) {
+        if (partitions.isEmpty())
+            return Collections.emptyMap();
+        while (time.milliseconds() < startMs + timeoutMs) {
+            ensureCoordinatorReady(startMs, timeoutMs);
+            if (time.milliseconds() > startMs + timeoutMs) throw new TimeoutException("Coordinator was not ready within the allocated time!");
+
+            // contact coordinator to fetch committed offsets
+            RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future = sendOffsetFetchRequest(partitions);
+            client.poll(timeoutMs, time.milliseconds(), future);
+
+            if (future.succeeded())
+                return future.value();
+
+            if (future.isDone())
+                throw future.exception();
+
+            time.sleep(retryBackoffMs);
+        }
+        return null;
+    }
 
     public void close(long timeoutMs) {
         // we do not need to re-enable wakeups since we are closing already
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 8c147a58f77..0a9874cd651 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -78,6 +78,7 @@
 import org.junit.rules.ExpectedException;
 
 import java.nio.ByteBuffer;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -526,8 +527,7 @@ public void testResetToCommittedOffset() {
 
         client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, 539L), Errors.NONE), coordinator);
         consumer.poll(0);
-
-        assertEquals(539L, consumer.position(tp0));
+        assertEquals(539L, consumer.position(tp0, Duration.ofSeconds(2)));
     }
 
     @Test
@@ -1039,7 +1039,7 @@ public void testManualAssignmentChangeWithAutoCommitEnabled() {
 
         ConsumerRecords<String, String> records = consumer.poll(5);
         assertEquals(1, records.count());
-        assertEquals(11L, consumer.position(tp0));
+        assertEquals(11L, consumer.position(tp0, Duration.ofSeconds(2)));
 
         // mock the offset commit response for to be revoked partitions
         AtomicBoolean commitReceived = prepareOffsetCommitResponse(client, coordinator, tp0, 11);
@@ -1160,8 +1160,8 @@ public void testOffsetOfPausedPartitions() {
         offsetResponse.put(tp0, 3L);
         offsetResponse.put(tp1, 3L);
         client.prepareResponse(listOffsetsResponse(offsetResponse));
-        assertEquals(3L, consumer.position(tp0));
-        assertEquals(3L, consumer.position(tp1));
+        assertEquals(3L, consumer.position(tp0, Duration.ofSeconds(2)));
+        assertEquals(3L, consumer.position(tp1, Duration.ofSeconds(2)));
 
         client.requests().clear();
         consumer.unsubscribe();
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index ab7ca64f11c..52127601e95 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -14,6 +14,7 @@ package kafka.api
 
 import java.nio.ByteBuffer
 import java.util
+import java.time.Duration
 import java.util.concurrent.ExecutionException
 import java.util.regex.Pattern
 import java.util.{ArrayList, Collections, Properties}
@@ -911,20 +912,20 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     assertEquals(offset, offsetFetchResponse.responseData.get(tp).offset)
   }
 
-  @Test
+  @Test(expected = classOf[org.apache.kafka.common.errors.TimeoutException])
   def testOffsetFetchTopicDescribe() {
    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), groupResource)
    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
    this.consumers.head.assign(List(tp).asJava)
-    this.consumers.head.position(tp)
+    this.consumers.head.position(tp, Duration.ofMillis(2000L))
   }
 
-  @Test
+  @Test(expected = classOf[org.apache.kafka.common.errors.TimeoutException])
   def testOffsetFetchWithTopicAndGroupRead() {
    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
    this.consumers.head.assign(List(tp).asJava)
-    this.consumers.head.position(tp)
+    this.consumers.head.position(tp, Duration.ofMillis(2000L))
   }
 
   @Test(expected = classOf[TopicAuthorizationException])
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 53b3ed679bb..cfc03696440 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -151,7 +151,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
       if (coin == 0) {
         info("Seeking to end of log")
         consumer.seekToEnd(Collections.emptyList())
-        assertEquals(numRecords.toLong, consumer.position(tp))
+        assertEquals(numRecords.toLong, consumer.position(tp, java.time.Duration.ofMillis(2000L)))
       } else if (coin == 1) {
         val pos = TestUtils.random.nextInt(numRecords).toLong
         info("Seeking to " + pos)
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index a06e9e36528..5cd210d9b82 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -15,6 +15,7 @@ package kafka.api
 import java.util
 import java.util.regex.Pattern
 import java.util.{Collections, Locale, Properties}
+import java.time.Duration
 
 import kafka.log.LogConfig
 import kafka.server.KafkaConfig
@@ -583,15 +584,15 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     consumer.assign(List(tp).asJava)
 
     consumer.seekToEnd(List(tp).asJava)
-    assertEquals(totalRecords, consumer.position(tp))
+    assertEquals(totalRecords, consumer.position(tp, Duration.ofMillis(2000L)))
     assertFalse(consumer.poll(totalRecords).iterator().hasNext)
 
     consumer.seekToBeginning(List(tp).asJava)
-    assertEquals(0, consumer.position(tp), 0)
+    assertEquals(0, consumer.position(tp, Duration.ofMillis(2000L)), 0)
     consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = 0)
 
     consumer.seek(tp, mid)
-    assertEquals(mid, consumer.position(tp))
+    assertEquals(mid, consumer.position(tp, Duration.ofMillis(2000L)))
 
     consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = mid.toInt, startingKeyAndValueIndex = mid.toInt,
       startingTimestamp = mid.toLong)
@@ -601,15 +602,15 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     consumer.assign(List(tp2).asJava)
 
     consumer.seekToEnd(List(tp2).asJava)
-    assertEquals(totalRecords, consumer.position(tp2))
+    assertEquals(totalRecords, consumer.position(tp2, Duration.ofMillis(2000L)))
     assertFalse(consumer.poll(totalRecords).iterator().hasNext)
 
     consumer.seekToBeginning(List(tp2).asJava)
-    assertEquals(0, consumer.position(tp2), 0)
+    assertEquals(0, consumer.position(tp2, Duration.ofMillis(2000L)), 0)
     consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = 0, tp = tp2)
 
     consumer.seek(tp2, mid)
-    assertEquals(mid, consumer.position(tp2))
+    assertEquals(mid, consumer.position(tp2, Duration.ofMillis(2000L)))
     consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = mid.toInt, startingKeyAndValueIndex = mid.toInt,
       startingTimestamp = mid.toLong, tp = tp2)
   }
@@ -626,7 +627,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     producer.close()
   }
 
-  @Test
+  @Test(expected = classOf[org.apache.kafka.common.errors.TimeoutException])
   def testPositionAndCommit() {
     sendRecords(5)
 
@@ -639,12 +640,12 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     this.consumers.head.assign(List(tp).asJava)
 
-    assertEquals("position() on a partition that we are subscribed to should 
reset the offset", 0L, this.consumers.head.position(tp))
+    assertEquals("position() on a partition that we are subscribed to should 
reset the offset", 0L, this.consumers.head.position(tp, 2000L, 
TimeUnit.MILLISECONDS))
     this.consumers.head.commitSync()
     assertEquals(0L, this.consumers.head.committed(tp).offset)
 
     consumeAndVerifyRecords(consumer = this.consumers.head, numRecords = 5, startingOffset = 0)
-    assertEquals("After consuming 5 records, position should be 5", 5L, 
this.consumers.head.position(tp))
+    assertEquals("After consuming 5 records, position should be 5", 5L, 
this.consumers.head.position(tp, 2000L, TimeUnit.MILLISECONDS))
     this.consumers.head.commitSync()
     assertEquals("Committed offset should be returned", 5L, 
this.consumers.head.committed(tp).offset)
 
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 8435e5a3a6c..6e3fad3b7f8 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -1,4 +1,5 @@
 /**
+
   * Licensed to the Apache Software Foundation (ASF) under one or more
   * contributor license agreements.  See the NOTICE file distributed with
   * this work for additional information regarding copyright ownership.
@@ -168,7 +169,7 @@ class TransactionsTest extends KafkaServerTestHarness {
     assertEquals(2, readCommittedConsumer.assignment.size)
     readCommittedConsumer.seekToEnd(readCommittedConsumer.assignment)
     readCommittedConsumer.assignment.asScala.foreach { tp =>
-      assertEquals(1L, readCommittedConsumer.position(tp))
+      assertEquals(1L, readCommittedConsumer.position(tp, java.time.Duration.ofSeconds(2L)))
     }
 
     // undecided timestamps should not be searchable either
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 186276c22d8..e8620ac1088 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -125,6 +125,13 @@
 
     private static final String JMX_PREFIX = "kafka.streams";
     private static final int DEFAULT_CLOSE_TIMEOUT = 0;
+    /**
+     * TODO: This only exists to allow us to pass tests in Kafka Streams.
+     * We will need to fix this in a future pull request, particularly
+     * since how much time one would need to block for Kafka Streams is
+     * still unknown.
+     */
+    private static final int DEFAULT_BLOCKING_TIME = 20000;
 
     // processId is expected to be unique across JVMs and to be used
     // in userData of the subscription request to allow assignor be aware
@@ -526,7 +533,7 @@ public void onRestoreEnd(final TopicPartition topicPartition, final String store
      */
     public KafkaStreams(final Topology topology,
                         final Properties props) {
-        this(topology.internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier());
+        this(topology.internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier(), DEFAULT_BLOCKING_TIME);
     }
 
     /**
@@ -544,7 +551,7 @@ public KafkaStreams(final Topology topology,
     public KafkaStreams(final Topology topology,
                         final Properties props,
                         final KafkaClientSupplier clientSupplier) {
-        this(topology.internalTopologyBuilder, new StreamsConfig(props), clientSupplier, Time.SYSTEM);
+        this(topology.internalTopologyBuilder, new StreamsConfig(props), clientSupplier, Time.SYSTEM, DEFAULT_BLOCKING_TIME);
     }
 
     /**
@@ -559,9 +566,10 @@ public KafkaStreams(final Topology topology,
      * @throws StreamsException if any fatal error occurs
      */
     public KafkaStreams(final Topology topology,
+                        final StreamsConfig config,
                         final Properties props,
                         final Time time) {
-        this(topology.internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier(), time);
+        this(topology.internalTopologyBuilder, config, new DefaultKafkaClientSupplier(), DEFAULT_BLOCKING_TIME);
     }
 
     /**
@@ -581,7 +589,7 @@ public KafkaStreams(final Topology topology,
                         final Properties props,
                         final KafkaClientSupplier clientSupplier,
                         final Time time) {
-        this(topology.internalTopologyBuilder, new StreamsConfig(props), clientSupplier, time);
+        this(topology.internalTopologyBuilder, new StreamsConfig(props), clientSupplier, time, DEFAULT_BLOCKING_TIME);
     }
 
     /**
@@ -590,7 +598,7 @@ public KafkaStreams(final Topology topology,
     @Deprecated
     public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder builder,
                         final Properties props) {
-        this(builder.internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier());
+        this(builder.internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier(), DEFAULT_BLOCKING_TIME);
     }
 
     /**
@@ -599,7 +607,7 @@ public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder bui
     @Deprecated
     public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder builder,
                         final StreamsConfig config) {
-        this(builder.internalTopologyBuilder, config, new DefaultKafkaClientSupplier());
+        this(builder.internalTopologyBuilder, config, new DefaultKafkaClientSupplier(), DEFAULT_BLOCKING_TIME);
     }
 
     /**
@@ -609,7 +617,7 @@ public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder bui
     public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder builder,
                         final StreamsConfig config,
                         final KafkaClientSupplier clientSupplier) {
-        this(builder.internalTopologyBuilder, config, clientSupplier);
+        this(builder.internalTopologyBuilder, config, clientSupplier, DEFAULT_BLOCKING_TIME);
     }
 
     /**
@@ -618,7 +626,7 @@ public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder bui
     @Deprecated
     public KafkaStreams(final Topology topology,
                         final StreamsConfig config) {
-        this(topology.internalTopologyBuilder, config, new DefaultKafkaClientSupplier());
+        this(topology.internalTopologyBuilder, config, new DefaultKafkaClientSupplier(), DEFAULT_BLOCKING_TIME);
     }
 
     /**
@@ -628,7 +636,7 @@ public KafkaStreams(final Topology topology,
     public KafkaStreams(final Topology topology,
                         final StreamsConfig config,
                         final KafkaClientSupplier clientSupplier) {
-        this(topology.internalTopologyBuilder, config, clientSupplier);
+        this(topology.internalTopologyBuilder, config, clientSupplier, DEFAULT_BLOCKING_TIME);
     }
 
     /**
@@ -637,20 +645,22 @@ public KafkaStreams(final Topology topology,
     @Deprecated
     public KafkaStreams(final Topology topology,
                         final StreamsConfig config,
-                        final Time time) {
-        this(topology.internalTopologyBuilder, config, new DefaultKafkaClientSupplier(), time);
+                        final long maxCommitMs) {
+        this(topology.internalTopologyBuilder, config, new DefaultKafkaClientSupplier(), maxCommitMs);
     }
 
     private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                          final StreamsConfig config,
-                         final KafkaClientSupplier clientSupplier) throws StreamsException {
-        this(internalTopologyBuilder, config, clientSupplier, Time.SYSTEM);
+                         final KafkaClientSupplier clientSupplier,
+                         final long maxCommitMs) throws StreamsException {
+        this(internalTopologyBuilder, config, clientSupplier, Time.SYSTEM, maxCommitMs);
     }
 
     private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                          final StreamsConfig config,
                          final KafkaClientSupplier clientSupplier,
-                         final Time time) throws StreamsException {
+                         final Time time,
+                         final long maxCommitMs) throws StreamsException {
         this.config = config;
         this.time = time;
 
@@ -731,6 +741,7 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                                              clientId,
                                              metrics,
                                              time,
+                                             maxCommitMs,
                                              streamsMetadataState,
                                              cacheSizePerThread,
                                              stateDirectory,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index e8ec5e9fe5f..fc73ba39512 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -45,12 +45,21 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.time.Duration;
 
 /**
  * This class is responsible for the initialization, restoration, closing, flushing etc
  * of Global State Stores. There is only ever 1 instance of this class per Application Instance.
  */
 public class GlobalStateManagerImpl extends AbstractStateManager implements GlobalStateManager {
+    /**
+     * TODO: Currently, this is a temporary value that allows tests to pass.
+     * GlobalStateManagerImpl will have to be updated so that the amount
+     * of time for which we wait can be controlled through user input
+     * or a configuration setting.
+     */
+    private static final long DEFAULT_WAIT_TIME = 20000L;
+
     private final Logger log;
     private final ProcessorTopology topology;
     private final Consumer<byte[], byte[]> globalConsumer;
@@ -248,7 +257,7 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
                 globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
             }
 
-            long offset = globalConsumer.position(topicPartition);
+            long offset = globalConsumer.position(topicPartition, Duration.ofMillis(DEFAULT_WAIT_TIME));
             final Long highWatermark = highWatermarks.get(topicPartition);
             BatchingStateRestoreCallback
                 stateRestoreAdapter =
@@ -268,7 +277,7 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
                         if (record.key() != null) {
                             restoreRecords.add(KeyValue.pair(record.key(), record.value()));
                         }
-                        offset = globalConsumer.position(topicPartition);
+                        offset = globalConsumer.position(topicPartition, Duration.ofMillis(DEFAULT_WAIT_TIME));
                     }
                     stateRestoreAdapter.restoreAll(restoreRecords);
                     stateRestoreListener.onBatchRestored(topicPartition, storeName, offset, restoreRecords.size());
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 5fcba76570e..5644957b478 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -39,9 +39,15 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.time.Duration;
 
 public class StoreChangelogReader implements ChangelogReader {
 
+    /**
+     * TODO: Currently, position() in StoreChangelogReader is called
+     * with 20000L as the blocking time. This is not acceptable for
+     * most users and needs to be changed.
+     */
     private final Logger log;
     private final Consumer<byte[], byte[]> restoreConsumer;
     private final StateRestoreListener userStateRestoreListener;
@@ -50,13 +56,16 @@
     private final Map<TopicPartition, StateRestorer> stateRestorers = new HashMap<>();
     private final Map<TopicPartition, StateRestorer> needsRestoring = new HashMap<>();
     private final Map<TopicPartition, StateRestorer> needsInitializing = new HashMap<>();
+    private final long maxBlockMs;
 
     public StoreChangelogReader(final Consumer<byte[], byte[]> restoreConsumer,
                                final StateRestoreListener userStateRestoreListener,
-                                final LogContext logContext) {
+                                final LogContext logContext,
+                                final long maxBlockMs) {
         this.restoreConsumer = restoreConsumer;
         this.log = logContext.logger(getClass());
         this.userStateRestoreListener = userStateRestoreListener;
+        this.maxBlockMs = maxBlockMs;
     }
 
     @Override
@@ -175,7 +184,7 @@ private void startRestoration(final Map<TopicPartition, StateRestorer> initializ
                 logRestoreOffsets(restorer.partition(),
                                   restorer.checkpoint(),
                                   endOffsets.get(restorer.partition()));
-                restorer.setStartingOffset(restoreConsumer.position(restorer.partition()));
+                restorer.setStartingOffset(restoreConsumer.position(restorer.partition(), Duration.ofMillis(maxBlockMs)));
                restorer.restoreStarted();
            } else {
                restoreConsumer.seekToBeginning(Collections.singletonList(restorer.partition()));
@@ -184,7 +193,7 @@ private void startRestoration(final Map<TopicPartition, StateRestorer> initializ
         }
 
         for (final StateRestorer restorer : needsPositionUpdate) {
-            final long position = restoreConsumer.position(restorer.partition());
+            final long position = restoreConsumer.position(restorer.partition(), Duration.ofMillis(maxBlockMs));
             logRestoreOffsets(restorer.partition(),
                               position,
                               endOffsets.get(restorer.partition()));
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index ab96cce4c2f..b8a72f42abd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -576,6 +576,12 @@ void removeAllSensors() {
     final Consumer<byte[], byte[]> consumer;
     final InternalTopologyBuilder builder;
 
+    /**
+     * TODO: Currently, the parameter commitTime is used as a mechanism by
+     * which the user can pass a set time to StoreChangelogReader to block
+     * for position(). We might need to change the way the timeout is passed
+     * to StoreChangelogReader.
+     */
     public static StreamThread create(final InternalTopologyBuilder builder,
                                       final StreamsConfig config,
                                       final KafkaClientSupplier clientSupplier,
@@ -584,6 +590,7 @@ public static StreamThread create(final InternalTopologyBuilder builder,
                                       final String clientId,
                                       final Metrics metrics,
                                       final Time time,
+                                      final long commitTime,
                                       final StreamsMetadataState streamsMetadataState,
                                       final long cacheSizeBytes,
                                       final StateDirectory stateDirectory,
@@ -597,7 +604,7 @@ public static StreamThread create(final InternalTopologyBuilder builder,
         log.info("Creating restore consumer client");
         final Map<String, Object> restoreConsumerConfigs = config.getRestoreConsumerConfigs(threadClientId);
         final Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(restoreConsumerConfigs);
-        final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreConsumer, userStateRestoreListener, logContext);
+        final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreConsumer, userStateRestoreListener, logContext, commitTime);
 
         Producer<byte[], byte[]> threadProducer = null;
         final boolean eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
index 9dfb6dda37e..22d3d6d27c2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
@@ -166,7 +166,7 @@ public void setup() {
                .groupBy(MockMapper.selectKeyKeyValueMapper())
                .count();
 
-        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration, time);
+        kafkaStreams = new KafkaStreams(builder.build(), new StreamsConfig(streamsConfiguration), 20000);
     }
 
     @After
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index 19ddedfdc94..3645a85a681 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -314,7 +314,7 @@ private void createStateForRestoration()
 
         final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
         for (TopicPartition partition : partitions) {
-            final long position = consumer.position(partition);
+            final long position = consumer.position(partition, java.time.Duration.ofMillis(2000));
             offsets.put(partition, new OffsetAndMetadata(position + 1));
         }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index 347e9c4fd75..1ff35e07a22 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -233,7 +233,7 @@ private AbstractTask createTask(final Consumer consumer,
                                 storeTopicPartitions,
                                ProcessorTopology.withLocalStores(new ArrayList<>(stateStoresToChangelogTopics.keySet()), storeNamesToChangelogTopics),
                                consumer,
-                                new StoreChangelogReader(consumer, new MockStateRestoreListener(), new LogContext("stream-task-test ")),
+                                new StoreChangelogReader(consumer, new MockStateRestoreListener(), new LogContext("stream-task-test "), 20000L),
                                 false,
                                 stateDirectory,
                                 config) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 605ab337983..77791f0f098 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -123,7 +123,7 @@ private StreamsConfig createConfig(final File baseDir) throws IOException {
 
     private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
     private final MockRestoreConsumer restoreStateConsumer = new MockRestoreConsumer();
-    private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, stateRestoreListener, new LogContext("standby-task-test "));
+    private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, stateRestoreListener, new LogContext("standby-task-test "), 20000L);
 
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index c65d4efadb1..4bc24ebdeb8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -72,7 +72,7 @@
     private final StateRestoreListener stateRestoreListener = new MockStateRestoreListener();
     private final TopicPartition topicPartition = new TopicPartition("topic", 0);
     private final LogContext logContext = new LogContext("test-reader ");
-    private final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, stateRestoreListener, logContext);
+    private final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, stateRestoreListener, logContext, 20000L);
 
     @Before
     public void setUp() {
@@ -90,7 +90,7 @@ public void shouldRequestTopicsAndHandleTimeoutException() {
             }
         };
 
-        final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, stateRestoreListener, logContext);
+        final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, stateRestoreListener, logContext, 20000);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
         changelogReader.restore(active);
         assertTrue(functionCalled.get());
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index a30582905de..55ad8971f04 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -119,7 +119,7 @@ public void close() {
     private final MockProducer<byte[], byte[]> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer);
     private final MockConsumer<byte[], byte[]> restoreStateConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
     private final StateRestoreListener stateRestoreListener = new MockStateRestoreListener();
-    private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, stateRestoreListener, new LogContext("stream-task-test ")) {
+    private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, stateRestoreListener, new LogContext("stream-task-test "), 20000L) {
         @Override
         public Map<TopicPartition, Long> restoredOffsets() {
             return Collections.singletonMap(changelogPartition, offset);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index b22d98ee41c..34f04596b57 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -224,6 +224,7 @@ private StreamThread createStreamThread(final String clientId, final StreamsConf
                                    clientId,
                                    metrics,
                                    mockTime,
+                                   5000,
                                    streamsMetadataState,
                                    0,
                                    stateDirectory,
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 85c282ca461..85be612fd08 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -184,7 +184,7 @@ private StreamTask createStreamsTask(final String applicationId,
             Collections.singletonList(new TopicPartition(topicName, taskId.partition)),
             topology,
             clientSupplier.consumer,
-            new StoreChangelogReader(clientSupplier.restoreConsumer, new MockStateRestoreListener(), new LogContext("test-stream-task ")),
+            new StoreChangelogReader(clientSupplier.restoreConsumer, new MockStateRestoreListener(), new LogContext("test-stream-task "), 20000L),
             streamsConfig,
             new MockStreamsMetrics(new Metrics()),
             stateDirectory,
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 82c39eeb1d3..a9e75c2dd7e 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -242,7 +242,8 @@ public ProcessorTopologyTestDriver(final StreamsConfig config,
                                   new StoreChangelogReader(
                                       createRestoreConsumer(topology.storeToChangelogTopic()),
                                       new MockStateRestoreListener(),
-                                      new LogContext("topology-test-driver ")),
+                                      new LogContext("topology-test-driver "),
+                                      20000L),
                                   config,
                                   streamsMetrics, stateDirectory,
                                   cache,
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index abcc99d362f..a025e9062e0 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -294,7 +294,8 @@ public void onRestoreEnd(TopicPartition topicPartition, String storeName, long t
                 new StoreChangelogReader(
                     createRestoreConsumer(processorTopology.storeToChangelogTopic()),
                     stateRestoreListener,
-                    new LogContext("topology-test-driver ")),
+                    new LogContext("topology-test-driver "),
+                    20000L),
                 streamsConfig,
                 streamsMetrics,
                 stateDirectory,
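
For reference, a minimal usage sketch of the Duration-based overloads introduced in the
diff above. The bootstrap servers, group id, topic name, and five-second timeout below
are illustrative placeholders only; position(), committed(), and commitSync() are the
overloads added by this patch, and each is expected to throw
org.apache.kafka.common.errors.TimeoutException once the given duration elapses:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.TimeoutException;

    public class BoundedOffsetOps {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // illustrative only
            props.put("group.id", "bounded-ops-demo");        // illustrative only
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            TopicPartition tp = new TopicPartition("demo-topic", 0); // illustrative only
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.assign(Collections.singletonList(tp));
                try {
                    // Each call now blocks for at most the given duration
                    long position = consumer.position(tp, Duration.ofSeconds(5));
                    consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(position)),
                                        Duration.ofSeconds(5));
                    OffsetAndMetadata committed = consumer.committed(tp, Duration.ofSeconds(5));
                    System.out.println("position=" + position + ", committed=" + committed);
                } catch (TimeoutException e) {
                    // The bounded calls surface a TimeoutException instead of hanging forever
                    System.err.println("Timed out: " + e.getMessage());
                }
            }
        }
    }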


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add TimeoutException to KafkaConsumer#position()
> ------------------------------------------------
>
>                 Key: KAFKA-6608
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6608
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer
>            Reporter: Richard Yu
>            Assignee: Richard Yu
>            Priority: Blocker
>              Labels: kip
>
> In KAFKA-4879, the Kafka Consumer hangs indefinitely due to Fetcher's {{timeout}} 
> being set to {{Long.MAX_VALUE}}. While fixing that issue, it was pointed out that 
> if a timeout were added to the methods which commit offsets synchronously, stricter 
> control over time could be achieved.
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75974886
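
The patch above implements this with a deadline-based retry loop: each blocking step is
granted only the time remaining until an overall deadline, and a TimeoutException is
surfaced once that budget is exhausted. A standalone sketch of the pattern, where
tryFetch() is a hypothetical placeholder for one bounded blocking step (e.g.
updateFetchPositions() plus client.poll(remaining)) rather than a real Kafka API:

    import java.time.Duration;

    final class DeadlineLoop {
        /** Retries a bounded step until it yields a value or the overall deadline passes. */
        static long fetchWithDeadline(Duration timeout) {
            final long deadline = System.currentTimeMillis() + timeout.toMillis();
            Long result = null;
            while (result == null) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    // Mirrors the patch: surface a timeout instead of blocking forever
                    throw new RuntimeException("Timed out after " + timeout);
                }
                result = tryFetch(remaining); // placeholder: one blocking attempt, capped at 'remaining' ms
            }
            return result;
        }

        // Hypothetical helper standing in for a single bounded blocking call
        private static Long tryFetch(long remainingMs) {
            return 42L; // a real implementation would return null until the data arrives
        }
    }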



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
