This is an automated email from the ASF dual-hosted git repository.

srdo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new fb76dd1  STORM-3408 - update rocks version (#3024)
fb76dd1 is described below

commit fb76dd1c7dc39c4979f9cc921cf4a5930dfaa760
Author: Simon Cooper <thec...@runbox.com>
AuthorDate: Tue Jun 25 18:39:43 2019 +0100

    STORM-3408 - update rocks version (#3024)
    
    * STORM-3408 - update rocks version to 5.18.3
---
 pom.xml                                            |   2 +-
 .../metricstore/rocksdb/RocksDbMetricsWriter.java  |  20 +-
 .../storm/metricstore/rocksdb/RocksDbStore.java    | 219 +++++++++++----------
 3 files changed, 135 insertions(+), 106 deletions(-)

diff --git a/pom.xml b/pom.xml
index 989fedb..5603831 100644
--- a/pom.xml
+++ b/pom.xml
@@ -329,7 +329,7 @@
         <jool.version>0.9.12</jool.version>
         <caffeine.version>2.3.5</caffeine.version>
         <jaxb-version>2.3.0</jaxb-version>
-        <rocksdb-version>5.8.6</rocksdb-version>
+        <rocksdb-version>5.18.3</rocksdb-version>
 
         <!-- see intellij profile below... This fixes an annoyance with 
intellij -->
         <provided.scope>provided</provided.scope>
diff --git 
a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java
 
b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java
index 5ae8a0e..fae3865 100644
--- 
a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java
+++ 
b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java
@@ -182,7 +182,12 @@ public class RocksDbMetricsWriter implements Runnable, 
AutoCloseable {
         }
 
         // attempt to find the string in the database
-        stringMetadata = store.rocksDbGetStringMetadata(type, s);
+        try {
+            stringMetadata = store.rocksDbGetStringMetadata(type, s);
+        }
+        catch (RocksDBException e) {
+            throw new MetricException("Error reading metrics data", e);
+        }
         if (stringMetadata != null) {
             // update to the latest timestamp and add to the string cache
             stringMetadata.update(metricTimestamp, type);
@@ -234,10 +239,15 @@ public class RocksDbMetricsWriter implements Runnable, 
AutoCloseable {
             // now scan all metadata and remove any matching string Ids from 
this list
             RocksDbKey firstPrefix = 
RocksDbKey.getPrefix(KeyType.METADATA_STRING_START);
             RocksDbKey lastPrefix = 
RocksDbKey.getPrefix(KeyType.METADATA_STRING_END);
-            store.scanRange(firstPrefix, lastPrefix, (key, value) -> {
-                unusedIds.remove(key.getMetadataStringId());
-                return true; // process all metadata
-            });
+            try {
+                store.scanRange(firstPrefix, lastPrefix, (key, value) -> {
+                    unusedIds.remove(key.getMetadataStringId());
+                    return true; // process all metadata
+                });
+            }
+            catch (RocksDBException e) {
+                throw new MetricException("Error reading metrics data", e);
+            }
         }
     }
 
diff --git 
a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java
 
b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java
index 6b7617e..ba3f08b 100644
--- 
a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java
+++ 
b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java
@@ -274,7 +274,13 @@ public class RocksDbStore implements MetricStore, 
AutoCloseable {
         }
 
         // attempt to find the string in the database
-        stringMetadata = rocksDbGetStringMetadata(type, s);
+        try {
+            stringMetadata = rocksDbGetStringMetadata(type, s);
+        }
+        catch (RocksDBException e) {
+            throw new MetricException("Error reading metric data", e);
+        }
+
         if (stringMetadata != null) {
             id = stringMetadata.getStringId();
 
@@ -290,7 +296,7 @@ public class RocksDbStore implements MetricStore, 
AutoCloseable {
     }
 
     // scans the database to look for a metadata string and returns the 
metadata info
-    StringMetadata rocksDbGetStringMetadata(KeyType type, String s) {
+    StringMetadata rocksDbGetStringMetadata(KeyType type, String s) throws 
RocksDBException {
         RocksDbKey firstKey = RocksDbKey.getInitialKey(type);
         RocksDbKey lastKey = RocksDbKey.getLastKey(type);
         final AtomicReference<StringMetadata> reference = new 
AtomicReference<>();
@@ -306,20 +312,21 @@ public class RocksDbStore implements MetricStore, 
AutoCloseable {
     }
 
     // scans from key start to the key before end, calling back until callback 
indicates not to process further
-    void scanRange(RocksDbKey start, RocksDbKey end, RocksDbScanCallback fn) {
+    void scanRange(RocksDbKey start, RocksDbKey end, RocksDbScanCallback fn) 
throws RocksDBException {
         try (ReadOptions ro = new ReadOptions()) {
             ro.setTotalOrderSeek(true);
-            RocksIterator iterator = db.newIterator(ro);
-            for (iterator.seek(start.getRaw()); iterator.isValid(); 
iterator.next()) {
-                RocksDbKey key = new RocksDbKey(iterator.key());
-                if (key.compareTo(end) >= 0) { // past limit, quit
-                    return;
-                }
+            try (RocksIterator iterator = db.newIterator(ro)) {
+                for (iterator.seek(start.getRaw()); iterator.isValid(); 
iterator.next()) {
+                    RocksDbKey key = new RocksDbKey(iterator.key());
+                    if (key.compareTo(end) >= 0) { // past limit, quit
+                        return;
+                    }
 
-                RocksDbValue val = new RocksDbValue(iterator.value());
-                if (!fn.cb(key, val)) {
-                    // if cb returns false, we are done with this section of 
rows
-                    return;
+                    RocksDbValue val = new RocksDbValue(iterator.value());
+                    if (!fn.cb(key, val)) {
+                        // if cb returns false, we are done with this section 
of rows
+                        return;
+                    }
                 }
             }
         }
@@ -448,88 +455,95 @@ public class RocksDbStore implements MetricStore, 
AutoCloseable {
             endStreamId = streamId;
         }
 
-        ReadOptions ro = new ReadOptions();
-        ro.setTotalOrderSeek(true);
-
-        for (AggLevel aggLevel : filter.getAggLevels()) {
-
-            RocksDbKey startKey = RocksDbKey.createMetricKey(aggLevel, 
startTopologyId, startTime, startMetricId,
-                                                             startComponentId, 
startExecutorId, startHostId, startPort, startStreamId);
-            RocksDbKey endKey = RocksDbKey.createMetricKey(aggLevel, 
endTopologyId, endTime, endMetricId,
-                                                           endComponentId, 
endExecutorId, endHostId, endPort, endStreamId);
-
-            RocksIterator iterator = db.newIterator(ro);
-            for (iterator.seek(startKey.getRaw()); iterator.isValid(); 
iterator.next()) {
-                RocksDbKey key = new RocksDbKey(iterator.key());
-
-                if (key.compareTo(endKey) > 0) { // past limit, quit
-                    break;
-                }
-
-                if (startTopologyId != 0 && key.getTopologyId() != 
startTopologyId) {
-                    continue;
-                }
-
-                long timestamp = key.getTimestamp();
-                if (timestamp < startTime || timestamp > endTime) {
-                    continue;
-                }
-
-                if (startMetricId != 0 && key.getMetricId() != startMetricId) {
-                    continue;
-                }
-
-                if (startComponentId != 0 && key.getComponentId() != 
startComponentId) {
-                    continue;
-                }
-
-                if (startExecutorId != 0 && key.getExecutorId() != 
startExecutorId) {
-                    continue;
-                }
-
-                if (startHostId != 0 && key.getHostnameId() != startHostId) {
-                    continue;
-                }
-
-                if (startPort != 0 && key.getPort() != startPort) {
-                    continue;
-                }
-
-                if (startStreamId != 0 && key.getStreamId() != startStreamId) {
-                    continue;
-                }
-
-                RocksDbValue val = new RocksDbValue(iterator.value());
-
-                if (scanCallback != null) {
-                    try {
-                        // populate a metric
-                        String metricName = 
metadataIdToString(KeyType.METRIC_STRING, key.getMetricId(), idToStringCache);
-                        String topologyId = 
metadataIdToString(KeyType.TOPOLOGY_STRING, key.getTopologyId(), 
idToStringCache);
-                        String componentId = 
metadataIdToString(KeyType.COMPONENT_STRING, key.getComponentId(), 
idToStringCache);
-                        String executorId = 
metadataIdToString(KeyType.EXEC_ID_STRING, key.getExecutorId(), 
idToStringCache);
-                        String hostname = 
metadataIdToString(KeyType.HOST_STRING, key.getHostnameId(), idToStringCache);
-                        String streamId = 
metadataIdToString(KeyType.STREAM_ID_STRING, key.getStreamId(), 
idToStringCache);
-
-                        Metric metric = new Metric(metricName, timestamp, 
topologyId, 0.0, componentId, executorId, hostname,
-                                                   streamId, key.getPort(), 
aggLevel);
-
-                        val.populateMetric(metric);
+        try (ReadOptions ro = new ReadOptions()) {
+            ro.setTotalOrderSeek(true);
 
-                        // callback to caller
-                        scanCallback.cb(metric);
-                    } catch (MetricException e) {
-                        LOG.warn("Failed to report found metric: {}", 
e.getMessage());
-                    }
-                } else {
-                    if (!rawCallback.cb(key, val)) {
-                        return;
+            for (AggLevel aggLevel : filter.getAggLevels()) {
+
+                RocksDbKey startKey = RocksDbKey.createMetricKey(aggLevel, 
startTopologyId, startTime, startMetricId,
+                        startComponentId, startExecutorId, startHostId, 
startPort, startStreamId);
+                RocksDbKey endKey = RocksDbKey.createMetricKey(aggLevel, 
endTopologyId, endTime, endMetricId,
+                        endComponentId, endExecutorId, endHostId, endPort, 
endStreamId);
+
+                try (RocksIterator iterator = db.newIterator(ro)) {
+                    for (iterator.seek(startKey.getRaw()); iterator.isValid(); 
iterator.next()) {
+                        RocksDbKey key = new RocksDbKey(iterator.key());
+
+                        if (key.compareTo(endKey) > 0) { // past limit, quit
+                            break;
+                        }
+
+                        if (startTopologyId != 0 && key.getTopologyId() != 
startTopologyId) {
+                            continue;
+                        }
+
+                        long timestamp = key.getTimestamp();
+                        if (timestamp < startTime || timestamp > endTime) {
+                            continue;
+                        }
+
+                        if (startMetricId != 0 && key.getMetricId() != 
startMetricId) {
+                            continue;
+                        }
+
+                        if (startComponentId != 0 && key.getComponentId() != 
startComponentId) {
+                            continue;
+                        }
+
+                        if (startExecutorId != 0 && key.getExecutorId() != 
startExecutorId) {
+                            continue;
+                        }
+
+                        if (startHostId != 0 && key.getHostnameId() != 
startHostId) {
+                            continue;
+                        }
+
+                        if (startPort != 0 && key.getPort() != startPort) {
+                            continue;
+                        }
+
+                        if (startStreamId != 0 && key.getStreamId() != 
startStreamId) {
+                            continue;
+                        }
+
+                        RocksDbValue val = new RocksDbValue(iterator.value());
+
+                        if (scanCallback != null) {
+                            try {
+                                // populate a metric
+                                String metricName = 
metadataIdToString(KeyType.METRIC_STRING, key.getMetricId(), idToStringCache);
+                                String topologyId = 
metadataIdToString(KeyType.TOPOLOGY_STRING, key.getTopologyId(), 
idToStringCache);
+                                String componentId = 
metadataIdToString(KeyType.COMPONENT_STRING, key.getComponentId(), 
idToStringCache);
+                                String executorId = 
metadataIdToString(KeyType.EXEC_ID_STRING, key.getExecutorId(), 
idToStringCache);
+                                String hostname = 
metadataIdToString(KeyType.HOST_STRING, key.getHostnameId(), idToStringCache);
+                                String streamId = 
metadataIdToString(KeyType.STREAM_ID_STRING, key.getStreamId(), 
idToStringCache);
+
+                                Metric metric = new Metric(metricName, 
timestamp, topologyId, 0.0, componentId, executorId, hostname,
+                                        streamId, key.getPort(), aggLevel);
+
+                                val.populateMetric(metric);
+
+                                // callback to caller
+                                scanCallback.cb(metric);
+                            }
+                            catch (MetricException e) {
+                                LOG.warn("Failed to report found metric: {}", 
e.getMessage());
+                            }
+                        }
+                        else {
+                            try {
+                                if (!rawCallback.cb(key, val)) {
+                                    return;
+                                }
+                            }
+                            catch (RocksDBException e) {
+                                throw new MetricException("Error reading 
metrics data", e);
+                            }
+                        }
                     }
                 }
             }
-            iterator.close();
         }
-        ro.close();
     }
 
     // Finds the metadata string that matches the string Id and type provided. 
 The string should exist, as it is
@@ -568,7 +582,7 @@ public class RocksDbStore implements MetricStore, 
AutoCloseable {
              WriteOptions writeOps = new WriteOptions()) {
 
             scanRaw(filter, (RocksDbKey key, RocksDbValue value) -> {
-                writeBatch.remove(key.getRaw());
+                writeBatch.delete(key.getRaw());
                 return true;
             });
 
@@ -603,15 +617,20 @@ public class RocksDbStore implements MetricStore, 
AutoCloseable {
             // search all metadata strings
             RocksDbKey topologyMetadataPrefix = 
RocksDbKey.getPrefix(KeyType.METADATA_STRING_START);
             RocksDbKey lastPrefix = 
RocksDbKey.getPrefix(KeyType.METADATA_STRING_END);
-            scanRange(topologyMetadataPrefix, lastPrefix, (key, value) -> {
-                // we'll assume the metadata was recently used if still in the 
cache.
-                if 
(!readOnlyStringMetadataCache.contains(key.getMetadataStringId())) {
-                    if (value.getLastTimestamp() < firstValidTimestamp) {
-                        writeBatch.remove(key.getRaw());
+            try {
+                scanRange(topologyMetadataPrefix, lastPrefix, (key, value) -> {
+                    // we'll assume the metadata was recently used if still in 
the cache.
+                    if 
(!readOnlyStringMetadataCache.contains(key.getMetadataStringId())) {
+                        if (value.getLastTimestamp() < firstValidTimestamp) {
+                            writeBatch.delete(key.getRaw());
+                        }
                     }
-                }
-                return true;
-            });
+                    return true;
+                });
+            }
+            catch (RocksDBException e) {
+                throw new MetricException("Error reading metric data", e);
+            }
 
             if (writeBatch.count() > 0) {
                 LOG.info("Deleting {} metadata strings", writeBatch.count());
@@ -630,7 +649,7 @@ public class RocksDbStore implements MetricStore, 
AutoCloseable {
     }
 
     interface RocksDbScanCallback {
-        boolean cb(RocksDbKey key, RocksDbValue val);  // return false to stop 
scan
+        boolean cb(RocksDbKey key, RocksDbValue val) throws RocksDBException;  
// return false to stop scan
     }
 }
 

Reply via email to