Hi,

>>Would it be an option to use the ScheduledExecutorService from
org.apache.jackrabbit.oak.Oak?

In revision 1635436, I removed the custom executor service and used the
whiteboard instance to schedule in org.apache.jackrabbit.oak.Oak.

Thanks
Amit

On Wed, Oct 29, 2014 at 10:10 PM, Marcel Reutegger <mreut...@adobe.com>
wrote:

> Hi,
>
> I think this commit causes an OOME when the build is run with the node
> store fixtures for MongoDB and RDB.
>
> The heap dump shows many DocumentNodeStore instances, which cannot be
> garbage collected because the ScheduledExecutorService instance introduced
> with this change is still running and references the store.
>
> Would it be an option to use the ScheduledExecutorService from
> org.apache.jackrabbit.oak.Oak?
>
> Regards
>  Marcel
>
> On 29/10/14 07:17, "am...@apache.org" <am...@apache.org> wrote:
>
> >Author: amitj
> >Date: Wed Oct 29 06:17:00 2014
> >New Revision: 1635058
> >
> >URL: http://svn.apache.org/r1635058
> >Log:
> >OAK-2230: Execution Stats for async indexing
> >Added resettable consolidated execution stats
> >TimeSeries for trends
> >
> >Added:
> >
> >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stat
> >s/TimeSeriesStatsUtil.java   (with props)
> >Modified:
> >
> >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/
> >jmx/IndexStatsMBean.java
> >
> >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
> >ins/index/AsyncIndexUpdate.java
> >
> >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stat
> >s/RepositoryStats.java
> >
> >Modified:
> >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/
> >jmx/IndexStatsMBean.java
> >URL:
> >
> http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/o
> >rg/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java?rev=1635058&r1=16350
> >57&r2=1635058&view=diff
> >==========================================================================
> >====
> >---
> >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/
> >jmx/IndexStatsMBean.java (original)
> >+++
> >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/
> >jmx/IndexStatsMBean.java Wed Oct 29 06:17:00 2014
> >@@ -17,6 +17,8 @@
> >
> > package org.apache.jackrabbit.oak.api.jmx;
> >
> >+import javax.management.openmbean.CompositeData;
> >+
> > public interface IndexStatsMBean {
> >
> >     String TYPE = "IndexStats";
> >@@ -114,4 +116,28 @@ public interface IndexStatsMBean {
> >      */
> >     String getTemporaryCheckpoints();
> >
> >+    /**
> >+     * Returns the number of executions as a {@link
> >org.apache.jackrabbit.api.stats.TimeSeries}.
> >+     *
> >+     * @return the execution count time series
> >+     */
> >+    CompositeData getExecutionCount();
> >+
> >+    /**
> >+     * Returns the execution time as a {@link
> >org.apache.jackrabbit.api.stats.TimeSeries}.
> >+     *
> >+     * @return the execution times time series
> >+     */
> >+    CompositeData getExecutionTime();
> >+
> >+    /**
> >+     * Returns the consolidated execution stats since last reset
> >+     * @return consolidated execution stats
> >+     */
> >+    CompositeData getConsolidatedExecutionStats();
> >+
> >+    /**
> >+     * Resets the consolidated stats.
> >+     */
> >+    void resetConsolidatedExecutionStats();
> > }
> >
> >Modified:
> >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
> >ins/index/AsyncIndexUpdate.java
> >URL:
> >
> http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/o
> >rg/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java?rev=1635058&r
> >1=1635057&r2=1635058&view=diff
> >==========================================================================
> >====
> >---
> >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
> >ins/index/AsyncIndexUpdate.java (original)
> >+++
> >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
> >ins/index/AsyncIndexUpdate.java Wed Oct 29 06:17:00 2014
> >@@ -28,10 +28,22 @@ import static org.apache.jackrabbit.oak.
> > import java.util.Calendar;
> > import java.util.HashSet;
> > import java.util.Set;
> >+import java.util.concurrent.Executors;
> >+import java.util.concurrent.ScheduledExecutorService;
> >+import java.util.concurrent.ThreadFactory;
> > import java.util.concurrent.TimeUnit;
> >+import java.util.concurrent.atomic.AtomicInteger;
> >+import java.util.concurrent.atomic.AtomicLong;
> >
> > import javax.annotation.Nonnull;
> >+import javax.management.openmbean.CompositeData;
> >+import javax.management.openmbean.CompositeDataSupport;
> >+import javax.management.openmbean.CompositeType;
> >+import javax.management.openmbean.OpenDataException;
> >+import javax.management.openmbean.OpenType;
> >+import javax.management.openmbean.SimpleType;
> >
> >+import com.google.common.base.Stopwatch;
> > import org.apache.jackrabbit.oak.api.CommitFailedException;
> > import org.apache.jackrabbit.oak.api.PropertyState;
> > import org.apache.jackrabbit.oak.api.Type;
> >@@ -51,6 +63,8 @@ import org.apache.jackrabbit.oak.spi.sta
> > import org.apache.jackrabbit.oak.spi.state.NodeState;
> > import org.apache.jackrabbit.oak.spi.state.NodeStateDiff;
> > import org.apache.jackrabbit.oak.spi.state.NodeStore;
> >+import org.apache.jackrabbit.oak.stats.TimeSeriesStatsUtil;
> >+import org.apache.jackrabbit.stats.TimeSeriesRecorder;
> > import org.apache.jackrabbit.util.ISO8601;
> > import org.slf4j.Logger;
> > import org.slf4j.LoggerFactory;
> >@@ -380,6 +394,7 @@ public class AsyncIndexUpdate implements
> >                     reindexedDefinitions.clear();
> >                 }
> >                 postAsyncRunStatsStatus(indexStats);
> >+                indexStats.resetConsolidatedExecutionStats();
> >             }
> >             mergeWithConcurrencyCheck(builder, beforeCheckpoint,
> >callback.lease);
> >         } finally {
> >@@ -443,17 +458,29 @@ public class AsyncIndexUpdate implements
> >
> >         private volatile boolean isPaused;
> >         private volatile long updates;
> >+        private Stopwatch watch = Stopwatch.createUnstarted();
> >+        private ExecutionStats execStats = new ExecutionStats();
> >
> >         public void start(String now) {
> >             status = STATUS_RUNNING;
> >             start = now;
> >             done = "";
> >+
> >+            if (watch.isRunning()) {
> >+                watch.reset();
> >+            }
> >+            watch.start();
> >         }
> >
> >         public void done(String now) {
> >             status = STATUS_DONE;
> >-            start = "";
> >             done = now;
> >+            if (watch.isRunning()) {
> >+                watch.stop();
> >+            }
> >+            execStats.incrementCounter();
> >+
> >execStats.recordExecution(watch.elapsed(TimeUnit.MILLISECONDS), updates);
> >+            watch.reset();
> >         }
> >
> >         @Override
> >@@ -534,6 +561,26 @@ public class AsyncIndexUpdate implements
> >         }
> >
> >         @Override
> >+        public CompositeData getExecutionCount() {
> >+            return execStats.getExecutionCount();
> >+        }
> >+
> >+        @Override
> >+        public CompositeData getExecutionTime() {
> >+            return execStats.getExecutionTime();
> >+        }
> >+
> >+        @Override
> >+        public CompositeData getConsolidatedExecutionStats() {
> >+            return execStats.getConsolidatedStats();
> >+        }
> >+
> >+        @Override
> >+        public void resetConsolidatedExecutionStats() {
> >+            execStats.resetConsolidatedStats();
> >+        }
> >+
> >+        @Override
> >         public String toString() {
> >             return "AsyncIndexStats [start=" + start + ", done=" + done
> >                     + ", status=" + status + ", paused=" + isPaused
> >@@ -541,6 +588,86 @@ public class AsyncIndexUpdate implements
> >                     + referenceCp + ", processedCheckpoint=" +
> >processedCp
> >                     + " ,tempCheckpoints=" + tempCps + " ]";
> >         }
> >+
> >+        class ExecutionStats {
> >+            final ScheduledExecutorService executor;
> >+            final TimeSeriesRecorder execCounter;
> >+            final TimeSeriesRecorder execTimer;
> >+
> >+            /**
> >+             * Captures consolidated execution stats since last reset
> >+             */
> >+            final AtomicLong consolidatedExecTime = new AtomicLong();
> >+            final AtomicInteger consolidatedExecRuns = new
> >AtomicInteger();
> >+            final AtomicLong consolidatedNodes = new AtomicLong();
> >+            final String[] names = {"Executions", "Execution Time",
> >"Nodes"};
> >+            CompositeType consolidatedType;
> >+
> >+            ExecutionStats() {
> >+                executor =
> >Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
> >+                    @Override
> >+                    public Thread newThread(@Nonnull Runnable r) {
> >+                        Thread thread = new Thread(r,
> >"AsyncIndexStats-ExecutionsStats");
> >+                        thread.setDaemon(true);
> >+                        return thread;
> >+                    }
> >+                });
> >+                execCounter = new TimeSeriesRecorder(true);
> >+                execTimer = new TimeSeriesRecorder(true);
> >+
> >+                executor.scheduleAtFixedRate(new Runnable() {
> >+                    public void run() {
> >+                        execCounter.recordOneSecond();
> >+                        execTimer.recordOneSecond();
> >+                    }
> >+                }, 1, 1, TimeUnit.SECONDS);
> >+
> >+                try {
> >+                    consolidatedType = new
> >CompositeType("ConsolidatedStats",
> >+                        "Consolidated stats", names,
> >+                        names,
> >+                        new OpenType[] {SimpleType.LONG,
> >SimpleType.LONG, SimpleType.LONG});
> >+                } catch (OpenDataException e) {
> >+                    log.warn("Error in creating CompositeType for
> >consolidated stats", e);
> >+                }
> >+            }
> >+
> >+            void incrementCounter() {
> >+                execCounter.getCounter().incrementAndGet();
> >+                consolidatedExecRuns.incrementAndGet();
> >+            }
> >+
> >+            void recordExecution(long time, long updates) {
> >+                execTimer.getCounter().addAndGet(time);
> >+                consolidatedExecTime.addAndGet(time);
> >+                consolidatedNodes.addAndGet(updates);
> >+            }
> >+
> >+            public CompositeData getExecutionCount() {
> >+                return TimeSeriesStatsUtil.asCompositeData(execCounter,
> >"ExecutionCount");
> >+            }
> >+
> >+            public CompositeData getExecutionTime() {
> >+                return TimeSeriesStatsUtil.asCompositeData(execTimer,
> >"ExecutionTime");
> >+            }
> >+
> >+            public CompositeData getConsolidatedStats() {
> >+                try {
> >+                    Long[] values = new
> >Long[]{consolidatedExecRuns.longValue(),
> >+                        consolidatedExecTime.longValue(),
> >consolidatedNodes.longValue()};
> >+                    return new CompositeDataSupport(consolidatedType,
> >names, values);
> >+                } catch (Exception e) {
> >+                    log.error("Error retrieving consolidated stats", e);
> >+                    return null;
> >+                }
> >+            }
> >+
> >+            public void resetConsolidatedStats() {
> >+                consolidatedExecRuns.set(0);
> >+                consolidatedExecTime.set(0);
> >+                consolidatedNodes.set(0);
> >+            }
> >+        }
> >     }
> >
> >     /**
> >
> >Modified:
> >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stat
> >s/RepositoryStats.java
> >URL:
> >
> http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/o
> >rg/apache/jackrabbit/oak/stats/RepositoryStats.java?rev=1635058&r1=1635057
> >&r2=1635058&view=diff
> >==========================================================================
> >====
> >---
> >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stat
> >s/RepositoryStats.java (original)
> >+++
> >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stat
> >s/RepositoryStats.java Wed Oct 29 06:17:00 2014
> >@@ -35,13 +35,7 @@ import static org.apache.jackrabbit.api.
> > import static
> >org.apache.jackrabbit.api.stats.RepositoryStatistics.Type.SESSION_WRITE_CO
> >UNTER;
> > import static
> >org.apache.jackrabbit.api.stats.RepositoryStatistics.Type.SESSION_WRITE_DU
> >RATION;
> >
> >-import javax.management.openmbean.ArrayType;
> > import javax.management.openmbean.CompositeData;
> >-import javax.management.openmbean.CompositeDataSupport;
> >-import javax.management.openmbean.CompositeType;
> >-import javax.management.openmbean.OpenDataException;
> >-import javax.management.openmbean.OpenType;
> >-import javax.management.openmbean.SimpleType;
> >
> > import org.apache.jackrabbit.api.stats.RepositoryStatistics;
> > import org.apache.jackrabbit.api.stats.TimeSeries;
> >@@ -132,39 +126,16 @@ public class RepositoryStats implements
> >
> >     @Override
> >     public CompositeData getObservationQueueMaxLength() {
> >-        return asCompositeData(maxQueueLength, "maximal length of
> >observation queue");
> >+        return TimeSeriesStatsUtil
> >+            .asCompositeData(maxQueueLength, "maximal length of
> >observation queue");
> >     }
> >
> >-    public static final String[] ITEM_NAMES = new String[] {
> >-            "per second", "per minute", "per hour", "per week"};
> >-
> >     private TimeSeries getTimeSeries(Type type) {
> >         return repoStats.getTimeSeries(type);
> >     }
> >
> >     private CompositeData asCompositeData(Type type) {
> >-        return asCompositeData(getTimeSeries(type), type.name());
> >-    }
> >-
> >-    private static CompositeData asCompositeData(TimeSeries timeSeries,
> >String name) {
> >-        try {
> >-            long[][] values = new long[][] {
> >-                timeSeries.getValuePerSecond(),
> >-                timeSeries.getValuePerMinute(),
> >-                timeSeries.getValuePerHour(),
> >-                timeSeries.getValuePerWeek()};
> >-            return new CompositeDataSupport(getCompositeType(name),
> >ITEM_NAMES, values);
> >-        } catch (Exception e) {
> >-            LOG.error("Error creating CompositeData instance from
> >TimeSeries", e);
> >-            return null;
> >-        }
> >-    }
> >-
> >-    private static CompositeType getCompositeType(String name) throws
> >OpenDataException {
> >-        ArrayType<int[]> longArrayType = new
> >ArrayType<int[]>(SimpleType.LONG, true);
> >-        OpenType<?>[] itemTypes = new OpenType[] {
> >-                longArrayType, longArrayType, longArrayType,
> >longArrayType};
> >-        return new CompositeType(name, name + " time series",
> >ITEM_NAMES, ITEM_NAMES, itemTypes);
> >+        return TimeSeriesStatsUtil.asCompositeData(getTimeSeries(type),
> >type.name());
> >     }
> >
> > }
> >
> >Added:
> >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stat
> >s/TimeSeriesStatsUtil.java
> >URL:
> >
> http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/o
> >rg/apache/jackrabbit/oak/stats/TimeSeriesStatsUtil.java?rev=1635058&view=a
> >uto
> >==========================================================================
> >====
> >---
> >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stat
> >s/TimeSeriesStatsUtil.java (added)
> >+++
> >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stat
> >s/TimeSeriesStatsUtil.java Wed Oct 29 06:17:00 2014
> >@@ -0,0 +1,58 @@
> >+/*
> >+ * Licensed to the Apache Software Foundation (ASF) under one
> >+ * or more contributor license agreements.  See the NOTICE file
> >+ * distributed with this work for additional information
> >+ * regarding copyright ownership.  The ASF licenses this file
> >+ * to you under the Apache License, Version 2.0 (the
> >+ * "License"); you may not use this file except in compliance
> >+ * with the License.  You may obtain a copy of the License at
> >+ *
> >+ *   http://www.apache.org/licenses/LICENSE-2.0
> >+ *
> >+ * Unless required by applicable law or agreed to in writing,
> >+ * software distributed under the License is distributed on an
> >+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> >+ * KIND, either express or implied.  See the License for the
> >+ * specific language governing permissions and limitations
> >+ * under the License.
> >+ */
> >+package org.apache.jackrabbit.oak.stats;
> >+
> >+import org.apache.jackrabbit.api.stats.TimeSeries;
> >+import org.slf4j.Logger;
> >+import org.slf4j.LoggerFactory;
> >+
> >+import javax.management.openmbean.ArrayType;
> >+import javax.management.openmbean.CompositeData;
> >+import javax.management.openmbean.CompositeDataSupport;
> >+import javax.management.openmbean.CompositeType;
> >+import javax.management.openmbean.OpenDataException;
> >+import javax.management.openmbean.OpenType;
> >+import javax.management.openmbean.SimpleType;
> >+
> >+/**
> >+ * Utility class for retrieving {@link
> >javax.management.openmbean.CompositeData} for
> >+ * {@link org.apache.jackrabbit.api.stats.TimeSeries}.
> >+ */
> >+public class TimeSeriesStatsUtil {
> >+    public static final String[] ITEM_NAMES = new String[] {"per
> >second", "per minute", "per hour", "per week"};
> >+
> >+    private static final Logger LOG =
> >LoggerFactory.getLogger(TimeSeriesStatsUtil.class);
> >+
> >+    public static CompositeData asCompositeData(TimeSeries timeSeries,
> >String name) {
> >+        try {
> >+            long[][] values = new long[][]
> >{timeSeries.getValuePerSecond(), timeSeries.getValuePerMinute(),
> >+                timeSeries.getValuePerHour(),
> >timeSeries.getValuePerWeek()};
> >+            return new CompositeDataSupport(getCompositeType(name),
> >ITEM_NAMES, values);
> >+        } catch (Exception e) {
> >+            LOG.error("Error creating CompositeData instance from
> >TimeSeries", e);
> >+            return null;
> >+        }
> >+    }
> >+
> >+    static CompositeType getCompositeType(String name) throws
> >OpenDataException {
> >+        ArrayType<int[]> longArrayType = new
> >ArrayType<int[]>(SimpleType.LONG, true);
> >+        OpenType<?>[] itemTypes = new OpenType[] {longArrayType,
> >longArrayType, longArrayType, longArrayType};
> >+        return new CompositeType(name, name + " time series",
> >ITEM_NAMES, ITEM_NAMES, itemTypes);
> >+    }
> >+}
> >
> >Propchange:
> >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stat
> >s/TimeSeriesStatsUtil.java
> >--------------------------------------------------------------------------
> >----
> >    svn:eol-style = native
> >
> >
>
>

Reply via email to