Sorry guys,

I've found the error in the code itself and not in the handling of sample 
senders. 

Hint: Never use static ThreadLocal (statisticalMapSelector) AND non-static 
attributes (statisticalMaps) to store data for threads.
(removing the "static" from the ThreadLocal solved all the problems)

One simple question is still open, when and why does JMeter opens different 
instances of SampleSender?
Tracking the data that had been processed at the instances gave me no clue 
about a logic behind it.

G Danny

-----original message-----
Von: Danny Lade [mailto:[email protected]] 
Gesendet: Dienstag, 19. März 2013 10:45
An: [email protected]
Betreff: curiosities in SampleSender

Dear JMeter-Community,

there are two things I don't get using my own SampleSender (see attachement).

1.  if I initialize the "statisticalMapSelector" (ThreadLocal) static I get no 
results at the client shown and the jmeter-server.log looks like :

    2013/03/19 09:45:28 INFO  - 
net.bigpoint.jmeter.samplers.StatisticalHoldSampleSender: Test Ended on host: 
10.188.26.243 
    2013/03/19 09:45:28 INFO  - 
net.bigpoint.jmeter.samplers.StatisticalHoldSampleSender: Process 110 events 
    2013/03/19 09:45:28 INFO  - 
net.bigpoint.jmeter.samplers.StatisticalHoldSampleSender: Test Ended on host: 
10.188.26.243 
    2013/03/19 09:45:28 INFO  - 
net.bigpoint.jmeter.samplers.StatisticalHoldSampleSender: Process 0 events 
    2013/03/19 09:45:28 INFO  - jmeter.engine.StandardJMeterEngine: Test has 
ended on host 10.188.26.243

2. if I clean up the data at testEnded() using "finally { 
statisticalMaps.clear(); }" I get no results at the client shown and the 
jmeter-server.log looks like above

In all other cases this code works fine (as attached)
-----source-----
package net.bigpoint.jmeter.samplers;

import static com.google.common.base.Objects.firstNonNull;
import static com.google.common.base.Objects.toStringHelper;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Strings.emptyToNull;

import java.io.ObjectStreamException;
import java.io.Serializable;
import java.rmi.RemoteException;
import java.util.List;
import java.util.Map;

import org.apache.jmeter.samplers.AbstractSampleSender;
import org.apache.jmeter.samplers.RemoteSampleListener;
import org.apache.jmeter.samplers.SampleEvent;
import org.apache.jmeter.samplers.SampleResult;
import org.apache.jmeter.save.SaveService;
import org.apache.jorphan.logging.LoggingManager;
import org.apache.log.Logger;

import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

public class StatisticalHoldSampleSender extends AbstractSampleSender 
implements Serializable {

    private static final long serialVersionUID = 1167910624585715712L;

    private static final Logger log = LoggingManager.getLoggerForClass();

    private final RemoteSampleListener listener;

    // note: it is necessary to initialize this value at readResolve() and NOT 
statically
    private static transient ThreadLocal<Map<StatisticalResult, 
StatisticalResult>> statisticalMapSelector;

    private transient List<Map<StatisticalResult, StatisticalResult>> 
statisticalMaps;

    /**
     * Constructor, only called by client code.
     * 
     * @param listener that the List of sample events will be sent to.
     */
    public StatisticalHoldSampleSender(RemoteSampleListener listener) {
        this.listener = listener;
    }

    /**
     * @param event a Sample Event
     */
    @Override
    public void sampleOccurred(SampleEvent event) {
        StatisticalResult key = new StatisticalResult(event);
        StatisticalResult result = putIfAbsent(statisticalMapSelector.get(), 
key, key);
        result.update(event);
    }

    /**
     * Checks if any sample events are still present in the sampleStore and
     * sends them to the listener. Informs the listener of the testended.
     * 
     * @param host the hostname that the test has ended on.
     */
    @Override
    public void testEnded(String host) {
        final String threadName = Thread.currentThread().getName();
        log.info("Test Ended on host: " + host);
        try {
            List<SampleEvent> events = Lists.newArrayList();
            for (Map<StatisticalResult, StatisticalResult> statisticalResultMap 
: statisticalMaps) {
                for (StatisticalResult statisticalResult : 
statisticalResultMap.values()) {
                    if (log.isDebugEnabled()) {
                        log.debug("# " + threadName + " - Add event for: " + 
statisticalResult);
                    }
                    events.add(createSampleEvent(statisticalResult));
                }
                statisticalResultMap.clear();
            }
            log.info("Process " + events.size() + " events");
            listener.processBatch(events);
            listener.testEnded(host);
        } catch (RemoteException err) {
            log.error("testEnded(hostname)", err);
        }
    }

    private static <K, V> V putIfAbsent(Map<K, V> map, K key, V value) {
        V originalValue = map.get(key);
        if (originalValue == null) {
            map.put(key, value);
            originalValue = value;
            if (log.isDebugEnabled()) {
                log.debug("# Add object to map: " + value);
            }
        }
        return originalValue;
    }

    static SampleEvent createSampleEvent(StatisticalResult statResult) {
        SampleResult sampleResult = new SampleResult(statResult.startTime, 
statResult.elapsed);

        sampleResult.setThreadName(statResult.threadName);
        sampleResult.setSampleLabel(statResult.label);
        sampleResult.setSampleCount(statResult.sampleCount);
        sampleResult.setDataType(statResult.sampleType);
        sampleResult.setBytes(statResult.requestSize);
        sampleResult.setErrorCount(statResult.errorCount);
        sampleResult.setResponseCode(statResult.responseCode);
        sampleResult.setLatency(statResult.latency);
        sampleResult.setEndTime(statResult.endTime);

        return new SampleEvent(sampleResult, statResult.threadGroup);
    }

    /**
     * Processed by the RMI server code; acts as testStarted().
     * 
     * @throws ObjectStreamException
     */
    @SuppressWarnings("static-access")
    private Object readResolve() throws ObjectStreamException {
        this.statisticalMaps = Lists.newArrayList();
        this.statisticalMapSelector = new ThreadLocal<Map<StatisticalResult, 
StatisticalResult>>() {
            @Override
            protected java.util.Map<StatisticalResult, StatisticalResult> 
initialValue() {
                final Map<StatisticalResult, StatisticalResult> statisticalMap 
= Maps.newHashMap();

                statisticalMaps.add(statisticalMap);
                return statisticalMap;
            };
        };

        log.info(toStringHelper("running with ").toString());
        return this;
    }

    static class StatisticalResult {

        final String threadGroup;

        final String threadName;

        final String label;

        final String responseCode;

        final String sampleType;

        int sampleCount = 0;

        int errorCount = 0;

        int requestSize = 0;

        long startTime = Long.MAX_VALUE; // guarantee it is set with the first 
update()

        long endTime = 0;

        long latency = 0;

        long elapsed = 0;

        /**
         * Create a statistical sample result.
         */
        StatisticalResult(SampleEvent event) {
            // using of intern string pool guarantees precalculated hashes and 
less memory usage
            checkNotNull(event);
            SampleResult result = checkNotNull(event.getResult());
            this.threadGroup = 
firstNonNull(emptyToNull(event.getThreadGroup()), "<unknown group>").intern();
            this.threadName = firstNonNull(emptyToNull(result.getThreadName()), 
"<unknown thread>").intern();
            this.label = firstNonNull(emptyToNull(result.getSampleLabel()), 
"<unknown label>").intern();
            this.responseCode = 
firstNonNull(emptyToNull(result.getResponseCode()), "<unknown response 
code>").intern();
            this.sampleType = 
SaveService.classToAlias(result.getClass().getName());
        }

        void update(SampleEvent event) {
            checkNotNull(event);
            SampleResult result = checkNotNull(event.getResult());
            if (!result.isSuccessful()) {
                errorCount++;
            }
            sampleCount += result.getSampleCount();
            requestSize += result.getBytes();
            startTime = Math.min(startTime, result.getStartTime());
            endTime = Math.max(endTime, result.getEndTime());
            latency += result.getLatency();
            elapsed += result.getTime();
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(responseCode, label, sampleType);
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == null || this.getClass() != obj.getClass()) {
                return false;
            }
            final StatisticalResult other = (StatisticalResult) obj;
            return Objects.equal(this.responseCode, other.responseCode) && 
Objects.equal(this.label, other.label)
                    && Objects.equal(this.sampleType, other.sampleType);
        }

        @Override
        public String toString() {
            return toStringHelper(this).add("sampleType", 
sampleType).add("threadGroup", threadGroup).add("threadName", 
threadName).add("label", label)
                    .add("responseCode", responseCode).add("sampleCount", 
sampleCount).add("errorCount", errorCount).add("requestSize", requestSize)
                    .add("startTime", startTime).add("endTime", 
endTime).add("latency", latency).add("elapsedTime", elapsed).toString();
        }
    }
}

Reply via email to