Re: [PR] KAFKA-14484: [1/N] Move PartitionMetadataFile to storage module [kafka]
jolshan commented on code in PR #14607:
URL: https://github.com/apache/kafka/pull/14607#discussion_r1560454922

## storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataFile.java: ##
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.checkpoint;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InconsistentTopicIdException;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Optional;
+
+public class PartitionMetadataFile {
+    private static final String PARTITION_METADATA_FILE_NAME = "partition.metadata";
+    static final int CURRENT_VERSION = 0;
+
+    public static File newFile(File dir) {
+        return new File(dir, PARTITION_METADATA_FILE_NAME);
+    }
+
+    private final File file;
+    private final LogDirFailureChannel logDirFailureChannel;
+
+    private final Object lock = new Object();
+    private volatile Optional<Uuid> dirtyTopicIdOpt = Optional.empty();
+
+    public PartitionMetadataFile(
+        final File file,
+        final LogDirFailureChannel logDirFailureChannel
+    ) {
+        this.file = file;
+        this.logDirFailureChannel = logDirFailureChannel;
+    }
+
+    /**
+     * Records the topic ID that will be flushed to disk.
+     */
+    public void record(Uuid topicId) {
+        // Topic IDs should not differ, but we defensively check here to fail earlier in the case that the IDs somehow differ.
+        dirtyTopicIdOpt.ifPresent(dirtyTopicId -> {
+            if (dirtyTopicId != topicId) {

Review Comment:
I don't think this should be a reference comparison. Perhaps a miss in the Scala -> Java conversion.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
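The concern in the review comment above can be shown with a small sketch. In Scala, `!=` on object references delegates to `equals`, whereas in Java it compares object identity, so a line ported verbatim changes meaning. The example below uses `java.util.UUID` as a stand-in for `org.apache.kafka.common.Uuid` so that it compiles without Kafka on the classpath; the class and method names are hypothetical and are not part of the PR.

```java
import java.util.UUID;

// Hypothetical illustration only; java.util.UUID stands in for Kafka's Uuid.
final class TopicIdComparison {

    // Mirrors the reviewed check: true whenever the two references are different objects,
    // even if they carry the same ID value.
    static boolean differsByReference(UUID recorded, UUID incoming) {
        return recorded != incoming;
    }

    // Value comparison: true only when the two IDs actually differ.
    static boolean differsByValue(UUID recorded, UUID incoming) {
        return !recorded.equals(incoming);
    }

    public static void main(String[] args) {
        // Two distinct objects holding the same ID, as happens when an ID is parsed twice.
        UUID recorded = UUID.fromString("123e4567-e89b-12d3-a456-426614174000");
        UUID incoming = UUID.fromString("123e4567-e89b-12d3-a456-426614174000");

        System.out.println(differsByReference(recorded, incoming)); // prints true (false alarm)
        System.out.println(differsByValue(recorded, incoming));     // prints false
    }
}
```

With the reference comparison, recording the same topic ID twice through two separate objects would be treated as an inconsistency; the `equals`-based form avoids that.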
Re: [PR] KAFKA-14484: [1/N] Move PartitionMetadataFile to storage module [kafka]
junrao merged PR #14607:
URL: https://github.com/apache/kafka/pull/14607
Re: [PR] KAFKA-14484: [1/N] Move PartitionMetadataFile to storage module [kafka]
junrao commented on PR #14607:
URL: https://github.com/apache/kafka/pull/14607#issuecomment-1789288736

Thanks for triaging the transient test failures, @alok123t. Will merge the PR.
Re: [PR] KAFKA-14484: [1/N] Move PartitionMetadataFile to storage module [kafka]
alok123t commented on PR #14607:
URL: https://github.com/apache/kafka/pull/14607#issuecomment-1788463950

@ijuma @junrao The latest build succeeds for JDK 8/Scala 2.12. The previous failures should be unrelated to this PR: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14607/9/pipeline/
Re: [PR] KAFKA-14484: [1/N] Move PartitionMetadataFile to storage module [kafka]
alok123t commented on PR #14607:
URL: https://github.com/apache/kafka/pull/14607#issuecomment-1788460578

@junrao I created the following JIRA: https://issues.apache.org/jira/browse/KAFKA-15770
For the rest of the failed tests, JIRA tickets already exist.
Re: [PR] KAFKA-14484: [1/N] Move PartitionMetadataFile to storage module [kafka]
ijuma commented on PR #14607:
URL: https://github.com/apache/kafka/pull/14607#issuecomment-1783833262

I think this doesn't compile with Scala 2.12 - that needs to be fixed before we can proceed.
Re: [PR] KAFKA-14484: [1/N] Move PartitionMetadataFile to storage module [kafka]
alok123t commented on PR #14607:
URL: https://github.com/apache/kafka/pull/14607#issuecomment-1781801313

@junrao From a quick look, the test failures appear to be unrelated to the PR. I'm not sure whether these are known flaky tests, so I will wait for another run on the latest commit.
Re: [PR] KAFKA-14484: [1/N] Move PartitionMetadataFile to storage module [kafka]
alok123t commented on code in PR #14607:
URL: https://github.com/apache/kafka/pull/14607#discussion_r1373738648

## storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadata.java: ##
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.checkpoint;
+
+import org.apache.kafka.common.Uuid;
+
+public class PartitionMetadata {
+
+    private final int version;
+    private final Uuid topicId;
+
+    public PartitionMetadata(int version, Uuid topicId) {
+        this.version = version;
+        this.topicId = topicId;
+    }
+
+    public int version() {
+        return version;
+    }
+
+    public Uuid topicId() {
+        return topicId;
+    }
+
+    public String toText() {

Review Comment:
Updated to `encode` in https://github.com/apache/kafka/pull/14607/commits/f2d05e64314401142d1386f031b45b42425d5a93

It would be better to use the `Formatter` interface in `LeaderEpochCheckpointFile`; we can do that in a follow-up PR.
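As a rough sketch of what the renamed method might look like, the class below pairs the fields shown in the hunk with an `encode()` method. The method body is not visible in this thread, so the two-line `key: value` layout and the `topic_id` key are assumptions (the `version: N` first line is inferred from the reader code elsewhere in the review). `java.util.UUID` stands in for Kafka's `Uuid`, and the class name is hypothetical.

```java
import java.util.UUID;

// Hypothetical sketch; the real PartitionMetadata in the PR may encode differently.
final class PartitionMetadataSketch {
    private final int version;
    private final UUID topicId; // stand-in for org.apache.kafka.common.Uuid

    PartitionMetadataSketch(int version, UUID topicId) {
        this.version = version;
        this.topicId = topicId;
    }

    // Renders the metadata as text, one "key: value" pair per line (assumed layout).
    String encode() {
        return "version: " + version + "\n" + "topic_id: " + topicId;
    }

    public static void main(String[] args) {
        UUID id = UUID.fromString("123e4567-e89b-12d3-a456-426614174000");
        System.out.println(new PartitionMetadataSketch(0, id).encode());
    }
}
```

The name `encode` signals that the result is a serialized on-disk form rather than a display string, which is presumably the reason for preferring it over `toText`.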
Re: [PR] KAFKA-14484: [1/N] Move PartitionMetadataFile to storage module [kafka]
junrao commented on code in PR #14607:
URL: https://github.com/apache/kafka/pull/14607#discussion_r1373572075

## storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadata.java: ##
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.checkpoint;
+
+import org.apache.kafka.common.Uuid;
+
+public class PartitionMetadata {
+
+    private final int version;
+    private final Uuid topicId;
+
+    public PartitionMetadata(int version, Uuid topicId) {
+        this.version = version;
+        this.topicId = topicId;
+    }
+
+    public int version() {
+        return version;
+    }
+
+    public Uuid topicId() {
+        return topicId;
+    }
+
+    public String toText() {

Review Comment:
Perhaps `encode` will be a better name?
Re: [PR] KAFKA-14484: [1/N] Move PartitionMetadataFile to storage module [kafka]
alok123t commented on code in PR #14607:
URL: https://github.com/apache/kafka/pull/14607#discussion_r1372811735

## storage/src/main/java/org/apache/kafka/storage/internals/log/PartitionMetadataReadBuffer.java: ##
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.Uuid;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+public class PartitionMetadataReadBuffer {
+    private static final Pattern WHITE_SPACES_PATTERN = Pattern.compile(":\\s+");
+
+    private final String location;
+    private final BufferedReader reader;
+
+    public PartitionMetadataReadBuffer(
+        String location,
+        BufferedReader reader
+    ) {
+        this.location = location;
+        this.reader = reader;
+    }
+
+    PartitionMetadata read() throws IOException {
+        String line = null;
+        Uuid metadataTopicId;
+
+        try {
+            line = reader.readLine();
+            String[] versionArr = WHITE_SPACES_PATTERN.split(line);
+
+            if (versionArr.length == 2) {
+                int version = Integer.parseInt(versionArr[1]);
+                if (version == PartitionMetadataFile.CURRENT_VERSION) {

Review Comment:
Makes sense, changed to `>=` instead and added a comment in https://github.com/apache/kafka/pull/14607/commits/dddade29b7a73640553aafbb6eea4a962f68ba77
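The relaxed check described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the code from the referenced commit: the class name `VersionLineCheck`, the method `parseVersion`, and the exception type and messages are hypothetical. Only the `":\\s+"` split pattern, the two-element check, and the `CURRENT_VERSION = 0` constant come from the hunks quoted in this thread.

```java
import java.util.regex.Pattern;

// Hypothetical sketch of a version check that tolerates newer file versions.
final class VersionLineCheck {
    static final int CURRENT_VERSION = 0;
    private static final Pattern SEPARATOR = Pattern.compile(":\\s+");

    // Parses a line such as "version: 0" and returns the version number.
    // Accepts any version >= CURRENT_VERSION so that a file written by a newer
    // broker can still be read after a downgrade; rejects malformed or older lines.
    static int parseVersion(String line) {
        String[] parts = SEPARATOR.split(line);
        if (parts.length != 2) {
            throw new IllegalArgumentException("Malformed version line: " + line);
        }
        int version = Integer.parseInt(parts[1]);
        if (version < CURRENT_VERSION) {
            throw new IllegalArgumentException("Unrecognized version: " + version);
        }
        return version;
    }

    public static void main(String[] args) {
        System.out.println(parseVersion("version: 0")); // prints 0
        System.out.println(parseVersion("version: 1")); // prints 1 (a strict == check would reject this)
    }
}
```

With a strict `==` comparison, a broker downgraded after a future version bump would refuse to read its own partition metadata file; accepting `>=` lets the older reader proceed using the fields it knows about.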
Re: [PR] KAFKA-14484: [1/N] Move PartitionMetadataFile to storage module [kafka]
junrao commented on code in PR #14607:
URL: https://github.com/apache/kafka/pull/14607#discussion_r1372393891

## storage/src/main/java/org/apache/kafka/storage/internals/log/PartitionMetadataReadBuffer.java: ##
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.Uuid;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+public class PartitionMetadataReadBuffer {
+    private static final Pattern WHITE_SPACES_PATTERN = Pattern.compile(":\\s+");
+
+    private final String location;
+    private final BufferedReader reader;
+
+    public PartitionMetadataReadBuffer(
+        String location,
+        BufferedReader reader
+    ) {
+        this.location = location;
+        this.reader = reader;
+    }
+
+    PartitionMetadata read() throws IOException {
+        String line = null;
+        Uuid metadataTopicId;
+
+        try {
+            line = reader.readLine();
+            String[] versionArr = WHITE_SPACES_PATTERN.split(line);
+
+            if (versionArr.length == 2) {
+                int version = Integer.parseInt(versionArr[1]);
+                if (version == PartitionMetadataFile.CURRENT_VERSION) {

Review Comment:
This is an existing issue. In the future, we may add a new field and bump up the version. To make it possible to downgrade, it's probably better to relax this check a bit.

## storage/src/main/java/org/apache/kafka/storage/internals/log/PartitionMetadata.java: ##
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;

Review Comment:
It seems that this should be in the `org.apache.kafka.storage.internals.checkpoint` package?

## storage/src/main/java/org/apache/kafka/storage/internals/log/PartitionMetadataReadBuffer.java: ##
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.Uuid;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+public class PartitionMetadataReadBuffer {
+    private static final Pattern WHITE_SPACES_PATTERN = Pattern.compile(":\\s+");
+
+    private final String location;
+    private final BufferedReader reader;
+
+    public PartitionMetadataReadBuffer(
+        String location,
+        BufferedReader reader
+    ) {
+        this.location = location;
+        this.reader = reader;
+    }
+
+    PartitionMetadata read() throws IOException {
+        String line = null;
+        Uuid metadataT