Repository: samza Updated Branches: refs/heads/master e25e0dab9 -> 5069f1ddb
http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java new file mode 100644 index 0000000..1f2d586 --- /dev/null +++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.storage.kv; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.samza.config.MapConfig; +import org.apache.samza.config.MetricsConfig; +import org.apache.samza.context.ContainerContext; +import org.apache.samza.context.Context; +import org.apache.samza.context.JobContext; +import org.apache.samza.metrics.Counter; +import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.metrics.Timer; +import org.apache.samza.table.ReadableTable; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import static org.mockito.Mockito.*; + +public class TestLocalReadableTable { + + public static final String TABLE_ID = "t1"; + + private List<String> keys; + private Map<String, String> values; + + private Timer getNs; + private Timer getAllNs; + private Counter numGets; + private Counter numGetAlls; + private Timer getCallbackNs; + private Counter numMissedLookups; + + private MetricsRegistry metricsRegistry; + + private KeyValueStore kvStore; + + @Before + public void setUp() { + keys = Arrays.asList("k1", "k2", "k3"); + + values = new HashMap<>(); + values.put("k1", "v1"); + values.put("k2", "v2"); + values.put("k3", null); + + kvStore = mock(KeyValueStore.class); + when(kvStore.get("k1")).thenReturn("v1"); + when(kvStore.get("k2")).thenReturn("v2"); + when(kvStore.getAll(keys)).thenReturn(values); + + getNs = new Timer(""); + getAllNs = new Timer(""); + numGets = new Counter(""); + numGetAlls = new Counter(""); + getCallbackNs = new Timer(""); + numMissedLookups = new Counter(""); + + metricsRegistry = mock(MetricsRegistry.class); + String groupName = LocalReadableTable.class.getSimpleName(); + when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-gets")).thenReturn(numGets); + when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-getAlls")).thenReturn(numGetAlls); + when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-missed-lookups")).thenReturn(numMissedLookups); + when(metricsRegistry.newTimer(groupName, TABLE_ID + "-get-ns")).thenReturn(getNs); + when(metricsRegistry.newTimer(groupName, TABLE_ID + "-getAll-ns")).thenReturn(getAllNs); + when(metricsRegistry.newTimer(groupName, TABLE_ID + "-get-callback-ns")).thenReturn(getCallbackNs); + } + + @Test + public void testGet() throws Exception { + ReadableTable table = createTable(false); + Assert.assertEquals("v1", table.get("k1")); + Assert.assertEquals("v2", table.getAsync("k2").get()); + Assert.assertNull(table.get("k3")); + verify(kvStore, times(3)).get(any()); + Assert.assertEquals(3, numGets.getCount()); + Assert.assertEquals(1, numMissedLookups.getCount()); + Assert.assertTrue(getNs.getSnapshot().getAverage() > 0); + Assert.assertEquals(0, numGetAlls.getCount()); + Assert.assertEquals(0, getAllNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001); + } + + @Test + public void testGetAll() throws Exception { + ReadableTable table = createTable(false); + Assert.assertEquals(values, table.getAll(keys)); + Assert.assertEquals(values, table.getAllAsync(keys).get()); + verify(kvStore, times(2)).getAll(any()); + Assert.assertEquals(Collections.emptyMap(), table.getAll(Collections.emptyList())); + Assert.assertEquals(2, numMissedLookups.getCount()); + Assert.assertEquals(3, numGetAlls.getCount()); + Assert.assertTrue(getAllNs.getSnapshot().getAverage() > 0); + Assert.assertEquals(0, numGets.getCount()); + Assert.assertEquals(0, getNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001); + } + + @Test + public void testTimerDisabled() throws Exception { + ReadableTable table = createTable(true); + table.get(""); + table.getAsync("").get(); + table.getAll(Collections.emptyList()); + table.getAllAsync(Collections.emptyList()).get(); + verify(metricsRegistry, atLeast(1)).newCounter(anyString(), anyString()); + verify(metricsRegistry, times(0)).newTimer(anyString(), anyString()); + verify(metricsRegistry, times(0)).newGauge(anyString(), any()); + Assert.assertEquals(2, numGets.getCount()); + Assert.assertEquals(2, numMissedLookups.getCount()); + Assert.assertEquals(2, numGetAlls.getCount()); + Assert.assertEquals(0, getNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, getAllNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001); + } + + private LocalReadableTable createTable(boolean isTimerDisabled) { + Map<String, String> config = new HashMap<>(); + if (isTimerDisabled) { + config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false"); + } + Context context = mock(Context.class); + JobContext jobContext = mock(JobContext.class); + when(context.getJobContext()).thenReturn(jobContext); + when(jobContext.getConfig()).thenReturn(new MapConfig(config)); + ContainerContext containerContext = mock(ContainerContext.class); + when(context.getContainerContext()).thenReturn(containerContext); + when(containerContext.getContainerMetricsRegistry()).thenReturn(metricsRegistry); + + LocalReadableTable table = new LocalReadableTable("t1", kvStore); + table.init(context); + + return table; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java new file mode 100644 index 0000000..5367931 --- /dev/null +++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage.kv; + +import junit.framework.Assert; +import org.apache.samza.config.MapConfig; +import org.apache.samza.context.ContainerContext; +import org.apache.samza.context.Context; +import org.apache.samza.context.JobContext; +import org.apache.samza.context.TaskContext; +import org.apache.samza.table.TableProvider; +import org.apache.samza.util.NoOpMetricsRegistry; +import org.junit.Test; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; + + +public class TestLocalTableProvider { + + @Test + public void testInit() { + Context context = mock(Context.class); + JobContext jobContext = mock(JobContext.class); + when(context.getJobContext()).thenReturn(jobContext); + when(jobContext.getConfig()).thenReturn(new MapConfig()); + ContainerContext containerContext = mock(ContainerContext.class); + when(context.getContainerContext()).thenReturn(containerContext); + when(containerContext.getContainerMetricsRegistry()).thenReturn(new NoOpMetricsRegistry()); + TaskContext taskContext = mock(TaskContext.class); + when(context.getTaskContext()).thenReturn(taskContext); + when(taskContext.getStore(any())).thenReturn(mock(KeyValueStore.class)); + + TableProvider tableProvider = createTableProvider("t1"); + tableProvider.init(context); + Assert.assertNotNull(tableProvider.getTable()); + } + + @Test(expected = NullPointerException.class) + public void testInitFail() { + TableProvider tableProvider = createTableProvider("t1"); + Assert.assertNotNull(tableProvider.getTable()); + } + + private TableProvider createTableProvider(String tableId) { + return new LocalTableProvider(tableId) { + }; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-kv/src/test/java/org/apache/samza/storage/kv/descriptors/TestLocalTableProvider.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/descriptors/TestLocalTableProvider.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/descriptors/TestLocalTableProvider.java deleted file mode 100644 index 752b91e..0000000 --- a/samza-kv/src/test/java/org/apache/samza/storage/kv/descriptors/TestLocalTableProvider.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.storage.kv.descriptors; - -import junit.framework.Assert; -import org.apache.samza.context.Context; -import org.apache.samza.context.TaskContext; -import org.apache.samza.storage.kv.KeyValueStore; -import org.apache.samza.storage.kv.LocalTableProvider; -import org.apache.samza.table.TableProvider; -import org.apache.samza.util.NoOpMetricsRegistry; -import org.junit.Test; - -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.*; - - -public class TestLocalTableProvider { - - @Test - public void testInit() { - Context context = mock(Context.class); - TaskContext taskContext = mock(TaskContext.class); - when(context.getTaskContext()).thenReturn(taskContext); - when(taskContext.getStore(any())).thenReturn(mock(KeyValueStore.class)); - when(taskContext.getTaskMetricsRegistry()).thenReturn(new NoOpMetricsRegistry()); - - TableProvider tableProvider = createTableProvider("t1"); - tableProvider.init(context); - Assert.assertNotNull(tableProvider.getTable()); - } - - @Test(expected = NullPointerException.class) - public void testInitFail() { - TableProvider tableProvider = createTableProvider("t1"); - Assert.assertNotNull(tableProvider.getTable()); - } - - private TableProvider createTableProvider(String tableId) { - return new LocalTableProvider(tableId) { - }; - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java index 80cb789..89a32d8 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java @@ -21,11 +21,9 @@ package org.apache.samza.sql.impl; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; -import org.apache.samza.sql.SamzaSqlRelRecord; import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory; import org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory; import org.apache.samza.table.descriptors.TableDescriptor; -import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.sql.interfaces.SqlIOResolver; import org.apache.samza.sql.interfaces.SqlIOResolverFactory; http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java index fa279f2..e112804 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java @@ -20,11 +20,11 @@ package org.apache.samza.test.table; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; + import org.apache.samza.application.StreamApplication; import org.apache.samza.application.TaskApplication; import org.apache.samza.application.descriptors.TaskApplicationDescriptor; @@ -34,12 +34,7 @@ import org.apache.samza.config.MapConfig; import org.apache.samza.config.TaskConfig; import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory; import org.apache.samza.context.Context; -import org.apache.samza.context.TaskContext; import org.apache.samza.system.descriptors.GenericInputDescriptor; -import org.apache.samza.metrics.Counter; -import org.apache.samza.metrics.Gauge; -import org.apache.samza.metrics.MetricsRegistry; -import org.apache.samza.metrics.Timer; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.system.descriptors.DelegatingSystemDescriptor; @@ -49,9 +44,6 @@ import org.apache.samza.serializers.IntegerSerde; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.standalone.PassthroughJobCoordinatorFactory; -import org.apache.samza.storage.kv.Entry; -import org.apache.samza.storage.kv.KeyValueStore; -import org.apache.samza.storage.kv.LocalReadWriteTable; import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.table.ReadWriteTable; @@ -65,6 +57,7 @@ import org.apache.samza.task.TaskCoordinator; import org.apache.samza.test.harness.AbstractIntegrationTestHarness; import org.apache.samza.test.util.ArraySystemFactory; import org.apache.samza.test.util.Base64Serializer; + import org.junit.Assert; import org.junit.Test; @@ -74,16 +67,9 @@ import static org.apache.samza.test.table.TestTableData.PageViewJsonSerde; import static org.apache.samza.test.table.TestTableData.PageViewJsonSerdeFactory; import static org.apache.samza.test.table.TestTableData.Profile; import static org.apache.samza.test.table.TestTableData.ProfileJsonSerde; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyList; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; /** @@ -357,50 +343,6 @@ public class TestLocalTable extends AbstractIntegrationTestHarness { } @Test - public void testAsyncOperation() throws Exception { - KeyValueStore kvStore = mock(KeyValueStore.class); - LocalReadWriteTable<String, String> table = new LocalReadWriteTable<>("table1", kvStore); - Context context = mock(Context.class); - TaskContext taskContext = mock(TaskContext.class); - when(context.getTaskContext()).thenReturn(taskContext); - MetricsRegistry metricsRegistry = mock(MetricsRegistry.class); - doReturn(mock(Timer.class)).when(metricsRegistry).newTimer(anyString(), anyString()); - doReturn(mock(Counter.class)).when(metricsRegistry).newCounter(anyString(), anyString()); - doReturn(mock(Gauge.class)).when(metricsRegistry).newGauge(anyString(), any()); - doReturn(metricsRegistry).when(taskContext).getTaskMetricsRegistry(); - - table.init(context); - - // GET - doReturn("bar").when(kvStore).get(anyString()); - Assert.assertEquals("bar", table.getAsync("foo").get()); - - // GET-ALL - Map<String, String> recordMap = new HashMap<>(); - recordMap.put("foo1", "bar1"); - recordMap.put("foo2", "bar2"); - doReturn(recordMap).when(kvStore).getAll(anyList()); - Assert.assertEquals(recordMap, table.getAllAsync(Arrays.asList("foo1", "foo2")).get()); - - // PUT - table.putAsync("foo1", "bar1").get(); - verify(kvStore, times(1)).put(anyString(), anyString()); - - // PUT-ALL - List<Entry<String, String>> records = Arrays.asList(new Entry<>("foo1", "bar1"), new Entry<>("foo2", "bar2")); - table.putAllAsync(records).get(); - verify(kvStore, times(1)).putAll(anyList()); - - // DELETE - table.deleteAsync("foo").get(); - verify(kvStore, times(1)).delete(anyString()); - - // DELETE-ALL - table.deleteAllAsync(Arrays.asList("foo1", "foo2")).get(); - verify(kvStore, times(1)).deleteAll(anyList()); - } - - @Test public void testWithLowLevelApi() throws Exception { Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect()); http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java index 8218b8b..c9228af 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java @@ -20,6 +20,7 @@ package org.apache.samza.test.table; import com.google.common.cache.CacheBuilder; + import java.io.IOException; import java.io.ObjectInputStream; import java.time.Duration; @@ -33,6 +34,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; + import org.apache.samza.SamzaException; import org.apache.samza.application.StreamApplication; import org.apache.samza.application.descriptors.StreamApplicationDescriptor; @@ -60,6 +62,7 @@ import org.apache.samza.table.remote.TableWriteFunction; import org.apache.samza.test.harness.AbstractIntegrationTestHarness; import org.apache.samza.test.util.Base64Serializer; import org.apache.samza.util.RateLimiter; + import org.junit.Assert; import org.junit.Test; @@ -68,6 +71,7 @@ import static org.apache.samza.test.table.TestTableData.PageView; import static org.apache.samza.test.table.TestTableData.Profile; import static org.apache.samza.test.table.TestTableData.generatePageViews; import static org.apache.samza.test.table.TestTableData.generateProfiles; + import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doReturn; @@ -248,7 +252,8 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness { doReturn(new Counter("")).when(metricsRegistry).newCounter(anyString(), anyString()); doReturn(new Timer("")).when(metricsRegistry).newTimer(anyString(), anyString()); Context context = new MockContext(); - doReturn(metricsRegistry).when(context.getTaskContext()).getTaskMetricsRegistry(); + doReturn(new MapConfig()).when(context.getJobContext()).getConfig(); + doReturn(metricsRegistry).when(context.getContainerContext()).getContainerMetricsRegistry(); return context; }
