[GitHub] [kafka] gwenshap commented on pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure
gwenshap commented on pull request #8844: URL: https://github.com/apache/kafka/pull/8844#issuecomment-643911650 ok to test This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-8362) LogCleaner gets stuck after partition move between log directories
    [ https://issues.apache.org/jira/browse/KAFKA-8362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17135400#comment-17135400 ]

Ming Liu commented on KAFKA-8362:
---------------------------------

In Kafka 2.5, I can still reproduce this easily. Take a compacted topic, and say partition 100 of topic Test1 is on disk 1. In "cleaner-offset-checkpoint" you can see

Test1 100 2344432

If you then move that partition to disk 2, you will see the same partition in that disk's "cleaner-offset-checkpoint", but the disk 1 "cleaner-offset-checkpoint" still contains that partition.

Test1 100 2344543

This causes the problem during log cleaning because of how allCleanerCheckpoints is implemented.

> LogCleaner gets stuck after partition move between log directories
> ------------------------------------------------------------------
>
>                 Key: KAFKA-8362
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8362
>             Project: Kafka
>          Issue Type: Bug
>          Components: log cleaner
>            Reporter: Julio Ng
>            Priority: Major
>
> When a partition is moved from one directory to another, its checkpoint
> entry in the cleaner-offset-checkpoint file is not removed from the source
> directory.
> As a consequence, when we read the last firstDirtyOffset, we might get a stale
> value from the old checkpoint file.
> Basically, we need to clean up the entry from the checkpoint file in the source
> directory when the move is completed.
> The current issue is that the code in LogCleanerManager:
> {noformat}
> /**
>  * @return the position processed for all logs.
>  */
> def allCleanerCheckpoints: Map[TopicPartition, Long] = {
>   inLock(lock) {
>     checkpoints.values.flatMap(checkpoint => {
>       try {
>         checkpoint.read()
>       } catch {
>         case e: KafkaStorageException =>
>           error(s"Failed to access checkpoint file ${checkpoint.file.getName} in dir ${checkpoint.file.getParentFile.getAbsolutePath}", e)
>           Map.empty[TopicPartition, Long]
>       }
>     }).toMap
>   }
> }{noformat}
> collapses the offsets when multiple entries exist for the topicPartition

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
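The collapsing behaviour described for allCleanerCheckpoints can be illustrated with a small Java sketch. It is not Kafka's code: the class `CheckpointMergeSketch`, the method `mergeLastWins`, and the string key "Test1-100" are hypothetical, and the assignment of the two offsets from the comment to disk 1 and disk 2 is an assumption made only for the illustration. It shows that flattening several per-directory maps into one map keeps a single value per partition, and that which value survives depends on the order in which the directories are visited.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration, not Kafka code: merging per-directory
// checkpoint maps the way flatMap(...).toMap does, where a later entry
// for the same key silently replaces an earlier one.
final class CheckpointMergeSketch {

    static Map<String, Long> mergeLastWins(List<Map<String, Long>> perDirCheckpoints) {
        Map<String, Long> merged = new LinkedHashMap<>();
        for (Map<String, Long> dirCheckpoints : perDirCheckpoints) {
            merged.putAll(dirCheckpoints); // duplicate keys are overwritten
        }
        return merged;
    }

    public static void main(String[] args) {
        // Assumption for the sketch: disk 1 holds the stale entry, disk 2 the current one.
        Map<String, Long> disk1 = Map.of("Test1-100", 2344432L);
        Map<String, Long> disk2 = Map.of("Test1-100", 2344543L);

        // The surviving offset depends only on iteration order.
        System.out.println(mergeLastWins(List.of(disk1, disk2)));
        System.out.println(mergeLastWins(List.of(disk2, disk1)));
    }
}
```

Under those assumptions the first call keeps 2344543 and the second keeps 2344432, which is why a leftover entry in the source directory can surface as a stale firstDirtyOffset.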
[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r439923720

## File path: core/src/main/scala/kafka/cluster/Partition.scala ##
@@ -967,7 +967,16 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
-  def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int): LogAppendInfo = {
+  /**
+    * @param completeDelayedRequests It may requires a bunch of group locks when completing delayed requests so it may

Review comment:
I read @hachikuji's comment https://github.com/apache/kafka/pull/8657#discussion_r427432140 again. It is a nice idea to refactor ```ReplicaManager``` and ```Partition``` to simplify how delayed operations are checked. Could I address that refactor in another PR to keep this patch from getting bigger?
[GitHub] [kafka] chia7712 commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-643890644

> Another way that doesn't require checking lock.isHeldByCurrentThread is the following. But your approach seems simpler.

So... could we keep it simpler?
[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r439920019

## File path: core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala ##
@@ -536,6 +537,11 @@ class GroupCoordinatorTest {
     // Make sure the NewMemberTimeout is not still in effect, and the member is not kicked
     assertEquals(1, group.size)
+    // prepare the mock replica manager again since the delayed join is going to complete
+    EasyMock.reset(replicaManager)
+    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()

Review comment:
```GroupMetadataManager#storeGroup``` (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L245) also calls ```ReplicaManager.getMagic```. Some delayed operations are completed by ```timer.advanceClock```, so we have to mock ```replicaManager.getMagic```. The mock is the same as the one at https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala#L3823.
[jira] [Created] (KAFKA-10159) MirrorSourceConnector don`t work on connect-distributed.sh
cosmozhu created KAFKA-10159:
--------------------------------

             Summary: MirrorSourceConnector don`t work on connect-distributed.sh
                 Key: KAFKA-10159
                 URL: https://issues.apache.org/jira/browse/KAFKA-10159
             Project: Kafka
          Issue Type: Bug
          Components: mirrormaker
    Affects Versions: 2.4.1
         Environment: centos7
            Reporter: cosmozhu
             Fix For: 2.4.1
         Attachments: connectDistributed.out

Hi,

I want to run a MirrorSourceConnector with connect-distributed. The connector config looks like this:

```
{
  "name" : "cosmo-source",
  "config" : {
    "connector.class" : "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "source.cluster.alias" : "cosmo",
    "target.cluster.alias" : "nana",
    "source.cluster.bootstrap.servers" : "192.168.4.42:9092,192.168.4.42:9093,192.168.4.42:9094",
    "topics" : ".*"
  }
}
```

When I post the REST request, it returns

```
{"name":"cosmo-source","config":{"connector.class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","target.cluster.alias":"nana","topics":".*","source.cluster.alias":"cosmo","name":"cosmo-source","source.cluster.bootstrap.servers":"192.168.4.42:9092,192.168.4.42:9093,192.168.4.42:9094"},"tasks":[],"type":"source"}
```

The tasks array is empty, so something is obviously wrong here. In connectDistributed.out:

```
org.apache.kafka.common.config.ConfigException: Missing required configuration "bootstrap.servers" which has no default value.
```

The full logs are in the attachment. Thanks for any help.
[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r439915438

## File path: core/src/main/scala/kafka/cluster/Partition.scala ##
@@ -967,7 +967,16 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
-  def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int): LogAppendInfo = {
+  /**
+    * @param completeDelayedRequests It may requires a bunch of group locks when completing delayed requests so it may

Review comment:
The caller of ```ReplicaManager#appendRecords``` may hold the group lock, so a deadlock is possible if ```ReplicaManager#appendRecords``` tries to complete the purgatory.
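The pattern this review thread is converging on (do the locked work first, complete delayed operations only after the lock is released) can be sketched in Java as follows. This is a hypothetical illustration and not the PR's implementation: `DeferredCompletionSketch`, `appendUnderLock`, `complete`, and the key format are invented names, and a single `ReentrantLock` stands in for a group lock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch, not Kafka code: the locked section only returns the
// key whose delayed operations should be checked; the completion itself
// runs after the lock has been released.
final class DeferredCompletionSketch {
    private final ReentrantLock groupLock = new ReentrantLock();
    final List<String> completed = new ArrayList<>();

    // Work that needs the group lock. It never completes anything itself.
    private String appendUnderLock(String groupId) {
        groupLock.lock();
        try {
            return "group-" + groupId;
        } finally {
            groupLock.unlock();
        }
    }

    // Completion may need other locks, so it must not run while this
    // thread still holds the group lock.
    private void complete(String key) {
        if (groupLock.isHeldByCurrentThread()) {
            throw new IllegalStateException("completing while holding the group lock");
        }
        completed.add(key);
    }

    void appendThenComplete(String groupId) {
        String key = appendUnderLock(groupId); // lock acquired and released here
        complete(key);                         // no group lock held at this point
    }

    public static void main(String[] args) {
        DeferredCompletionSketch sketch = new DeferredCompletionSketch();
        sketch.appendThenComplete("g1");
        System.out.println(sketch.completed);
    }
}
```

Returning the key instead of completing in place is the same shape as the `Option[GroupKey]` return values discussed elsewhere in this PR, though the sketch makes no claim about how the final patch is structured.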
[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r439914680

## File path: core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala ##
@@ -307,8 +307,14 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
     override def runWithCallback(member: GroupMember, responseCallback: CompleteTxnCallback): Unit = {
       val producerId = 1000L
       val offsetsPartitions = (0 to numPartitions).map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, _))
-      groupCoordinator.groupManager.handleTxnCompletion(producerId,
-        offsetsPartitions.map(_.partition).toSet, isCommit = random.nextBoolean)
+      val isCommit = random.nextBoolean
+      try groupCoordinator.groupManager.handleTxnCompletion(producerId,
+        offsetsPartitions.map(_.partition).toSet, isCommit = isCommit)
+      catch {
+        case e: IllegalStateException if isCommit
+          && e.getMessage.contains("though the offset commit record itself hasn't been appended to the log")=>

Review comment:
TestReplicaManager#appendRecords (https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala#L207) always completes the delayedProduce immediately, so the txn offset is appended as well. This PR tries to complete the delayedProduce after releasing the group lock, so the following execution order becomes possible:

1. txn prepare
1. txn completion (fail)
1. txn append (this is executed by delayedProduce)
[GitHub] [kafka] jiameixie commented on pull request #8831: KAFKA-8657:Client-side automatic topic creation on Producer
jiameixie commented on pull request #8831:
URL: https://github.com/apache/kafka/pull/8831#issuecomment-643866848

@abbccdda Yes, is it ok to do this? If not, I can abandon this.
[GitHub] [kafka] vitojeng opened a new pull request #8871: MINOR: code cleanup for inconsistent naming
vitojeng opened a new pull request #8871:
URL: https://github.com/apache/kafka/pull/8871

There are two names in use for the value serde in the Streams module: valSerde and valueSerde. This PR changes no functionality; it only renames **valSerde** to **valueSerde**.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] junrao commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
junrao commented on a change in pull request #8657: URL: https://github.com/apache/kafka/pull/8657#discussion_r439543737 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -967,7 +967,16 @@ class Partition(val topicPartition: TopicPartition, } } - def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int): LogAppendInfo = { + /** + * @param completeDelayedRequests It may requires a bunch of group locks when completing delayed requests so it may Review comment: It's cleaner to not pass in completeDelayedRequests here and let the caller (`ReplicaManager.appendRecords()`) check and complete purgatory instead. ## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ## @@ -1118,33 +1170,38 @@ class GroupCoordinator(val brokerId: Int, group.removeStaticMember(member.groupInstanceId) group.currentState match { - case Dead | Empty => - case Stable | CompletingRebalance => maybePrepareRebalance(group, reason) - case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId)) + case Dead | Empty => None + case Stable | CompletingRebalance => +maybePrepareRebalance(group, reason) +None + case PreparingRebalance => Some(GroupKey(group.groupId)) } } - private def removePendingMemberAndUpdateGroup(group: GroupMetadata, memberId: String): Unit = { + /** + * remove the pending member and then return the group key whihc is in PreparingRebalance, + * @param group group + * @param memberId member id + * @return group key if it is in PreparingRebalance. 
Otherwise, None + */ + private def removePendingMemberAndUpdateGroup(group: GroupMetadata, memberId: String): Option[GroupKey] = { group.removePendingMember(memberId) -if (group.is(PreparingRebalance)) { - joinPurgatory.checkAndComplete(GroupKey(group.groupId)) -} - } - - def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = { -group.inLock { - if (group.hasAllMembersJoined) -forceComplete() - else false -} +if (group.is(PreparingRebalance)) Some(GroupKey(group.groupId)) +else None } def onExpireJoin(): Unit = { // TODO: add metrics for restabilize timeouts } - def onCompleteJoin(group: GroupMetadata): Unit = { + /** + * @return Returning a map of successfully appended topic partitions and a flag indicting whether the HWM has been + * incremented. If the caller passes in completeDelayedRequests as false, the caller is expected to complete Review comment: The caller no longer passed in completeDelayedRequests. ## File path: core/src/main/scala/kafka/server/DelayedOperation.scala ## @@ -100,40 +99,24 @@ abstract class DelayedOperation(override val delayMs: Long, def tryComplete(): Boolean /** - * Thread-safe variant of tryComplete() that attempts completion only if the lock can be acquired - * without blocking. + * Thread-safe variant of tryComplete() that attempts completion after it succeed to hold the lock. * - * If threadA acquires the lock and performs the check for completion before completion criteria is met - * and threadB satisfies the completion criteria, but fails to acquire the lock because threadA has not - * yet released the lock, we need to ensure that completion is attempted again without blocking threadA - * or threadB. `tryCompletePending` is set by threadB when it fails to acquire the lock and at least one - * of threadA or threadB will attempt completion of the operation if this flag is set. 
This ensures that - * every invocation of `maybeTryComplete` is followed by at least one invocation of `tryComplete` until - * the operation is actually completed. + * There is a long story about using "lock" or "tryLock". + * + * 1) using lock - There was a lot of cases that a thread holds a group lock and then it tries to hold more group Review comment: There was => There were ## File path: core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala ## @@ -3921,22 +3934,26 @@ class GroupCoordinatorTest { val (responseFuture, responseCallback) = setupCommitOffsetsCallback val capturedArgument: Capture[scala.collection.Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture() - -EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(), - EasyMock.anyShort(), + EasyMock.expect(replicaManager.completeDelayedRequests(EasyMock.anyObject())) Review comment: Hmm, this should only be called with LeaderHWChange.LeaderHWIncremented, but the mock later returns LeaderHWChange.None? Ditto below. ## File path: core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala ## @@ -33,11 +34,40 @@ import scala.math.{max, min} */ private[group] class
[GitHub] [kafka] abbccdda commented on pull request #8831: KAFKA-8657:Client-side automatic topic creation on Producer
abbccdda commented on pull request #8831:
URL: https://github.com/apache/kafka/pull/8831#issuecomment-643835672

@jiameixie Thanks, are you going to continue driving the discussion of KIP-487?
[GitHub] [kafka] abbccdda commented on a change in pull request #8845: KAFKA-10126:Add a warning message for ConsumerPerformance
abbccdda commented on a change in pull request #8845:
URL: https://github.com/apache/kafka/pull/8845#discussion_r439877376

## File path: core/src/main/scala/kafka/tools/ConsumerPerformance.scala ##
@@ -256,6 +256,10 @@ object ConsumerPerformance extends LazyLogging {
       .defaultsTo(1)
     options = parser.parse(args: _*)
+
+    if(options.has(numThreadsOpt) || options.has(numFetchersOpt))
+      println("WARNING: option threads and num-fetch-threads have been deprecated and ignored")

Review comment:
nit: add square brackets to the options, like:

```
"WARNING: option [threads] and [num-fetch-threads] have been deprecated and will be ignored by the test"
```
[GitHub] [kafka] pzygielo commented on pull request #8211: COMDEV-340 Fix project category
pzygielo commented on pull request #8211:
URL: https://github.com/apache/kafka/pull/8211#issuecomment-643813307

@ijuma - FYI - https://projects.apache.org/projects.html?category still lists this project in separate category :disappointed:
[GitHub] [kafka] guozhangwang commented on a change in pull request #8856: KAFKA-10150: task state transitions/management and committing cleanup
guozhangwang commented on a change in pull request #8856: URL: https://github.com/apache/kafka/pull/8856#discussion_r439851616 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -539,19 +537,18 @@ private void writeCheckpointIfNeed() { /** * * the following order must be followed: - * 1. checkpoint the state manager -- even if we crash before this step, EOS is still guaranteed + * 1. commit/checkpoint the state manager -- even if we crash before this step, EOS is still guaranteed Review comment: Seems we would never commit and checkpoint state manager any more. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -714,13 +696,20 @@ void shutdown(final boolean clean) { } } -if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) { -commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask); +try { +if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) { +commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask); +} +for (final TaskId taskId : consumedOffsetsAndMetadataPerTask.keySet()) { +final Task task = tasks.get(taskId); +task.postCommit(); +} +} catch (final RuntimeException e) { +firstException.compareAndSet(null, e); Review comment: Yeah I think if the actual `consumer.commit` call failed, then we should not trigger postCommit for any one. As for `postCommit`, I think it should never fail (we swallow the IO exception happened, because for non-EOS it is just fine, for EOS we would bootstrap from scratch). 
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -215,91 +215,54 @@ public void handleAssignment(final Map> activeTasks, "\tExisting standby tasks: {}", activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), standbyTaskIds()); -final Map> activeTasksToCreate = new HashMap<>(activeTasks); -final Map> standbyTasksToCreate = new HashMap<>(standbyTasks); -final Set tasksToRecycle = new HashSet<>(); - builder.addSubscribedTopicsFromAssignment( activeTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toList()), logPrefix ); -// first rectify all existing tasks final LinkedHashMap taskCloseExceptions = new LinkedHashMap<>(); -final Set tasksToClose = new HashSet<>(); -final Map> consumedOffsetsAndMetadataPerTask = new HashMap<>(); -final Set additionalTasksForCommitting = new HashSet<>(); +final Map> activeTasksToCreate = new HashMap<>(activeTasks); +final Map> standbyTasksToCreate = new HashMap<>(standbyTasks); +final LinkedList tasksToClose = new LinkedList<>(); +final Set tasksToRecycle = new HashSet<>(); final Set dirtyTasks = new HashSet<>(); +// first rectify all existing tasks for (final Task task : tasks.values()) { if (activeTasks.containsKey(task.id()) && task.isActive()) { updateInputPartitionsAndResume(task, activeTasks.get(task.id())); -if (task.commitNeeded()) { -additionalTasksForCommitting.add(task); -} activeTasksToCreate.remove(task.id()); } else if (standbyTasks.containsKey(task.id()) && !task.isActive()) { updateInputPartitionsAndResume(task, standbyTasks.get(task.id())); standbyTasksToCreate.remove(task.id()); -// check for tasks that were owned previously but have changed active/standby status } else if (activeTasks.containsKey(task.id()) || standbyTasks.containsKey(task.id())) { +// check for tasks that were owned previously but have changed active/standby status tasksToRecycle.add(task); } else { -try { -task.suspend(); -final Map committableOffsets = 
task.prepareCommit(); - -tasksToClose.add(task); -if (!committableOffsets.isEmpty()) { -consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets); -} -} catch (final RuntimeException e) { -final String uncleanMessage = String.format( -"Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:", -task.id()); -log.error(uncleanMessage, e); -taskCloseExceptions.put(task.id(), e); -// We've
[jira] [Commented] (KAFKA-6221) ReplicaFetcherThread throws UnknownTopicOrPartitionException on topic creation
    [ https://issues.apache.org/jira/browse/KAFKA-6221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17135223#comment-17135223 ]

Manjunatha Gowdar commented on KAFKA-6221:
------------------------------------------

We are facing this issue in another scenario. We have a single topic, there is a continuous load on the broker, and a consumer is also reading continuously. When we execute

kafka-consumer-groups.sh --bootstrap-server --describe --group

we still hit this issue.

Exception: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.

Is this behavior also related to this Jira?

> ReplicaFetcherThread throws UnknownTopicOrPartitionException on topic creation
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-6221
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6221
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.10.1.1, 0.10.2.0, 0.10.2.1, 0.11.0.1, 1.0.0
>         Environment: RHEL 7
>            Reporter: Alex Dunayevsky
>            Priority: Minor
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> This issue appeared to happen frequently on 0.10.2.0.
> On 0.10.2.1 and 1.0.0 it's way harder to reproduce.
> We'll focus on reproducing it on 0.10.2.1 and 1.0.0.
> *TOPOLOGY:*
> 3 brokers, 1 zk.
> *REPRODUCING STRATEGY:*
> Create a few dozen topics (say, 40) one by one, each with replication factor
> 2. The number of partitions generally does not matter but, for easier
> reproduction, should not be too small (around 30 or so).
> *CREATE 40 TOPICS:*
> {code:java} for i in {1..40}; do bin/kafka-topics.sh --create --topic "topic${i}_p28_r2" --partitions 28 --replication-factor 2 --zookeeper :2165; done {code}
> *ERRORS*
> {code:java}
> *BROKER 1*
> [2017-11-15 16:46:00,853] ERROR [ReplicaFetcherThread-0-2], Error for partition [topic1_p28_r2,27] to broker 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,853] ERROR [ReplicaFetcherThread-0-2], Error for partition [topic1_p28_r2,27] to broker 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for partition [topic1_p28_r2,9] to broker 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for partition [topic1_p28_r2,9] to broker 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for partition [topic1_p28_r2,3] to broker 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for partition [topic1_p28_r2,3] to broker 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for partition [topic1_p28_r2,15] to broker 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for partition [topic1_p28_r2,15] to broker 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for partition [topic1_p28_r2,21] to broker 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for partition [topic1_p28_r2,21] to broker 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> *BROKER 2*
> [2017-11-15 16:46:36,408] ERROR [ReplicaFetcherThread-0-3], Error for partition [topic20_p28_r2,12] to broker 3:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:36,408] ERROR [ReplicaFetcherThread-0-3], Error for
[jira] [Commented] (KAFKA-9781) TimestampConverter / Allow to specify a time zone when converting unix epoch to string
    [ https://issues.apache.org/jira/browse/KAFKA-9781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17135169#comment-17135169 ]

fml2 commented on KAFKA-9781:
-----------------------------

I've created [https://github.com/apache/kafka/pull/8869] for this.

> TimestampConverter / Allow to specify a time zone when converting unix epoch to string
> --------------------------------------------------------------------------------------
>
>                 Key: KAFKA-9781
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9781
>             Project: Kafka
>          Issue Type: Wish
>          Components: KafkaConnect
>            Reporter: fml2
>            Priority: Major
>
> TimestampConverter can convert a unix epoch value (long; number of
> milliseconds since Jan 01 1970 00:00 GMT) to a string. However, the string
> produced by such a conversion depends on the time zone used.
> TimestampConverter uses UTC (i.e. GMT) for the conversion and does not allow
> changing it. But I would need this in order to get the date/time
> representation in my local time zone.
> So I propose to introduce another (optional) config parameter for
> "target.type=string": *timeZone* (using the Java name as the value of the
> parameter). If no time zone is specified, UTC should be used, so that the
> change is backwards compatible.
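The effect of the proposed *timeZone* parameter can be sketched with plain java.time. This is only an illustration of the idea, not TimestampConverter's code and not the code in the linked pull request: the class `EpochToStringSketch`, the method `format`, and the `zoneName` argument (standing in for the proposed parameter) are hypothetical. A null zone falls back to UTC, mirroring the backwards-compatible default described in the issue.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Hypothetical sketch: the same epoch value rendered in different time zones.
final class EpochToStringSketch {

    // zoneName stands in for the proposed optional "timeZone" setting;
    // null means "not configured", which keeps the current UTC behaviour.
    static String format(long epochMillis, String pattern, String zoneName) {
        ZoneId zone = (zoneName == null) ? ZoneId.of("UTC") : ZoneId.of(zoneName);
        return DateTimeFormatter.ofPattern(pattern, Locale.ROOT)
                .withZone(zone)
                .format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        String pattern = "yyyy-MM-dd HH:mm:ss";
        System.out.println(format(0L, pattern, null));         // 1970-01-01 00:00:00
        System.out.println(format(0L, pattern, "Asia/Tokyo")); // 1970-01-01 09:00:00
    }
}
```

The epoch value is identical in both calls; only the rendering changes, which is the dependency on the time zone that the issue points out.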
[GitHub] [kafka] ocadaruma opened a new pull request #8870: MINOR: Fix timestampDelta type in doc
ocadaruma opened a new pull request #8870:
URL: https://github.com/apache/kafka/pull/8870

- Type of `timestampDelta` is written as `varint` in the doc, but actually it's `varlong`
  * https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java#L47

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
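Why the `varint` versus `varlong` distinction matters for a field like `timestampDelta` can be shown with a self-contained Java sketch of a zigzag variable-length encoding for 64-bit values. The class `VarlongSketch` and its methods are hypothetical and written for this illustration; they are not copied from Kafka's utilities, and the sample delta is made up.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical sketch of a zigzag variable-length encoding for longs.
final class VarlongSketch {

    static byte[] writeVarlong(long value) {
        // Zigzag: map signed values to unsigned so small magnitudes stay short.
        long v = (value << 1) ^ (value >> 63);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((v & ~0x7FL) != 0) {
            out.write((int) ((v & 0x7F) | 0x80)); // 7 payload bits plus continuation bit
            v >>>= 7;
        }
        out.write((int) v);
        return out.toByteArray();
    }

    static long readVarlong(byte[] bytes) {
        long v = 0;
        int shift = 0;
        for (byte b : bytes) {
            v |= (long) (b & 0x7F) << shift;
            shift += 7;
            if ((b & 0x80) == 0) {
                break; // last byte of the value
            }
        }
        return (v >>> 1) ^ -(v & 1); // undo zigzag
    }

    public static void main(String[] args) {
        long delta = 3_000_000_000L; // milliseconds, larger than Integer.MAX_VALUE
        byte[] encoded = writeVarlong(delta);
        System.out.println(encoded.length);       // 5
        System.out.println(readVarlong(encoded)); // 3000000000
        System.out.println((int) delta);          // -1294967296, what a 32-bit field would hold
    }
}
```

A delta of three billion milliseconds (roughly 35 days) round-trips through the 64-bit encoding but overflows when narrowed to an int, so documenting the field with the wrong width could mislead anyone writing a decoder from the docs.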
[GitHub] [kafka] chia7712 commented on pull request #8853: KAFKA-10147 MockAdminClient#describeConfigs(Collection
chia7712 commented on pull request #8853:
URL: https://github.com/apache/kafka/pull/8853#issuecomment-643738165

retest this please