Repository: kafka Updated Branches: refs/heads/trunk 125d8d6f7 -> e554dc518
KAFKA-5915; Support unmapping of mapped/direct buffers in Java 9 As mentioned in MappedByteBuffers' class documentation, its implementation was inspired by Lucene's MMapDirectory: https://github.com/apache/lucene-solr/blob/releases/lucene-solr/6.6.1/lucene/core/src/java/org/apache/lucene/store/MMapDirectory.java#L315 Without this change, unmapping fails with the following message: > java.lang.IllegalAccessError: class kafka.log.AbstractIndex (in unnamed > module 0x45103d6b) cannot access class jdk.internal.ref.Cleaner (in module > java.base) because module java.base does not export jdk.internal.ref to > unnamed module 0x45103d6b Author: Ismael Juma <ism...@juma.me.uk> Reviewers: Rajini Sivaram <rajinisiva...@googlemail.com> Closes #3879 from ijuma/kafka-5915-unmap-mapped-buffers-java-9 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e554dc51 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e554dc51 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e554dc51 Branch: refs/heads/trunk Commit: e554dc518eaaa0747899e708160275f95c4e525f Parents: 125d8d6 Author: Ismael Juma <ism...@juma.me.uk> Authored: Fri Sep 22 19:32:09 2017 +0100 Committer: Ismael Juma <ism...@juma.me.uk> Committed: Fri Sep 22 19:32:09 2017 +0100 ---------------------------------------------------------------------- .../kafka/common/utils/MappedByteBuffers.java | 136 +++++++++++++++++++ .../common/utils/MappedByteBuffersTest.java | 41 ++++++ .../main/scala/kafka/log/AbstractIndex.scala | 39 ++---- .../scala/unit/kafka/log/OffsetIndexTest.scala | 23 +++- 4 files changed, 209 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/e554dc51/clients/src/main/java/org/apache/kafka/common/utils/MappedByteBuffers.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/MappedByteBuffers.java b/clients/src/main/java/org/apache/kafka/common/utils/MappedByteBuffers.java new file mode 100644 index 0000000..1faecb3 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/utils/MappedByteBuffers.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.utils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; + +import static java.lang.invoke.MethodHandles.constant; +import static java.lang.invoke.MethodHandles.dropArguments; +import static java.lang.invoke.MethodHandles.filterReturnValue; +import static java.lang.invoke.MethodHandles.guardWithTest; +import static java.lang.invoke.MethodHandles.lookup; +import static java.lang.invoke.MethodType.methodType; + +/** + * Utility methods for MappedByteBuffer implementations. + * + * The unmap implementation was inspired by the one in Lucene's MMapDirectory. + */ +public final class MappedByteBuffers { + + private static final Logger log = LoggerFactory.getLogger(MappedByteBuffers.class); + + // null if unmap is not supported + private static final MethodHandle UNMAP; + + // null if unmap is supported + private static final RuntimeException UNMAP_NOT_SUPPORTED_EXCEPTION; + + static { + Object unmap = null; + RuntimeException exception = null; + try { + unmap = lookupUnmapMethodHandle(); + } catch (RuntimeException e) { + exception = e; + } + if (unmap != null) { + UNMAP = (MethodHandle) unmap; + UNMAP_NOT_SUPPORTED_EXCEPTION = null; + } else { + UNMAP = null; + UNMAP_NOT_SUPPORTED_EXCEPTION = exception; + } + } + + private MappedByteBuffers() {} + + public static void unmap(String resourceDescription, MappedByteBuffer buffer) throws IOException { + if (!buffer.isDirect()) + throw new IllegalArgumentException("Unmapping only works with direct buffers"); + if (UNMAP == null) + throw UNMAP_NOT_SUPPORTED_EXCEPTION; + + try { + UNMAP.invokeExact((ByteBuffer) buffer); + } catch (Throwable throwable) { + throw new IOException("Unable to unmap the mapped buffer: " + resourceDescription, throwable); + } + } + + private static MethodHandle lookupUnmapMethodHandle() { + final MethodHandles.Lookup lookup = lookup(); + try { + if (Java.IS_JAVA9_COMPATIBLE) + return unmapJava9(lookup); + else + return unmapJava7Or8(lookup); + } catch (ReflectiveOperationException | RuntimeException e1) { + throw new UnsupportedOperationException("Unmapping is not supported on this platform, because internal " + + "Java APIs are not compatible with this Kafka version", e1); + } + } + + private static MethodHandle unmapJava7Or8(MethodHandles.Lookup lookup) throws ReflectiveOperationException { + /* "Compile" a MethodHandle that is roughly equivalent to the following lambda: + * + * (ByteBuffer buffer) -> { + * sun.misc.Cleaner cleaner = ((java.nio.DirectByteBuffer) byteBuffer).cleaner(); + * if (nonNull(cleaner)) + * cleaner.clean(); + * else + * noop(cleaner); // the noop is needed because MethodHandles#guardWithTest always needs both if and else + * } + */ + Class<?> directBufferClass = Class.forName("java.nio.DirectByteBuffer"); + Method m = directBufferClass.getMethod("cleaner"); + m.setAccessible(true); + MethodHandle directBufferCleanerMethod = lookup.unreflect(m); + Class<?> cleanerClass = directBufferCleanerMethod.type().returnType(); + MethodHandle cleanMethod = lookup.findVirtual(cleanerClass, "clean", methodType(void.class)); + MethodHandle nonNullTest = lookup.findStatic(MappedByteBuffers.class, "nonNull", + methodType(boolean.class, Object.class)).asType(methodType(boolean.class, cleanerClass)); + MethodHandle noop = dropArguments(constant(Void.class, null).asType(methodType(void.class)), 0, cleanerClass); + MethodHandle unmapper = filterReturnValue(directBufferCleanerMethod, guardWithTest(nonNullTest, cleanMethod, noop)) + .asType(methodType(void.class, ByteBuffer.class)); + return unmapper; + } + + private static MethodHandle unmapJava9(MethodHandles.Lookup lookup) throws ReflectiveOperationException { + Class<?> unsafeClass = Class.forName("sun.misc.Unsafe"); + MethodHandle unmapper = lookup.findVirtual(unsafeClass, "invokeCleaner", + methodType(void.class, ByteBuffer.class)); + Field f = unsafeClass.getDeclaredField("theUnsafe"); + f.setAccessible(true); + Object theUnsafe = f.get(null); + return unmapper.bindTo(theUnsafe); + } + + private static boolean nonNull(Object o) { + return o != null; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/e554dc51/clients/src/test/java/org/apache/kafka/common/utils/MappedByteBuffersTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/utils/MappedByteBuffersTest.java b/clients/src/test/java/org/apache/kafka/common/utils/MappedByteBuffersTest.java new file mode 100644 index 0000000..38fe9dd --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/utils/MappedByteBuffersTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.utils; + +import org.apache.kafka.test.TestUtils; +import org.junit.Test; + +import java.io.File; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; + +public class MappedByteBuffersTest { + + /** + * Checks that unmap doesn't throw exceptions. + */ + @Test + public void testUnmap() throws Exception { + File file = TestUtils.tempFile(); + try (FileChannel channel = FileChannel.open(file.toPath())) { + MappedByteBuffer map = channel.map(FileChannel.MapMode.READ_ONLY, 0, 0); + MappedByteBuffers.unmap(file.getAbsolutePath(), map); + } + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/e554dc51/core/src/main/scala/kafka/log/AbstractIndex.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala index 2d7cc7e..40ec870 100644 --- a/core/src/main/scala/kafka/log/AbstractIndex.scala +++ b/core/src/main/scala/kafka/log/AbstractIndex.scala @@ -25,8 +25,7 @@ import java.util.concurrent.locks.{Lock, ReentrantLock} import kafka.log.IndexSearchType.IndexSearchEntity import kafka.utils.CoreUtils.inLock import kafka.utils.{CoreUtils, Logging} -import org.apache.kafka.common.utils.{OperatingSystem, Utils} -import sun.nio.ch.DirectBuffer +import org.apache.kafka.common.utils.{MappedByteBuffers, OperatingSystem, Utils} import scala.math.ceil @@ -109,7 +108,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon /* Windows won't let us modify the file length while the file is mmapped :-( */ if (OperatingSystem.IS_WINDOWS) - forceUnmap(mmap) + safeForceUnmap() try { raf.setLength(roundedNewSize) mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize) @@ -150,10 +149,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon // However, in some cases it can pause application threads(STW) for a long moment reading metadata from a physical disk. // To prevent this, we forcefully cleanup memory mapping within proper execution which never affects API responsiveness. // See https://issues.apache.org/jira/browse/KAFKA-4614 for the details. - CoreUtils.swallow(forceUnmap(mmap)) - // Accessing unmapped mmap crashes JVM by SEGV. - // Accessing it after this method called sounds like a bug but for safety, assign null and do not allow later access. - mmap = null + safeForceUnmap() } file.delete() } @@ -178,11 +174,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon trimToValidSize() } - def closeHandler() = { - // File handler of the index field will be closed after the mmap is garbage collected - CoreUtils.swallow(forceUnmap(mmap)) - mmap = null - } + def closeHandler(): Unit = safeForceUnmap() /** * Do a basic sanity check on this index to detect obvious problems @@ -202,22 +194,19 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon */ def truncateTo(offset: Long): Unit + protected def safeForceUnmap(): Unit = { + try forceUnmap() + catch { + case t: Throwable => error(s"Error unmapping index $file", t) + } + } + /** * Forcefully free the buffer's mmap. */ - protected def forceUnmap(m: MappedByteBuffer) { - try { - m match { - case buffer: DirectBuffer => - val bufferCleaner = buffer.cleaner() - /* cleaner can be null if the mapped region has size 0 */ - if (bufferCleaner != null) - bufferCleaner.clean() - case _ => - } - } catch { - case t: Throwable => error("Error when freeing index buffer", t) - } + protected[log] def forceUnmap() { + try MappedByteBuffers.unmap(file.getAbsolutePath, mmap) + finally mmap = null // Accessing unmapped mmap crashes JVM by SEGV so we null it out to be safe } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/e554dc51/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala index 506d99c..8fa3cc1 100644 --- a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala +++ b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala @@ -18,10 +18,14 @@ package kafka.log import java.io._ +import java.nio.file.Files + import org.junit.Assert._ -import java.util.{Collections, Arrays} +import java.util.{Arrays, Collections} + import org.junit._ import org.scalatest.junit.JUnitSuite + import scala.collection._ import scala.util.Random import kafka.utils.TestUtils @@ -34,7 +38,7 @@ class OffsetIndexTest extends JUnitSuite { @Before def setup() { - this.idx = new OffsetIndex(nonExistantTempFile(), baseOffset = 45L, maxIndexSize = 30 * 8) + this.idx = new OffsetIndex(nonExistentTempFile(), baseOffset = 45L, maxIndexSize = 30 * 8) } @After @@ -135,7 +139,7 @@ class OffsetIndexTest extends JUnitSuite { @Test def truncate() { - val idx = new OffsetIndex(nonExistantTempFile(), baseOffset = 0L, maxIndexSize = 10 * 8) + val idx = new OffsetIndex(nonExistentTempFile(), baseOffset = 0L, maxIndexSize = 10 * 8) idx.truncate() for(i <- 1 until 10) idx.append(i, i) @@ -165,6 +169,14 @@ class OffsetIndexTest extends JUnitSuite { assertEquals("Full truncation should leave no entries", 0, idx.entries) idx.append(0, 0) } + + @Test + def forceUnmapTest(): Unit = { + val idx = new OffsetIndex(nonExistentTempFile(), baseOffset = 0L, maxIndexSize = 10 * 8) + idx.forceUnmap() + // mmap should be null after unmap causing lookup to throw a NPE + intercept[NullPointerException](idx.lookup(1)) + } def assertWriteFails[T](message: String, idx: OffsetIndex, offset: Int, klass: Class[T]) { try { @@ -186,9 +198,10 @@ class OffsetIndexTest extends JUnitSuite { vals } - def nonExistantTempFile(): File = { + def nonExistentTempFile(): File = { val file = TestUtils.tempFile() - file.delete() + Files.delete(file.toPath) file } + }