Repository: kafka Updated Branches: refs/heads/trunk 23f9afb70 -> 174a43cd0
KAFKA-2474: Add caching of JSON schema conversions to JsonConverter Author: Ewen Cheslack-Postava <[email protected]> Reviewers: Ismael Juma, Guozhang Wang Closes #250 from ewencp/kafka-2474-cache-json-schema-conversions Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/174a43cd Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/174a43cd Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/174a43cd Branch: refs/heads/trunk Commit: 174a43cd09d3a2a2785daf0cfa5ada1646d8bfcc Parents: 23f9afb Author: Ewen Cheslack-Postava <[email protected]> Authored: Tue Oct 6 15:31:28 2015 -0700 Committer: Guozhang Wang <[email protected]> Committed: Tue Oct 6 15:31:28 2015 -0700 ---------------------------------------------------------------------- .../org/apache/kafka/common/cache/Cache.java | 53 +++++++++++ .../org/apache/kafka/common/cache/LRUCache.java | 57 ++++++++++++ .../kafka/common/cache/SynchronizedCache.java | 51 +++++++++++ .../apache/kafka/common/cache/LRUCacheTest.java | 93 ++++++++++++++++++++ .../kafka/copycat/json/JsonConverter.java | 41 +++++++-- .../kafka/copycat/json/JsonConverterTest.java | 47 ++++++++++ 6 files changed, 336 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/174a43cd/clients/src/main/java/org/apache/kafka/common/cache/Cache.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/cache/Cache.java b/clients/src/main/java/org/apache/kafka/common/cache/Cache.java new file mode 100644 index 0000000..6678e40 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/cache/Cache.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.common.cache; + +/** + * Interface for caches, semi-persistent maps which store key-value mappings until either an eviction criterion is met + * or the entries are manually invalidated. Caches are not required to be thread-safe, but some implementations may be. + */ +public interface Cache<K, V> { + + /** + * Look up a value in the cache. + * @param key the key to look up + * @return the cached value, or null if it is not present. + */ + V get(K key); + + /** + * Insert an entry into the cache. + * @param key the key to insert + * @param value the value to insert + */ + void put(K key, V value); + + /** + * Manually invalidate a key, clearing its entry from the cache. + * @param key the key to remove + * @return true if the key existed in the cache and the entry was removed or false if it was not present + */ + boolean remove(K key); + + /** + * Get the number of entries in this cache. If this cache is used by multiple threads concurrently, the returned + * value will only be approximate.
+ * @return the number of entries in the cache + */ + long size(); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/174a43cd/clients/src/main/java/org/apache/kafka/common/cache/LRUCache.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/cache/LRUCache.java b/clients/src/main/java/org/apache/kafka/common/cache/LRUCache.java new file mode 100644 index 0000000..89e6e87 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/cache/LRUCache.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.common.cache; + +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * A cache implementing a least recently used policy. 
+ */ +public class LRUCache<K, V> implements Cache<K, V> { + private final LinkedHashMap<K, V> cache; + + public LRUCache(final int maxSize) { + cache = new LinkedHashMap<K, V>(16, .75f, true) { + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > maxSize; + } + }; + } + + @Override + public V get(K key) { + return cache.get(key); + } + + @Override + public void put(K key, V value) { + cache.put(key, value); + } + + @Override + public boolean remove(K key) { + return cache.remove(key) != null; + } + + @Override + public long size() { + return cache.size(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/174a43cd/clients/src/main/java/org/apache/kafka/common/cache/SynchronizedCache.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/cache/SynchronizedCache.java b/clients/src/main/java/org/apache/kafka/common/cache/SynchronizedCache.java new file mode 100644 index 0000000..0e88aa3 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/cache/SynchronizedCache.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +package org.apache.kafka.common.cache; + +/** + * Wrapper for caches that adds simple synchronization to provide a thread-safe cache. Note that this simply adds + * synchronization around each cache method on the underlying unsynchronized cache. It does not add any support for + * atomically checking for existence of an entry and computing and inserting the value if it is missing. + */ +public class SynchronizedCache<K, V> implements Cache<K, V> { + private final Cache<K, V> underlying; + + public SynchronizedCache(Cache<K, V> underlying) { + this.underlying = underlying; + } + + @Override + public synchronized V get(K key) { + return underlying.get(key); + } + + @Override + public synchronized void put(K key, V value) { + underlying.put(key, value); + } + + @Override + public synchronized boolean remove(K key) { + return underlying.remove(key); + } + + @Override + public synchronized long size() { + return underlying.size(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/174a43cd/clients/src/test/java/org/apache/kafka/common/cache/LRUCacheTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/cache/LRUCacheTest.java b/clients/src/test/java/org/apache/kafka/common/cache/LRUCacheTest.java new file mode 100644 index 0000000..4cf130c --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/cache/LRUCacheTest.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.common.cache; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class LRUCacheTest { + + @Test + public void testPutGet() { + Cache<String, String> cache = new LRUCache<>(4); + + cache.put("a", "b"); + cache.put("c", "d"); + cache.put("e", "f"); + cache.put("g", "h"); + + assertEquals(4, cache.size()); + + assertEquals("b", cache.get("a")); + assertEquals("d", cache.get("c")); + assertEquals("f", cache.get("e")); + assertEquals("h", cache.get("g")); + } + + @Test + public void testRemove() { + Cache<String, String> cache = new LRUCache<>(4); + + cache.put("a", "b"); + cache.put("c", "d"); + cache.put("e", "f"); + assertEquals(3, cache.size()); + + assertEquals(true, cache.remove("a")); + assertEquals(2, cache.size()); + assertNull(cache.get("a")); + assertEquals("d", cache.get("c")); + assertEquals("f", cache.get("e")); + + assertEquals(false, cache.remove("key-does-not-exist")); + + assertEquals(true, cache.remove("c")); + assertEquals(1, cache.size()); + assertNull(cache.get("c")); + assertEquals("f", cache.get("e")); + + assertEquals(true, cache.remove("e")); + assertEquals(0, cache.size()); + assertNull(cache.get("e")); + } + + @Test + public void testEviction() { + Cache<String, String> cache = new LRUCache<>(2); + + cache.put("a", "b"); + cache.put("c", "d"); + assertEquals(2, cache.size()); + + cache.put("e", "f"); + assertEquals(2, cache.size()); + assertNull(cache.get("a")); + assertEquals("d", cache.get("c")); + assertEquals("f", 
cache.get("e")); + + // Validate correct access order eviction + cache.get("c"); + cache.put("g", "h"); + assertEquals(2, cache.size()); + assertNull(cache.get("e")); + assertEquals("d", cache.get("c")); + assertEquals("h", cache.get("g")); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/174a43cd/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java ---------------------------------------------------------------------- diff --git a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java index 1841640..5b37f27 100644 --- a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java +++ b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java @@ -21,6 +21,9 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.kafka.common.cache.Cache; +import org.apache.kafka.common.cache.LRUCache; +import org.apache.kafka.common.cache.SynchronizedCache; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.copycat.data.*; import org.apache.kafka.copycat.errors.DataException; @@ -28,7 +31,11 @@ import org.apache.kafka.copycat.storage.Converter; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; /** * Implementation of Converter that uses JSON to store schemas and objects. 
@@ -36,6 +43,8 @@ import java.util.*; public class JsonConverter implements Converter { private static final String SCHEMAS_ENABLE_CONFIG = "schemas.enable"; private static final boolean SCHEMAS_ENABLE_DEFAULT = true; + private static final String SCHEMAS_CACHE_CONFIG = "schemas.cache.size"; + private static final int SCHEMAS_CACHE_SIZE_DEFAULT = 1000; private static final HashMap<Schema.Type, JsonToCopycatTypeConverter> TO_COPYCAT_CONVERTERS = new HashMap<>(); @@ -188,6 +197,9 @@ public class JsonConverter implements Converter { } private boolean enableSchemas = SCHEMAS_ENABLE_DEFAULT; + private int cacheSize = SCHEMAS_CACHE_SIZE_DEFAULT; + private Cache<Schema, ObjectNode> fromCopycatSchemaCache; + private Cache<JsonNode, Schema> toCopycatSchemaCache; private final JsonSerializer serializer = new JsonSerializer(); private final JsonDeserializer deserializer = new JsonDeserializer(); @@ -200,6 +212,12 @@ public class JsonConverter implements Converter { serializer.configure(configs, isKey); deserializer.configure(configs, isKey); + + Object cacheSizeVal = configs.get(SCHEMAS_CACHE_CONFIG); + if (cacheSizeVal != null) + cacheSize = (int) cacheSizeVal; + fromCopycatSchemaCache = new SynchronizedCache<>(new LRUCache<Schema, ObjectNode>(cacheSize)); + toCopycatSchemaCache = new SynchronizedCache<>(new LRUCache<JsonNode, Schema>(cacheSize)); } @Override @@ -247,10 +265,14 @@ public class JsonConverter implements Converter { return new SchemaAndValue(schema, convertToCopycat(schema, jsonValue.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))); } - private static ObjectNode asJsonSchema(Schema schema) { + private ObjectNode asJsonSchema(Schema schema) { if (schema == null) return null; + ObjectNode cached = fromCopycatSchemaCache.get(schema); + if (cached != null) + return cached; + final ObjectNode jsonSchema; switch (schema.type()) { case BOOLEAN: @@ -313,14 +335,19 @@ public class JsonConverter implements Converter { if (schema.defaultValue() != null)
jsonSchema.set(JsonSchema.SCHEMA_DEFAULT_FIELD_NAME, convertToJson(schema, schema.defaultValue())); + fromCopycatSchemaCache.put(schema, jsonSchema); return jsonSchema; } - private static Schema asCopycatSchema(JsonNode jsonSchema) { + private Schema asCopycatSchema(JsonNode jsonSchema) { if (jsonSchema.isNull()) return null; + Schema cached = toCopycatSchemaCache.get(jsonSchema); + if (cached != null) + return cached; + JsonNode schemaTypeNode = jsonSchema.get(JsonSchema.SCHEMA_TYPE_FIELD_NAME); if (schemaTypeNode == null || !schemaTypeNode.isTextual()) throw new DataException("Schema must contain 'type' field"); @@ -409,7 +436,9 @@ public class JsonConverter implements Converter { if (schemaDefaultNode != null) builder.defaultValue(convertToCopycat(builder, schemaDefaultNode)); - return builder.build(); + Schema result = builder.build(); + toCopycatSchemaCache.put(jsonSchema, result); + return result; } @@ -420,11 +449,11 @@ public class JsonConverter implements Converter { * @param value the value * @return JsonNode-encoded version */ - private static JsonNode convertToJsonWithEnvelope(Schema schema, Object value) { + private JsonNode convertToJsonWithEnvelope(Schema schema, Object value) { return new JsonSchema.Envelope(asJsonSchema(schema), convertToJson(schema, value)).toJsonNode(); } - private static JsonNode convertToJsonWithoutEnvelope(Schema schema, Object value) { + private JsonNode convertToJsonWithoutEnvelope(Schema schema, Object value) { return convertToJson(schema, value); } http://git-wip-us.apache.org/repos/asf/kafka/blob/174a43cd/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java ---------------------------------------------------------------------- diff --git a/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java b/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java index 214f9ce..96f8544 100644 --- 
a/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java +++ b/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java @@ -21,12 +21,16 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.kafka.common.cache.Cache; import org.apache.kafka.copycat.data.Schema; import org.apache.kafka.copycat.data.SchemaAndValue; import org.apache.kafka.copycat.data.SchemaBuilder; import org.apache.kafka.copycat.data.Struct; import org.apache.kafka.copycat.errors.DataException; +import org.junit.Before; import org.junit.Test; +import org.powermock.reflect.Whitebox; import java.io.IOException; import java.io.UnsupportedEncodingException; @@ -44,6 +48,11 @@ public class JsonConverterTest { ObjectMapper objectMapper = new ObjectMapper(); JsonConverter converter = new JsonConverter(); + @Before + public void setUp() { + converter.configure(Collections.EMPTY_MAP, false); + } + // Schema metadata @Test @@ -206,6 +215,27 @@ public class JsonConverterTest { assertEquals(true, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue()); } + + @Test + public void testCacheSchemaToCopycatConversion() { + Cache<JsonNode, Schema> cache = Whitebox.getInternalState(converter, "toCopycatSchemaCache"); + assertEquals(0, cache.size()); + + converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\" }, \"payload\": true }".getBytes()); + assertEquals(1, cache.size()); + + converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\" }, \"payload\": true }".getBytes()); + assertEquals(1, cache.size()); + + // Different schema should also get cached + converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\", \"optional\": true }, \"payload\": true }".getBytes()); + assertEquals(2, 
cache.size()); + + // Even equivalent, but different JSON encoding of schema, should get different cache entry + converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\", \"optional\": false }, \"payload\": true }".getBytes()); + assertEquals(3, cache.size()); + } + // Schema types @Test @@ -428,6 +458,23 @@ public class JsonConverterTest { assertEquals(true, converted.booleanValue()); } + @Test + public void testCacheSchemaToJsonConversion() { + Cache<Schema, ObjectNode> cache = Whitebox.getInternalState(converter, "fromCopycatSchemaCache"); + assertEquals(0, cache.size()); + + // Repeated conversion of the same schema, even if the schema object is different should return the same Java + // object + converter.fromCopycatData(TOPIC, SchemaBuilder.bool().build(), true); + assertEquals(1, cache.size()); + + converter.fromCopycatData(TOPIC, SchemaBuilder.bool().build(), true); + assertEquals(1, cache.size()); + + // Validate that a similar, but different schema correctly returns a different schema. + converter.fromCopycatData(TOPIC, SchemaBuilder.bool().optional().build(), true); + assertEquals(2, cache.size()); + } private JsonNode parse(byte[] json) {
