Hi, >>Would it be an option to use the ScheduledExecutorService from org.apache.jackrabbit.oak.Oak?
In revision 1635436, I removed the custom executor service and used the whiteboard instance to schedule in org.apache.jackrabbit.oak.Oak. Thanks, Amit On Wed, Oct 29, 2014 at 10:10 PM, Marcel Reutegger <mreut...@adobe.com> wrote: > Hi, > > I think this commit causes OOME when the build is run with the node store > fixtures for MongoDB and RDB. > > The heap dump shows many DocumentNodeStore instances, which cannot be > garbage collected because the ScheduledExecutorService instance introduced > with this change is still running and references the store. > > Would it be an option to use the ScheduledExecutorService from > org.apache.jackrabbit.oak.Oak? > > Regards > Marcel > > On 29/10/14 07:17, "am...@apache.org" <am...@apache.org> wrote: > > >Author: amitj > >Date: Wed Oct 29 06:17:00 2014 > >New Revision: 1635058 > > > >URL: http://svn.apache.org/r1635058 > >Log: > >OAK-2230: Execution Stats for async indexing > >Added resettable consolidated execution stats > >TimeSeries for trends > > > >Added: > > > >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stat > >s/TimeSeriesStatsUtil.java (with props) > >Modified: > > > >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/ > >jmx/IndexStatsMBean.java > > > >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug > >ins/index/AsyncIndexUpdate.java > > > >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stat > >s/RepositoryStats.java > > > >Modified: > >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/ > >jmx/IndexStatsMBean.java > >URL: > > > http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/o > >rg/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java?rev=1635058&r1=16350 > >57&r2=1635058&view=diff > >========================================================================== > >==== > >--- > >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/ > >jmx/IndexStatsMBean.java 
(original) > >+++ > >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/ > >jmx/IndexStatsMBean.java Wed Oct 29 06:17:00 2014 > >@@ -17,6 +17,8 @@ > > > > package org.apache.jackrabbit.oak.api.jmx; > > > >+import javax.management.openmbean.CompositeData; > >+ > > public interface IndexStatsMBean { > > > > String TYPE = "IndexStats"; > >@@ -114,4 +116,28 @@ public interface IndexStatsMBean { > > */ > > String getTemporaryCheckpoints(); > > > >+ /** > >+ * Returns the number of executions as a {@link > >org.apache.jackrabbit.api.stats.TimeSeries}. > >+ * > >+ * @return the execution count time series > >+ */ > >+ CompositeData getExecutionCount(); > >+ > >+ /** > >+ * Returns the execution time as a {@link > >org.apache.jackrabbit.api.stats.TimeSeries}. > >+ * > >+ * @return the execution times time series > >+ */ > >+ CompositeData getExecutionTime(); > >+ > >+ /** > >+ * Returns the consolidated execution stats since last reset > >+ * @return consolidated execution stats > >+ */ > >+ CompositeData getConsolidatedExecutionStats(); > >+ > >+ /** > >+ * Resets the consolidated stats. > >+ */ > >+ void resetConsolidatedExecutionStats(); > > } > > > >Modified: > >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug > >ins/index/AsyncIndexUpdate.java > >URL: > > > http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/o > >rg/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java?rev=1635058&r > >1=1635057&r2=1635058&view=diff > >========================================================================== > >==== > >--- > >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug > >ins/index/AsyncIndexUpdate.java (original) > >+++ > >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug > >ins/index/AsyncIndexUpdate.java Wed Oct 29 06:17:00 2014 > >@@ -28,10 +28,22 @@ import static org.apache.jackrabbit.oak. 
> > import java.util.Calendar; > > import java.util.HashSet; > > import java.util.Set; > >+import java.util.concurrent.Executors; > >+import java.util.concurrent.ScheduledExecutorService; > >+import java.util.concurrent.ThreadFactory; > > import java.util.concurrent.TimeUnit; > >+import java.util.concurrent.atomic.AtomicInteger; > >+import java.util.concurrent.atomic.AtomicLong; > > > > import javax.annotation.Nonnull; > >+import javax.management.openmbean.CompositeData; > >+import javax.management.openmbean.CompositeDataSupport; > >+import javax.management.openmbean.CompositeType; > >+import javax.management.openmbean.OpenDataException; > >+import javax.management.openmbean.OpenType; > >+import javax.management.openmbean.SimpleType; > > > >+import com.google.common.base.Stopwatch; > > import org.apache.jackrabbit.oak.api.CommitFailedException; > > import org.apache.jackrabbit.oak.api.PropertyState; > > import org.apache.jackrabbit.oak.api.Type; > >@@ -51,6 +63,8 @@ import org.apache.jackrabbit.oak.spi.sta > > import org.apache.jackrabbit.oak.spi.state.NodeState; > > import org.apache.jackrabbit.oak.spi.state.NodeStateDiff; > > import org.apache.jackrabbit.oak.spi.state.NodeStore; > >+import org.apache.jackrabbit.oak.stats.TimeSeriesStatsUtil; > >+import org.apache.jackrabbit.stats.TimeSeriesRecorder; > > import org.apache.jackrabbit.util.ISO8601; > > import org.slf4j.Logger; > > import org.slf4j.LoggerFactory; > >@@ -380,6 +394,7 @@ public class AsyncIndexUpdate implements > > reindexedDefinitions.clear(); > > } > > postAsyncRunStatsStatus(indexStats); > >+ indexStats.resetConsolidatedExecutionStats(); > > } > > mergeWithConcurrencyCheck(builder, beforeCheckpoint, > >callback.lease); > > } finally { > >@@ -443,17 +458,29 @@ public class AsyncIndexUpdate implements > > > > private volatile boolean isPaused; > > private volatile long updates; > >+ private Stopwatch watch = Stopwatch.createUnstarted(); > >+ private ExecutionStats execStats = new ExecutionStats(); > > 
> > public void start(String now) { > > status = STATUS_RUNNING; > > start = now; > > done = ""; > >+ > >+ if (watch.isRunning()) { > >+ watch.reset(); > >+ } > >+ watch.start(); > > } > > > > public void done(String now) { > > status = STATUS_DONE; > >- start = ""; > > done = now; > >+ if (watch.isRunning()) { > >+ watch.stop(); > >+ } > >+ execStats.incrementCounter(); > >+ > >execStats.recordExecution(watch.elapsed(TimeUnit.MILLISECONDS), updates); > >+ watch.reset(); > > } > > > > @Override > >@@ -534,6 +561,26 @@ public class AsyncIndexUpdate implements > > } > > > > @Override > >+ public CompositeData getExecutionCount() { > >+ return execStats.getExecutionCount(); > >+ } > >+ > >+ @Override > >+ public CompositeData getExecutionTime() { > >+ return execStats.getExecutionTime(); > >+ } > >+ > >+ @Override > >+ public CompositeData getConsolidatedExecutionStats() { > >+ return execStats.getConsolidatedStats(); > >+ } > >+ > >+ @Override > >+ public void resetConsolidatedExecutionStats() { > >+ execStats.resetConsolidatedStats(); > >+ } > >+ > >+ @Override > > public String toString() { > > return "AsyncIndexStats [start=" + start + ", done=" + done > > + ", status=" + status + ", paused=" + isPaused > >@@ -541,6 +588,86 @@ public class AsyncIndexUpdate implements > > + referenceCp + ", processedCheckpoint=" + > >processedCp > > + " ,tempCheckpoints=" + tempCps + " ]"; > > } > >+ > >+ class ExecutionStats { > >+ final ScheduledExecutorService executor; > >+ final TimeSeriesRecorder execCounter; > >+ final TimeSeriesRecorder execTimer; > >+ > >+ /** > >+ * Captures consolidated execution stats since last reset > >+ */ > >+ final AtomicLong consolidatedExecTime = new AtomicLong(); > >+ final AtomicInteger consolidatedExecRuns = new > >AtomicInteger(); > >+ final AtomicLong consolidatedNodes = new AtomicLong(); > >+ final String[] names = {"Executions", "Execution Time", > >"Nodes"}; > >+ CompositeType consolidatedType; > >+ > >+ ExecutionStats() { > >+ executor = 
> >Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { > >+ @Override > >+ public Thread newThread(@Nonnull Runnable r) { > >+ Thread thread = new Thread(r, > >"AsyncIndexStats-ExecutionsStats"); > >+ thread.setDaemon(true); > >+ return thread; > >+ } > >+ }); > >+ execCounter = new TimeSeriesRecorder(true); > >+ execTimer = new TimeSeriesRecorder(true); > >+ > >+ executor.scheduleAtFixedRate(new Runnable() { > >+ public void run() { > >+ execCounter.recordOneSecond(); > >+ execTimer.recordOneSecond(); > >+ } > >+ }, 1, 1, TimeUnit.SECONDS); > >+ > >+ try { > >+ consolidatedType = new > >CompositeType("ConsolidatedStats", > >+ "Consolidated stats", names, > >+ names, > >+ new OpenType[] {SimpleType.LONG, > >SimpleType.LONG, SimpleType.LONG}); > >+ } catch (OpenDataException e) { > >+ log.warn("Error in creating CompositeType for > >consolidated stats", e); > >+ } > >+ } > >+ > >+ void incrementCounter() { > >+ execCounter.getCounter().incrementAndGet(); > >+ consolidatedExecRuns.incrementAndGet(); > >+ } > >+ > >+ void recordExecution(long time, long updates) { > >+ execTimer.getCounter().addAndGet(time); > >+ consolidatedExecTime.addAndGet(time); > >+ consolidatedNodes.addAndGet(updates); > >+ } > >+ > >+ public CompositeData getExecutionCount() { > >+ return TimeSeriesStatsUtil.asCompositeData(execCounter, > >"ExecutionCount"); > >+ } > >+ > >+ public CompositeData getExecutionTime() { > >+ return TimeSeriesStatsUtil.asCompositeData(execTimer, > >"ExecutionTime"); > >+ } > >+ > >+ public CompositeData getConsolidatedStats() { > >+ try { > >+ Long[] values = new > >Long[]{consolidatedExecRuns.longValue(), > >+ consolidatedExecTime.longValue(), > >consolidatedNodes.longValue()}; > >+ return new CompositeDataSupport(consolidatedType, > >names, values); > >+ } catch (Exception e) { > >+ log.error("Error retrieving consolidated stats", e); > >+ return null; > >+ } > >+ } > >+ > >+ public void resetConsolidatedStats() { > >+ consolidatedExecRuns.set(0); > 
>+ consolidatedExecTime.set(0); > >+ consolidatedNodes.set(0); > >+ } > >+ } > > } > > > > /** > > > >Modified: > >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stat > >s/RepositoryStats.java > >URL: > > > http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/o > >rg/apache/jackrabbit/oak/stats/RepositoryStats.java?rev=1635058&r1=1635057 > >&r2=1635058&view=diff > >========================================================================== > >==== > >--- > >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stat > >s/RepositoryStats.java (original) > >+++ > >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stat > >s/RepositoryStats.java Wed Oct 29 06:17:00 2014 > >@@ -35,13 +35,7 @@ import static org.apache.jackrabbit.api. > > import static > >org.apache.jackrabbit.api.stats.RepositoryStatistics.Type.SESSION_WRITE_CO > >UNTER; > > import static > >org.apache.jackrabbit.api.stats.RepositoryStatistics.Type.SESSION_WRITE_DU > >RATION; > > > >-import javax.management.openmbean.ArrayType; > > import javax.management.openmbean.CompositeData; > >-import javax.management.openmbean.CompositeDataSupport; > >-import javax.management.openmbean.CompositeType; > >-import javax.management.openmbean.OpenDataException; > >-import javax.management.openmbean.OpenType; > >-import javax.management.openmbean.SimpleType; > > > > import org.apache.jackrabbit.api.stats.RepositoryStatistics; > > import org.apache.jackrabbit.api.stats.TimeSeries; > >@@ -132,39 +126,16 @@ public class RepositoryStats implements > > > > @Override > > public CompositeData getObservationQueueMaxLength() { > >- return asCompositeData(maxQueueLength, "maximal length of > >observation queue"); > >+ return TimeSeriesStatsUtil > >+ .asCompositeData(maxQueueLength, "maximal length of > >observation queue"); > > } > > > >- public static final String[] ITEM_NAMES = new String[] { > >- "per second", "per minute", "per hour", "per week"}; > 
>- > > private TimeSeries getTimeSeries(Type type) { > > return repoStats.getTimeSeries(type); > > } > > > > private CompositeData asCompositeData(Type type) { > >- return asCompositeData(getTimeSeries(type), type.name()); > >- } > >- > >- private static CompositeData asCompositeData(TimeSeries timeSeries, > >String name) { > >- try { > >- long[][] values = new long[][] { > >- timeSeries.getValuePerSecond(), > >- timeSeries.getValuePerMinute(), > >- timeSeries.getValuePerHour(), > >- timeSeries.getValuePerWeek()}; > >- return new CompositeDataSupport(getCompositeType(name), > >ITEM_NAMES, values); > >- } catch (Exception e) { > >- LOG.error("Error creating CompositeData instance from > >TimeSeries", e); > >- return null; > >- } > >- } > >- > >- private static CompositeType getCompositeType(String name) throws > >OpenDataException { > >- ArrayType<int[]> longArrayType = new > >ArrayType<int[]>(SimpleType.LONG, true); > >- OpenType<?>[] itemTypes = new OpenType[] { > >- longArrayType, longArrayType, longArrayType, > >longArrayType}; > >- return new CompositeType(name, name + " time series", > >ITEM_NAMES, ITEM_NAMES, itemTypes); > >+ return TimeSeriesStatsUtil.asCompositeData(getTimeSeries(type), > >type.name()); > > } > > > > } > > > >Added: > >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stat > >s/TimeSeriesStatsUtil.java > >URL: > > > http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/o > >rg/apache/jackrabbit/oak/stats/TimeSeriesStatsUtil.java?rev=1635058&view=a > >uto > >========================================================================== > >==== > >--- > >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stat > >s/TimeSeriesStatsUtil.java (added) > >+++ > >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stat > >s/TimeSeriesStatsUtil.java Wed Oct 29 06:17:00 2014 > >@@ -0,0 +1,58 @@ > >+/* > >+ * Licensed to the Apache Software Foundation (ASF) under one > >+ * or more 
contributor license agreements. See the NOTICE file > >+ * distributed with this work for additional information > >+ * regarding copyright ownership. The ASF licenses this file > >+ * to you under the Apache License, Version 2.0 (the > >+ * "License"); you may not use this file except in compliance > >+ * with the License. You may obtain a copy of the License at > >+ * > >+ * http://www.apache.org/licenses/LICENSE-2.0 > >+ * > >+ * Unless required by applicable law or agreed to in writing, > >+ * software distributed under the License is distributed on an > >+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > >+ * KIND, either express or implied. See the License for the > >+ * specific language governing permissions and limitations > >+ * under the License. > >+ */ > >+package org.apache.jackrabbit.oak.stats; > >+ > >+import org.apache.jackrabbit.api.stats.TimeSeries; > >+import org.slf4j.Logger; > >+import org.slf4j.LoggerFactory; > >+ > >+import javax.management.openmbean.ArrayType; > >+import javax.management.openmbean.CompositeData; > >+import javax.management.openmbean.CompositeDataSupport; > >+import javax.management.openmbean.CompositeType; > >+import javax.management.openmbean.OpenDataException; > >+import javax.management.openmbean.OpenType; > >+import javax.management.openmbean.SimpleType; > >+ > >+/** > >+ * Utility class for retrieving {@link > >javax.management.openmbean.CompositeData} for > >+ * {@link org.apache.jackrabbit.api.stats.TimeSeries}. 
> >+ */ > >+public class TimeSeriesStatsUtil { > >+ public static final String[] ITEM_NAMES = new String[] {"per > >second", "per minute", "per hour", "per week"}; > >+ > >+ private static final Logger LOG = > >LoggerFactory.getLogger(TimeSeriesStatsUtil.class); > >+ > >+ public static CompositeData asCompositeData(TimeSeries timeSeries, > >String name) { > >+ try { > >+ long[][] values = new long[][] > >{timeSeries.getValuePerSecond(), timeSeries.getValuePerMinute(), > >+ timeSeries.getValuePerHour(), > >timeSeries.getValuePerWeek()}; > >+ return new CompositeDataSupport(getCompositeType(name), > >ITEM_NAMES, values); > >+ } catch (Exception e) { > >+ LOG.error("Error creating CompositeData instance from > >TimeSeries", e); > >+ return null; > >+ } > >+ } > >+ > >+ static CompositeType getCompositeType(String name) throws > >OpenDataException { > >+ ArrayType<int[]> longArrayType = new > >ArrayType<int[]>(SimpleType.LONG, true); > >+ OpenType<?>[] itemTypes = new OpenType[] {longArrayType, > >longArrayType, longArrayType, longArrayType}; > >+ return new CompositeType(name, name + " time series", > >ITEM_NAMES, ITEM_NAMES, itemTypes); > >+ } > >+} > > > >Propchange: > >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stat > >s/TimeSeriesStatsUtil.java > >-------------------------------------------------------------------------- > >---- > > svn:eol-style = native > > > > > >