This is an automated email from the ASF dual-hosted git repository.
rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new c8f8757 Emitting numPersistentStores instead of num stores with
changelog from DiagnosticsManager (#1145)
c8f8757 is described below
commit c8f875740b25b8edaf25d17c43cd5d409f278215
Author: rmatharu <[email protected]>
AuthorDate: Mon Aug 26 11:54:46 2019 -0700
Emitting numPersistentStores instead of num stores with changelog from
DiagnosticsManager (#1145)
* Emitting numPersistentStores instead of num stores with changelog
---
.../java/org/apache/samza/config/StorageConfig.java | 13 +++++++++----
.../java/org/apache/samza/util/DiagnosticsUtil.java | 2 +-
.../apache/samza/diagnostics/DiagnosticsManager.java | 12 ++++++------
.../samza/diagnostics/DiagnosticsStreamMessage.java | 17 +++++++++--------
.../samza/diagnostics/TestDiagnosticsManager.java | 10 +++++-----
.../samza/diagnostics/TestDiagnosticsStreamMessage.java | 6 +++---
6 files changed, 33 insertions(+), 27 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
index 0bb9b99..7bc6cb4 100644
--- a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
@@ -63,6 +63,8 @@ public class StorageConfig extends MapConfig {
static final String SIDE_INPUTS_PROCESSOR_FACTORY = STORE_PREFIX +
"%s.side.inputs.processor.factory";
static final String SIDE_INPUTS_PROCESSOR_SERIALIZED_INSTANCE =
STORE_PREFIX + "%s.side.inputs.processor.serialized.instance";
+ static final String INMEMORY_KV_STORAGE_ENGINE_FACTORY =
+
"org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory";
public StorageConfig(Config config) {
super(config);
@@ -225,10 +227,13 @@ public class StorageConfig extends MapConfig {
}
/**
- * Helper method to get the number of stores configured with a changelog.
+ * Helper method to get the number of persistent stores.
*/
- public int getNumStoresWithChangelog() {
- Config subConfig = subset(STORE_PREFIX, true);
- return new Long(subConfig.keySet().stream().filter(key ->
key.endsWith(CHANGELOG_SUFFIX)).count()).intValue();
+ public int getNumPersistentStores() {
+ return (int) getStoreNames().stream()
+ .map(storeName -> getStorageFactoryClassName(storeName))
+ .filter(factoryName -> factoryName.isPresent())
+ .filter(factoryName ->
!factoryName.get().equals(INMEMORY_KV_STORAGE_ENGINE_FACTORY))
+ .count();
}
}
diff --git
a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
index a3245a1..2870153 100644
--- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
@@ -132,7 +132,7 @@ public class DiagnosticsUtil {
systemFactory.getProducer(diagnosticsSystemStream.getSystem(),
config, new MetricsRegistryMap());
DiagnosticsManager diagnosticsManager =
new DiagnosticsManager(jobName, jobId, jobModel.getContainers(),
containerMemoryMb, containerNumCores,
- new StorageConfig(config).getNumStoresWithChangelog(),
maxHeapSizeBytes, containerThreadPoolSize, containerId,
execEnvContainerId.orElse(""),
+ new StorageConfig(config).getNumPersistentStores(),
maxHeapSizeBytes, containerThreadPoolSize, containerId,
execEnvContainerId.orElse(""),
taskClassVersion, samzaVersion, hostName,
diagnosticsSystemStream, systemProducer,
Duration.ofMillis(new TaskConfig(config).getShutdownMs()));
diff --git
a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
index ed5179e..80ddf0d 100644
---
a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
+++
b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
@@ -67,7 +67,7 @@ public class DiagnosticsManager {
// Job-related params
private final int containerMemoryMb;
private final int containerNumCores;
- private final int numStoresWithChangelog;
+ private final int numPersistentStores;
private final long maxHeapSizeBytes;
private final int containerThreadPoolSize;
private final Map<String, ContainerModel> containerModels;
@@ -86,7 +86,7 @@ public class DiagnosticsManager {
Map<String, ContainerModel> containerModels,
int containerMemoryMb,
int containerNumCores,
- int numStoresWithChangelog,
+ int numPersistentStores,
long maxHeapSizeBytes,
int containerThreadPoolSize,
String containerId,
@@ -98,7 +98,7 @@ public class DiagnosticsManager {
SystemProducer systemProducer,
Duration terminationDuration) {
- this(jobName, jobId, containerModels, containerMemoryMb,
containerNumCores, numStoresWithChangelog, maxHeapSizeBytes,
containerThreadPoolSize,
+ this(jobName, jobId, containerModels, containerMemoryMb,
containerNumCores, numPersistentStores, maxHeapSizeBytes,
containerThreadPoolSize,
containerId, executionEnvContainerId, taskClassVersion, samzaVersion,
hostname, diagnosticSystemStream, systemProducer,
terminationDuration, Executors.newSingleThreadScheduledExecutor(
new
ThreadFactoryBuilder().setNameFormat(PUBLISH_THREAD_NAME).setDaemon(true).build()));
@@ -110,7 +110,7 @@ public class DiagnosticsManager {
Map<String, ContainerModel> containerModels,
int containerMemoryMb,
int containerNumCores,
- int numStoresWithChangelog,
+ int numPersistentStores,
long maxHeapSizeBytes,
int containerThreadPoolSize,
String containerId,
@@ -127,7 +127,7 @@ public class DiagnosticsManager {
this.containerModels = containerModels;
this.containerMemoryMb = containerMemoryMb;
this.containerNumCores = containerNumCores;
- this.numStoresWithChangelog = numStoresWithChangelog;
+ this.numPersistentStores = numPersistentStores;
this.maxHeapSizeBytes = maxHeapSizeBytes;
this.containerThreadPoolSize = containerThreadPoolSize;
this.containerId = containerId;
@@ -211,7 +211,7 @@ public class DiagnosticsManager {
if (!jobParamsEmitted) {
diagnosticsStreamMessage.addContainerMb(containerMemoryMb);
diagnosticsStreamMessage.addContainerNumCores(containerNumCores);
-
diagnosticsStreamMessage.addNumStoresWithChangelog(numStoresWithChangelog);
+ diagnosticsStreamMessage.addNumPersistentStores(numPersistentStores);
diagnosticsStreamMessage.addContainerModels(containerModels);
diagnosticsStreamMessage.addMaxHeapSize(maxHeapSizeBytes);
diagnosticsStreamMessage.addContainerThreadPoolSize(containerThreadPoolSize);
diff --git
a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java
b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java
index 81642d5..de12bde 100644
---
a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java
+++
b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java
@@ -55,7 +55,7 @@ public class DiagnosticsStreamMessage {
private static final String STOP_EVENT_LIST_METRIC_NAME = "stopEvents";
private static final String CONTAINER_MB_METRIC_NAME = "containerMemoryMb";
private static final String CONTAINER_NUM_CORES_METRIC_NAME =
"containerNumCores";
- private static final String CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME
= "numStoresWithChangelog";
+ private static final String CONTAINER_NUM_PERSISTENT_STORES_METRIC_NAME =
"numPersistentStores";
private static final String CONTAINER_MAX_CONFIGURED_HEAP_METRIC_NAME =
"maxHeap";
private static final String CONTAINER_THREAD_POOL_SIZE_METRIC_NAME =
"containerThreadPoolSize";
private static final String CONTAINER_MODELS_METRIC_NAME = "containerModels";
@@ -92,11 +92,11 @@ public class DiagnosticsStreamMessage {
/**
* Add the num stores with changelog parameter to the message.
- * @param numStoresWithChangelog the parameter value.
+ * @param numPersistentStores the parameter value.
*/
- public void addNumStoresWithChangelog(Integer numStoresWithChangelog) {
- addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER,
CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME,
- numStoresWithChangelog);
+ public void addNumPersistentStores(Integer numPersistentStores) {
+ addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER,
CONTAINER_NUM_PERSISTENT_STORES_METRIC_NAME,
+ numPersistentStores);
}
/**
@@ -198,9 +198,9 @@ public class DiagnosticsStreamMessage {
return (Integer) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER,
CONTAINER_NUM_CORES_METRIC_NAME);
}
- public Integer getNumStoresWithChangelog() {
+ public Integer getNumPersistentStores() {
return (Integer) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER,
- CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME);
+ CONTAINER_NUM_PERSISTENT_STORES_METRIC_NAME);
}
public Long getMaxHeapSize() {
@@ -234,7 +234,8 @@ public class DiagnosticsStreamMessage {
diagnosticsStreamMessage.addContainerNumCores((Integer)
diagnosticsManagerGroupMap.get(CONTAINER_NUM_CORES_METRIC_NAME));
diagnosticsStreamMessage.addContainerMb((Integer)
diagnosticsManagerGroupMap.get(CONTAINER_MB_METRIC_NAME));
- diagnosticsStreamMessage.addNumStoresWithChangelog((Integer)
diagnosticsManagerGroupMap.get(CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME));
+ diagnosticsStreamMessage.addNumPersistentStores((Integer)
diagnosticsManagerGroupMap.get(
+ CONTAINER_NUM_PERSISTENT_STORES_METRIC_NAME));
diagnosticsStreamMessage.addContainerModels(deserializeContainerModelMap((String)
diagnosticsManagerGroupMap.get(CONTAINER_MODELS_METRIC_NAME)));
diagnosticsStreamMessage.addMaxHeapSize((Long)
diagnosticsManagerGroupMap.get(CONTAINER_MAX_CONFIGURED_HEAP_METRIC_NAME));
diagnosticsStreamMessage.addContainerThreadPoolSize((Integer)
diagnosticsManagerGroupMap.get(CONTAINER_THREAD_POOL_SIZE_METRIC_NAME));
diff --git
a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java
b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java
index 33d16e3..d1acdd2 100644
---
a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java
+++
b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java
@@ -55,7 +55,7 @@ public class TestDiagnosticsManager {
private int containerMb = 1024;
private int containerThreadPoolSize = 2;
private long maxHeapSize = 900;
- private int numStoresWithChangelog = 2;
+ private int numPersistentStores = 2;
private int containerNumCores = 2;
private Map<String, ContainerModel> containerModels =
TestDiagnosticsStreamMessage.getSampleContainerModels();
private Collection<DiagnosticsExceptionEvent> exceptionEventList =
TestDiagnosticsStreamMessage.getExceptionList();
@@ -75,7 +75,7 @@ public class TestDiagnosticsManager {
});
this.diagnosticsManager =
- new DiagnosticsManager(jobName, jobId, containerModels, containerMb,
containerNumCores, numStoresWithChangelog, maxHeapSize, containerThreadPoolSize,
+ new DiagnosticsManager(jobName, jobId, containerModels, containerMb,
containerNumCores, numPersistentStores, maxHeapSize, containerThreadPoolSize,
"0", executionEnvContainerId, taskClassVersion, samzaVersion,
hostname, diagnosticsSystemStream,
mockSystemProducer, Duration.ofSeconds(1), mockExecutorService);
@@ -136,7 +136,7 @@ public class TestDiagnosticsManager {
Arrays.asList(new ProcessorStopEvent("0", executionEnvContainerId,
hostname, 102)));
Assert.assertNull(diagnosticsStreamMessage.getContainerModels());
Assert.assertNull(diagnosticsStreamMessage.getContainerNumCores());
- Assert.assertNull(diagnosticsStreamMessage.getNumStoresWithChangelog());
+ Assert.assertNull(diagnosticsStreamMessage.getNumPersistentStores());
}
@Test
@@ -169,7 +169,7 @@ public class TestDiagnosticsManager {
Assert.assertNull(diagnosticsStreamMessage.getProcessorStopEvents());
Assert.assertNull(diagnosticsStreamMessage.getContainerModels());
Assert.assertNull(diagnosticsStreamMessage.getContainerNumCores());
- Assert.assertNull(diagnosticsStreamMessage.getNumStoresWithChangelog());
+ Assert.assertNull(diagnosticsStreamMessage.getNumPersistentStores());
}
@After
@@ -210,7 +210,7 @@ public class TestDiagnosticsManager {
Assert.assertEquals(diagnosticsStreamMessage.getProcessorStopEvents(),
Arrays.asList(new ProcessorStopEvent("0", executionEnvContainerId, hostname,
101)));
Assert.assertEquals(containerModels,
diagnosticsStreamMessage.getContainerModels());
Assert.assertEquals(containerNumCores,
diagnosticsStreamMessage.getContainerNumCores().intValue());
- Assert.assertEquals(numStoresWithChangelog,
diagnosticsStreamMessage.getNumStoresWithChangelog().intValue());
+ Assert.assertEquals(numPersistentStores,
diagnosticsStreamMessage.getNumPersistentStores().intValue());
}
private class MockSystemProducer implements SystemProducer {
diff --git
a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java
b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java
index 81bc577..cd506b2 100644
---
a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java
+++
b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java
@@ -54,7 +54,7 @@ public class TestDiagnosticsStreamMessage {
diagnosticsStreamMessage.addContainerMb(1024);
diagnosticsStreamMessage.addContainerNumCores(2);
- diagnosticsStreamMessage.addNumStoresWithChangelog(3);
+ diagnosticsStreamMessage.addNumPersistentStores(3);
diagnosticsStreamMessage.addProcessorStopEvents(getProcessorStopEventList());
return diagnosticsStreamMessage;
@@ -106,7 +106,7 @@ public class TestDiagnosticsStreamMessage {
Assert.assertEquals(1024, (int) diagnosticsStreamMessage.getContainerMb());
Assert.assertEquals(2, (int)
diagnosticsStreamMessage.getContainerNumCores());
- Assert.assertEquals(3, (int)
diagnosticsStreamMessage.getNumStoresWithChangelog());
+ Assert.assertEquals(3, (int)
diagnosticsStreamMessage.getNumPersistentStores());
Assert.assertEquals(exceptionEventList,
diagnosticsStreamMessage.getExceptionEvents());
Assert.assertEquals(getSampleContainerModels(),
diagnosticsStreamMessage.getContainerModels());
Assert.assertEquals(diagnosticsStreamMessage.getProcessorStopEvents(),
getProcessorStopEventList());
@@ -135,7 +135,7 @@ public class TestDiagnosticsStreamMessage {
Map<String, Map<String, Object>> metricsMap =
metricsSnapshot.getMetrics().getAsMap();
Assert.assertTrue(metricsMap.get("org.apache.samza.container.SamzaContainerMetrics").containsKey("exceptions"));
Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("containerModels"));
-
Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("numStoresWithChangelog"));
+
Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("numPersistentStores"));
Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("containerNumCores"));
Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("containerMemoryMb"));
Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("stopEvents"));