Repository: samza Updated Branches: refs/heads/master ff607cb6b -> b90ab50c3
SAMZA-1976: MetadataStore API cleanup. This PR consists of the following changes: * Switched the key type of all the MetadataStore API methods from byte[] to String. * Fixed the `CoordinatorStreamStore` and `ZkMetadataStore` tests to account for the change in key type. A follow-up PR will shortly unify the namespaces used for the different metadata stored in the standalone and YARN deployment models. Author: Shanthoosh Venkataraman <[email protected]> Reviewers: Prateek <[email protected]> Closes #791 from shanthoosh/metadata_store_api_cleanup Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b90ab50c Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b90ab50c Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b90ab50c Branch: refs/heads/master Commit: b90ab50c32d97a31313efdfb26b48c2fcdeb61da Parents: ff607cb Author: Shanthoosh Venkataraman <[email protected]> Authored: Thu Nov 29 09:46:13 2018 -0800 Committer: Srinivasulu Punuru <[email protected]> Committed: Thu Nov 29 09:46:13 2018 -0800 ---------------------------------------------------------------------- .../samza/metadatastore/MetadataStore.java | 8 +++--- .../apache/samza/container/LocalityManager.java | 6 ++--- .../grouper/task/TaskAssignmentManager.java | 9 +++---- .../metadatastore/CoordinatorStreamStore.java | 27 ++++++++++---------- .../org/apache/samza/zk/ZkMetadataStore.java | 17 ++++++------ .../TestCoordinatorStreamStore.java | 23 ++++++----------- .../apache/samza/zk/TestZkMetadataStore.java | 18 ++++++------- 7 files changed, 49 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/b90ab50c/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java 
b/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java index cd04794..9009a65 100644 --- a/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java +++ b/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java @@ -39,7 +39,7 @@ public interface MetadataStore { * @param key the key with which the associated value is to be fetched. * @return if found, the value associated with the specified {@code key}; otherwise, {@code null}. */ - byte[] get(byte[] key); + byte[] get(String key); /** * Updates the mapping of the specified key-value pair. @@ -47,21 +47,21 @@ public interface MetadataStore { * @param key the key with which the specified {@code value} is to be associated. * @param value the value with which the specified {@code key} is to be associated. */ - void put(byte[] key, byte[] value); + void put(String key, byte[] value); /** * Deletes the mapping for the specified {@code key} from this metadata store (if such mapping exists). * * @param key the key for which the mapping is to be deleted. */ - void delete(byte[] key); + void delete(String key); /** * Returns all the entries in this metadata store. * * @return all entries in this metadata store. */ - Map<byte[], byte[]> all(); + Map<String, byte[]> all(); /** * Flushes the metadata store, if applicable. 
http://git-wip-us.apache.org/repos/asf/samza/blob/b90ab50c/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java index 63483b7..fe076ee 100644 --- a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java +++ b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java @@ -89,10 +89,10 @@ public class LocalityManager { */ public Map<String, Map<String, String>> readContainerLocality() { Map<String, Map<String, String>> allMappings = new HashMap<>(); - metadataStore.all().forEach((keyBytes, valueBytes) -> { + metadataStore.all().forEach((containerId, valueBytes) -> { if (valueBytes != null) { String locationId = valueSerde.fromBytes(valueBytes); - allMappings.put(keySerde.fromBytes(keyBytes), ImmutableMap.of(SetContainerHostMapping.HOST_KEY, locationId)); + allMappings.put(containerId, ImmutableMap.of(SetContainerHostMapping.HOST_KEY, locationId)); } }); if (LOG.isDebugEnabled()) { @@ -120,7 +120,7 @@ public class LocalityManager { LOG.info("Container {} started at {}", containerId, hostName); } - metadataStore.put(keySerde.toBytes(containerId), valueSerde.toBytes(hostName)); + metadataStore.put(containerId, valueSerde.toBytes(hostName)); } public void close() { http://git-wip-us.apache.org/repos/asf/samza/blob/b90ab50c/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java index 0ada91c..32bbf29 100644 --- 
a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java @@ -94,8 +94,7 @@ public class TaskAssignmentManager { */ public Map<String, String> readTaskAssignment() { taskNameToContainerId.clear(); - metadataStore.all().forEach((keyBytes, valueBytes) -> { - String taskName = keySerde.fromBytes(keyBytes); + metadataStore.all().forEach((taskName, valueBytes) -> { String containerId = valueSerde.fromBytes(valueBytes); if (containerId != null) { taskNameToContainerId.put(taskName, containerId); @@ -120,10 +119,10 @@ public class TaskAssignmentManager { } if (containerId == null) { - metadataStore.delete(keySerde.toBytes(taskName)); + metadataStore.delete(taskName); taskNameToContainerId.remove(taskName); } else { - metadataStore.put(keySerde.toBytes(taskName), valueSerde.toBytes(containerId)); + metadataStore.put(taskName, valueSerde.toBytes(containerId)); taskNameToContainerId.put(taskName, containerId); } } @@ -135,7 +134,7 @@ public class TaskAssignmentManager { */ public void deleteTaskContainerMappings(Iterable<String> taskNames) { for (String taskName : taskNames) { - metadataStore.delete(keySerde.toBytes(taskName)); + metadataStore.delete(taskName); taskNameToContainerId.remove(taskName); } } http://git-wip-us.apache.org/repos/asf/samza/blob/b90ab50c/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java index af5e2f9..899c87c 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java +++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java @@ -20,16 +20,15 @@ package org.apache.samza.coordinator.metadatastore; import com.google.common.base.Preconditions; import com.google.common.collect.Sets; -import com.google.common.primitives.UnsignedBytes; import java.util.Collections; import java.util.concurrent.atomic.AtomicBoolean; import java.util.Map; import java.util.HashMap; import java.util.List; import java.util.Objects; -import java.util.TreeMap; import org.apache.samza.Partition; import org.apache.samza.config.Config; +import org.apache.samza.coordinator.stream.CoordinatorStreamKeySerde; import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage; import org.apache.samza.metadatastore.MetadataStore; import org.apache.samza.metrics.MetricsRegistry; @@ -67,10 +66,9 @@ public class CoordinatorStreamStore implements MetadataStore { private final SystemConsumer systemConsumer; private final SystemAdmin systemAdmin; private final String type; - private final Serde<List<?>> keySerde; + private final CoordinatorStreamKeySerde keySerde; - // Using custom comparator since java default comparator offers object identity equality(not value equality) for byte arrays. 
- private final Map<byte[], byte[]> bootstrappedMessages = new TreeMap<>(UnsignedBytes.lexicographicalComparator()); + private final Map<String, byte[]> bootstrappedMessages = new HashMap<>(); private final Object bootstrapLock = new Object(); private final AtomicBoolean isInitialized = new AtomicBoolean(false); @@ -79,7 +77,7 @@ public class CoordinatorStreamStore implements MetadataStore { public CoordinatorStreamStore(String namespace, Config config, MetricsRegistry metricsRegistry) { this.config = config; this.type = namespace; - this.keySerde = new JsonSerde<>(); + this.keySerde = new CoordinatorStreamKeySerde(type); this.coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config); this.coordinatorSystemStreamPartition = new SystemStreamPartition(coordinatorSystemStream, new Partition(0)); SystemFactory systemFactory = CoordinatorStreamUtil.getCoordinatorSystemFactory(config); @@ -104,26 +102,26 @@ public class CoordinatorStreamStore implements MetadataStore { } @Override - public byte[] get(byte[] key) { + public byte[] get(String key) { bootstrapMessagesFromStream(); return bootstrappedMessages.get(key); } @Override - public void put(byte[] key, byte[] value) { - OutgoingMessageEnvelope envelope = new OutgoingMessageEnvelope(coordinatorSystemStream, 0, key, value); + public void put(String key, byte[] value) { + OutgoingMessageEnvelope envelope = new OutgoingMessageEnvelope(coordinatorSystemStream, 0, keySerde.toBytes(key), value); systemProducer.send(SOURCE, envelope); flush(); } @Override - public void delete(byte[] key) { + public void delete(String key) { // Since kafka doesn't support individual message deletion, store value as null for a key to delete. 
put(key, null); } @Override - public Map<byte[], byte[]> all() { + public Map<String, byte[]> all() { bootstrapMessagesFromStream(); return Collections.unmodifiableMap(bootstrappedMessages); } @@ -136,13 +134,14 @@ public class CoordinatorStreamStore implements MetadataStore { while (iterator.hasNext()) { IncomingMessageEnvelope envelope = iterator.next(); byte[] keyAsBytes = (byte[]) envelope.getKey(); - Object[] keyArray = keySerde.fromBytes(keyAsBytes).toArray(); + Serde<List<?>> serde = new JsonSerde<>(); + Object[] keyArray = serde.fromBytes(keyAsBytes).toArray(); CoordinatorStreamMessage coordinatorStreamMessage = new CoordinatorStreamMessage(keyArray, new HashMap<>()); if (Objects.equals(coordinatorStreamMessage.getType(), type)) { if (envelope.getMessage() != null) { - bootstrappedMessages.put(keyAsBytes, (byte[]) envelope.getMessage()); + bootstrappedMessages.put(coordinatorStreamMessage.getKey(), (byte[]) envelope.getMessage()); } else { - bootstrappedMessages.remove(keyAsBytes); + bootstrappedMessages.remove(coordinatorStreamMessage.getKey()); } } } http://git-wip-us.apache.org/repos/asf/samza/blob/b90ab50c/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java b/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java index 4cfdc8d..fa41df6 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java @@ -18,7 +18,6 @@ */ package org.apache.samza.zk; -import java.nio.charset.Charset; import java.util.concurrent.TimeUnit; import java.util.HashMap; import java.util.List; @@ -64,7 +63,7 @@ public class ZkMetadataStore implements MetadataStore { * {@inheritDoc} */ @Override - public byte[] get(byte[] key) { + public byte[] get(String key) { return zkClient.readData(getZkPathForKey(key), true); } @@ 
-72,7 +71,7 @@ public class ZkMetadataStore implements MetadataStore { * {@inheritDoc} */ @Override - public void put(byte[] key, byte[] value) { + public void put(String key, byte[] value) { String zkPath = getZkPathForKey(key); zkClient.createPersistent(zkPath, true); zkClient.writeData(zkPath, value); @@ -82,7 +81,7 @@ public class ZkMetadataStore implements MetadataStore { * {@inheritDoc} */ @Override - public void delete(byte[] key) { + public void delete(String key) { zkClient.delete(getZkPathForKey(key)); } @@ -91,15 +90,15 @@ public class ZkMetadataStore implements MetadataStore { * @throws SamzaException if there're exceptions reading data from zookeeper. */ @Override - public Map<byte[], byte[]> all() { + public Map<String, byte[]> all() { try { List<String> zkSubDirectories = zkClient.getChildren(zkBaseDir); - Map<byte[], byte[]> result = new HashMap<>(); + Map<String, byte[]> result = new HashMap<>(); for (String zkSubDir : zkSubDirectories) { String completeZkPath = String.format("%s/%s", zkBaseDir, zkSubDir); byte[] value = zkClient.readData(completeZkPath, true); if (value != null) { - result.put(completeZkPath.getBytes("UTF-8"), value); + result.put(zkSubDir, value); } } return result; @@ -126,7 +125,7 @@ public class ZkMetadataStore implements MetadataStore { zkClient.close(); } - private String getZkPathForKey(byte[] key) { - return String.format("%s/%s", zkBaseDir, new String(key, Charset.forName("UTF-8"))); + private String getZkPathForKey(String key) { + return String.format("%s/%s", zkBaseDir, key); } } http://git-wip-us.apache.org/repos/asf/samza/blob/b90ab50c/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java 
index 0e48363..2f6f598 100644 --- a/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java +++ b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java @@ -62,7 +62,7 @@ public class TestCoordinatorStreamStore { @Test public void testReadAfterWrite() { - byte[] key = getKey("test-key1"); + String key = "test-key1"; byte[] value = getValue("test-value1"); Assert.assertNull(coordinatorStreamStore.get(key)); coordinatorStreamStore.put(key, value); @@ -72,7 +72,7 @@ public class TestCoordinatorStreamStore { @Test public void testReadAfterDelete() { - byte[] key = getKey("test-key1"); + String key = "test-key1"; byte[] value = getValue("test-value1"); Assert.assertNull(coordinatorStreamStore.get(key)); coordinatorStreamStore.put(key, value); @@ -84,13 +84,13 @@ public class TestCoordinatorStreamStore { @Test public void testReadOfNonExistentKey() { - Assert.assertNull(coordinatorStreamStore.get("randomKey".getBytes())); + Assert.assertNull(coordinatorStreamStore.get("randomKey")); Assert.assertEquals(0, coordinatorStreamStore.all().size()); } @Test public void testMultipleUpdatesForSameKey() { - byte[] key = getKey("test-key1"); + String key = "test-key1"; byte[] value = getValue("test-value1"); byte[] value1 = getValue("test-value2"); coordinatorStreamStore.put(key, value); @@ -101,16 +101,16 @@ public class TestCoordinatorStreamStore { @Test public void testAllEntries() { - byte[] key = getKey("test-key1"); - byte[] key1 = getKey("test-key2"); - byte[] key2 = getKey("test-key3"); + String key = "test-key1"; + String key1 = "test-key2"; + String key2 = "test-key3"; byte[] value = getValue("test-value1"); byte[] value1 = getValue("test-value2"); byte[] value2 = getValue("test-value3"); coordinatorStreamStore.put(key, value); coordinatorStreamStore.put(key1, value1); coordinatorStreamStore.put(key2, value2); - ImmutableMap<byte[], byte[]> expected = ImmutableMap.of(key, value, key1, 
value1, key2, value2); + ImmutableMap<String, byte[]> expected = ImmutableMap.of(key, value, key1, value1, key2, value2); Assert.assertEquals(expected, coordinatorStreamStore.all()); } @@ -119,11 +119,4 @@ public class TestCoordinatorStreamStore { SetTaskContainerMapping setTaskContainerMapping = new SetTaskContainerMapping("testSource", "testTask", value); return messageSerde.toBytes(setTaskContainerMapping.getMessageMap()); } - - private byte[] getKey(String key) { - JsonSerde<List<?>> keySerde = new JsonSerde<>(); - SetTaskContainerMapping setTaskContainerMapping = new SetTaskContainerMapping("testSource", key, ""); - return keySerde.toBytes(Arrays.asList(setTaskContainerMapping.getKeyArray())); - } - } http://git-wip-us.apache.org/repos/asf/samza/blob/b90ab50c/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java index 5930c65..3d5f3b3 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java @@ -67,7 +67,7 @@ public class TestZkMetadataStore { @Test public void testReadAfterWrite() throws Exception { - byte[] key = "test-key1".getBytes("UTF-8"); + String key = "test-key1"; byte[] value = "test-value1".getBytes("UTF-8"); Assert.assertNull(zkMetadataStore.get(key)); zkMetadataStore.put(key, value); @@ -77,7 +77,7 @@ public class TestZkMetadataStore { @Test public void testReadAfterDelete() throws Exception { - byte[] key = "test-key1".getBytes("UTF-8"); + String key = "test-key1"; byte[] value = "test-value1".getBytes("UTF-8"); Assert.assertNull(zkMetadataStore.get(key)); zkMetadataStore.put(key, value); @@ -88,14 +88,14 @@ public class TestZkMetadataStore { } @Test - public void testReadOfNonExistentKey() throws Exception { 
- Assert.assertNull(zkMetadataStore.get("randomKey".getBytes("UTF-8"))); + public void testReadOfNonExistentKey() { + Assert.assertNull(zkMetadataStore.get("randomKey")); Assert.assertEquals(0, zkMetadataStore.all().size()); } @Test public void testMultipleUpdatesForSameKey() throws Exception { - byte[] key = "test-key1".getBytes("UTF-8"); + String key = "test-key1"; byte[] value = "test-value1".getBytes("UTF-8"); byte[] value1 = "test-value2".getBytes("UTF-8"); zkMetadataStore.put(key, value); @@ -106,16 +106,16 @@ public class TestZkMetadataStore { @Test public void testAllEntries() throws Exception { - byte[] key = "test-key1".getBytes("UTF-8"); - byte[] key1 = "test-key2".getBytes("UTF-8"); - byte[] key2 = "test-key3".getBytes("UTF-8"); + String key = "test-key1"; + String key1 = "test-key2"; + String key2 = "test-key3"; byte[] value = "test-value1".getBytes("UTF-8"); byte[] value1 = "test-value2".getBytes("UTF-8"); byte[] value2 = "test-value3".getBytes("UTF-8"); zkMetadataStore.put(key, value); zkMetadataStore.put(key1, value1); zkMetadataStore.put(key2, value2); - ImmutableMap<byte[], byte[]> expected = ImmutableMap.of(key, value, key1, value1, key2, value2); + ImmutableMap<String, byte[]> expected = ImmutableMap.of(key, value, key1, value1, key2, value2); Assert.assertEquals(expected.size(), zkMetadataStore.all().size()); } }
