http://git-wip-us.apache.org/repos/asf/metron/blob/cad2f408/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileBuilderBoltTest.java
----------------------------------------------------------------------
diff --cc metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileBuilderBoltTest.java
index fc94afa,0000000..3f16edd
mode 100644,000000..100644
--- a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileBuilderBoltTest.java
+++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileBuilderBoltTest.java
@@@ -1,356 -1,0 +1,378 @@@
 +/*
 + *
 + *  Licensed to the Apache Software Foundation (ASF) under one
 + *  or more contributor license agreements.  See the NOTICE file
 + *  distributed with this work for additional information
 + *  regarding copyright ownership.  The ASF licenses this file
 + *  to you under the Apache License, Version 2.0 (the
 + *  "License"); you may not use this file except in compliance
 + *  with the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *  Unless required by applicable law or agreed to in writing, software
 + *  distributed under the License is distributed on an "AS IS" BASIS,
 + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + *  See the License for the specific language governing permissions and
 + *  limitations under the License.
 + *
 + */
 +
 +package org.apache.metron.profiler.storm;
 +
 +import org.apache.metron.common.configuration.profiler.ProfileConfig;
 +import org.apache.metron.common.configuration.profiler.ProfilerConfigurations;
 +import org.apache.metron.profiler.MessageDistributor;
 +import org.apache.metron.profiler.MessageRoute;
 +import org.apache.metron.profiler.ProfileMeasurement;
 +import org.apache.metron.profiler.storm.integration.MessageBuilder;
 +import org.apache.metron.test.bolt.BaseBoltTest;
 +import org.apache.storm.task.OutputCollector;
 +import org.apache.storm.topology.OutputFieldsDeclarer;
 +import org.apache.storm.topology.base.BaseWindowedBolt;
 +import org.apache.storm.tuple.Fields;
 +import org.apache.storm.tuple.Tuple;
 +import org.apache.storm.tuple.Values;
 +import org.apache.storm.windowing.TupleWindow;
 +import org.json.simple.JSONObject;
 +import org.junit.Before;
 +import org.junit.Test;
 +import org.mockito.ArgumentCaptor;
 +
 +import java.io.IOException;
 +import java.util.Arrays;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.concurrent.TimeUnit;
 +import java.util.stream.Collectors;
 +
 +import static org.junit.Assert.assertEquals;
 +import static org.mockito.Matchers.any;
 +import static org.mockito.Matchers.eq;
++import static org.mockito.Mockito.doThrow;
 +import static org.mockito.Mockito.mock;
 +import static org.mockito.Mockito.times;
 +import static org.mockito.Mockito.verify;
 +import static org.mockito.Mockito.when;
 +
 +/**
 + * Tests the ProfileBuilderBolt.
 + */
 +public class ProfileBuilderBoltTest extends BaseBoltTest {
 +
 +  private JSONObject message1;
 +  private JSONObject message2;
 +  private ProfileConfig profile1;
 +  private ProfileConfig profile2;
 +  private ProfileMeasurementEmitter emitter;
 +  private ManualFlushSignal flushSignal;
 +  private ProfileMeasurement measurement;
 +
 +  @Before
 +  public void setup() throws Exception {
 +
 +    message1 = new MessageBuilder()
 +            .withField("ip_src_addr", "10.0.0.1")
 +            .withField("value", "22")
 +            .build();
 +
 +    message2 = new MessageBuilder()
 +            .withField("ip_src_addr", "10.0.0.2")
 +            .withField("value", "22")
 +            .build();
 +
 +    profile1 = new ProfileConfig()
 +            .withProfile("profile1")
 +            .withForeach("ip_src_addr")
 +            .withInit("x", "0")
 +            .withUpdate("x", "x + 1")
 +            .withResult("x");
 +
 +    profile2 = new ProfileConfig()
 +            .withProfile("profile2")
 +            .withForeach("ip_src_addr")
 +            .withInit(Collections.singletonMap("x", "0"))
 +            .withUpdate(Collections.singletonMap("x", "x + 1"))
 +            .withResult("x");
 +
 +    measurement = new ProfileMeasurement()
 +            .withEntity("entity1")
 +            .withProfileName("profile1")
 +            .withPeriod(1000, 500, TimeUnit.MILLISECONDS)
 +            .withProfileValue(22);
 +
 +    flushSignal = new ManualFlushSignal();
 +    flushSignal.setFlushNow(false);
 +  }
 +
 +  /**
 +   * The bolt should extract a message and timestamp from a tuple and
 +   * pass that to a {@code MessageDistributor}.
 +   */
 +  @Test
 +  public void testExtractMessage() throws Exception {
 +
 +    ProfileBuilderBolt bolt = createBolt();
 +
 +    // create a mock
 +    MessageDistributor distributor = mock(MessageDistributor.class);
 +    bolt.withMessageDistributor(distributor);
 +
 +    // create a tuple
 +    final long timestamp1 = 100000000L;
 +    Tuple tuple1 = createTuple("entity1", message1, profile1, timestamp1);
 +
 +    // execute the bolt
 +    TupleWindow tupleWindow = createWindow(tuple1);
 +    bolt.execute(tupleWindow);
 +
 +    // the message should have been extracted from the tuple and passed to the MessageDistributor
 +    verify(distributor).distribute(any(MessageRoute.class), any());
 +  }
 +
 +
 +  /**
 +   * If the {@code FlushSignal} tells the bolt to flush, it should flush the {@code MessageDistributor}
 +   * and emit the {@code ProfileMeasurement} values from all active profiles.
 +   */
 +  @Test
 +  public void testFlushActiveProfiles() throws Exception {
 +
 +    ProfileBuilderBolt bolt = createBolt();
 +
 +    // create a mock that returns the profile measurement above
 +    MessageDistributor distributor = mock(MessageDistributor.class);
 +    when(distributor.flush()).thenReturn(Collections.singletonList(measurement));
 +    bolt.withMessageDistributor(distributor);
 +
 +    // signal the bolt to flush
 +    flushSignal.setFlushNow(true);
 +
 +    // execute the bolt
 +    Tuple tuple1 = createTuple("entity1", message1, profile1, 1000L);
 +    TupleWindow tupleWindow = createWindow(tuple1);
 +    bolt.execute(tupleWindow);
 +
 +    // a profile measurement should be emitted by the bolt
 +    List<ProfileMeasurement> measurements = getProfileMeasurements(outputCollector, 1);
 +    assertEquals(1, measurements.size());
 +    assertEquals(measurement, measurements.get(0));
 +  }
 +
 +  /**
 +   * If the {@code FlushSignal} tells the bolt NOT to flush, nothing should be emitted.
 +   */
 +  @Test
 +  public void testDoNotFlushActiveProfiles() throws Exception {
 +
 +    ProfileBuilderBolt bolt = createBolt();
 +
 +    // create a mock where flush() returns the profile measurement above
 +    MessageDistributor distributor = mock(MessageDistributor.class);
 +    when(distributor.flush()).thenReturn(Collections.singletonList(measurement));
 +    bolt.withMessageDistributor(distributor);
 +
 +    // there is no flush signal
 +    flushSignal.setFlushNow(false);
 +
 +    // execute the bolt
 +    Tuple tuple1 = createTuple("entity1", message1, profile1, 1000L);
 +    TupleWindow tupleWindow = createWindow(tuple1);
 +    bolt.execute(tupleWindow);
 +
 +    // nothing should have been emitted
 +    getProfileMeasurements(outputCollector, 0);
 +  }
 +
 +  /**
 +   * Expired profiles should be flushed regularly, even if no input telemetry
 +   * has been received.
 +   */
 +  @Test
 +  public void testFlushExpiredProfiles() throws Exception {
 +
 +    ProfileBuilderBolt bolt = createBolt();
 +
 +    // create a mock where flushExpired() returns the profile measurement above
 +    MessageDistributor distributor = mock(MessageDistributor.class);
 +    when(distributor.flushExpired()).thenReturn(Collections.singletonList(measurement));
 +    bolt.withMessageDistributor(distributor);
 +
 +    // execute test by flushing expired profiles. this is normally triggered by a timer task.
 +    bolt.flushExpired();
 +
 +    // a profile measurement should be emitted by the bolt
 +    List<ProfileMeasurement> measurements = getProfileMeasurements(outputCollector, 1);
 +    assertEquals(1, measurements.size());
 +    assertEquals(measurement, measurements.get(0));
 +  }
 +
 +  /**
 +   * A {@link ProfileMeasurement} is built for each profile/entity pair.  The measurement should be emitted to each
 +   * destination defined by the profile. By default, a profile uses both Kafka and HBase as destinations.
 +   */
 +  @Test
 +  public void testEmitters() throws Exception {
 +
 +    // defines the zk configurations accessible from the bolt
 +    ProfilerConfigurations configurations = new ProfilerConfigurations();
 +    configurations.updateGlobalConfig(Collections.emptyMap());
 +
 +    // create the bolt with 3 destinations
 +    ProfileBuilderBolt bolt = (ProfileBuilderBolt) new ProfileBuilderBolt()
 +            .withProfileTimeToLive(30, TimeUnit.MINUTES)
 +            .withPeriodDuration(10, TimeUnit.MINUTES)
 +            .withMaxNumberOfRoutes(Long.MAX_VALUE)
 +            .withZookeeperClient(client)
 +            .withZookeeperCache(cache)
 +            .withEmitter(new TestEmitter("destination1"))
 +            .withEmitter(new TestEmitter("destination2"))
 +            .withEmitter(new TestEmitter("destination3"))
 +            .withProfilerConfigurations(configurations)
 +            .withTumblingWindow(new BaseWindowedBolt.Duration(10, TimeUnit.MINUTES));
 +    bolt.prepare(new HashMap<>(), topologyContext, outputCollector);
 +
 +    // signal the bolt to flush
 +    bolt.withFlushSignal(flushSignal);
 +    flushSignal.setFlushNow(true);
 +
 +    // execute the bolt
 +    Tuple tuple1 = createTuple("entity", message1, profile1, 
System.currentTimeMillis());
 +    TupleWindow window = createWindow(tuple1);
 +    bolt.execute(window);
 +
 +    // validate that one measurement was emitted to each destination
 +    verify(outputCollector, times(1)).emit(eq("destination1"), any());
 +    verify(outputCollector, times(1)).emit(eq("destination2"), any());
 +    verify(outputCollector, times(1)).emit(eq("destination3"), any());
 +  }
 +
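++  /**
++   * If an emitter throws an exception while flushing expired profiles, the
++   * exception should be caught by the bolt rather than allowed to bubble up.
++   */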
++  @Test
++  public void testExceptionWhenFlushingExpiredProfiles() throws Exception {
++    // create an emitter that will throw an exception when emit() is called
++    ProfileMeasurementEmitter badEmitter = mock(ProfileMeasurementEmitter.class);
++    doThrow(new RuntimeException("flushExpired() should catch this exception"))
++            .when(badEmitter)
++            .emit(any(), any());
++
++    // create a distributor that will return a profile measurement
++    MessageDistributor distributor = mock(MessageDistributor.class);
++    when(distributor.flushExpired()).thenReturn(Collections.singletonList(measurement));
++
++    // the bolt will use the bad emitter when flushExpired() is called
++    ProfileBuilderBolt bolt = (ProfileBuilderBolt) new ProfileBuilderBolt()
++            .withEmitter(badEmitter)
++            .withMessageDistributor(distributor);
++
++    // the exception thrown by the emitter should not bubble up
++    bolt.flushExpired();
++  }
++
 +  /**
 +   * Retrieves the ProfileMeasurement(s) (if any) that have been emitted.
 +   *
 +   * @param collector The Storm output collector.
 +   * @param expected The number of measurements expected.
 +   * @return A list of ProfileMeasurement(s).
 +   */
 +  private List<ProfileMeasurement> getProfileMeasurements(OutputCollector collector, int expected) {
 +
 +    // the 'streamId' is defined by the DestinationHandler being used by the bolt
 +    final String streamId = emitter.getStreamId();
 +
 +    // capture the emitted tuple(s)
 +    ArgumentCaptor<Values> argCaptor = ArgumentCaptor.forClass(Values.class);
 +    verify(collector, times(expected))
 +            .emit(eq(streamId), argCaptor.capture());
 +
 +    // return the profile measurements that were emitted
 +    return argCaptor.getAllValues()
 +            .stream()
 +            .map(val -> (ProfileMeasurement) val.get(0))
 +            .collect(Collectors.toList());
 +  }
 +
 +  /**
 +   * Create a tuple that will contain the message, the entity name, and profile definition.
 +   * @param entity The entity name.
 +   * @param message The telemetry message.
 +   * @param profile The profile definition.
 +   * @param timestamp The timestamp of the telemetry message.
 +   */
 +  private Tuple createTuple(String entity, JSONObject message, ProfileConfig profile, long timestamp) {
 +
 +    Tuple tuple = mock(Tuple.class);
 +    when(tuple.getValueByField(eq(ProfileSplitterBolt.MESSAGE_TUPLE_FIELD))).thenReturn(message);
 +    when(tuple.getValueByField(eq(ProfileSplitterBolt.TIMESTAMP_TUPLE_FIELD))).thenReturn(timestamp);
 +    when(tuple.getValueByField(eq(ProfileSplitterBolt.ENTITY_TUPLE_FIELD))).thenReturn(entity);
 +    when(tuple.getValueByField(eq(ProfileSplitterBolt.PROFILE_TUPLE_FIELD))).thenReturn(profile);
 +
 +    return tuple;
 +  }
 +
 +  /**
 +   * Create a ProfileBuilderBolt to test.
 +   * @return A {@link ProfileBuilderBolt} to test.
 +   */
 +  private ProfileBuilderBolt createBolt() throws IOException {
 +
 +    // defines the zk configurations accessible from the bolt
 +    ProfilerConfigurations configurations = new ProfilerConfigurations();
 +    configurations.updateGlobalConfig(Collections.emptyMap());
 +
 +    emitter = new HBaseEmitter();
 +    ProfileBuilderBolt bolt = (ProfileBuilderBolt) new ProfileBuilderBolt()
 +            .withProfileTimeToLive(30, TimeUnit.MINUTES)
 +            .withMaxNumberOfRoutes(Long.MAX_VALUE)
 +            .withZookeeperClient(client)
 +            .withZookeeperCache(cache)
 +            .withEmitter(emitter)
 +            .withProfilerConfigurations(configurations)
 +            .withPeriodDuration(1, TimeUnit.MINUTES)
 +            .withTumblingWindow(new BaseWindowedBolt.Duration(30, TimeUnit.SECONDS));
 +    bolt.prepare(new HashMap<>(), topologyContext, outputCollector);
 +
 +    // set the flush signal AFTER calling 'prepare'
 +    bolt.withFlushSignal(flushSignal);
 +
 +    return bolt;
 +  }
 +
 +  /**
 +   * Creates a mock TupleWindow containing multiple tuples.
 +   * @param tuples The tuples to add to the window.
 +   */
 +  private TupleWindow createWindow(Tuple... tuples) {
 +
 +    TupleWindow window = mock(TupleWindow.class);
 +    when(window.get()).thenReturn(Arrays.asList(tuples));
 +    return window;
 +  }
 +
 +  /**
 +   * An implementation for testing purposes only.
 +   */
 +  private class TestEmitter implements ProfileMeasurementEmitter {
 +
 +    private String streamId;
 +
 +    public TestEmitter(String streamId) {
 +      this.streamId = streamId;
 +    }
 +
 +    @Override
 +    public String getStreamId() {
 +      return streamId;
 +    }
 +
 +    @Override
 +    public void declareOutputFields(OutputFieldsDeclarer declarer) {
 +      declarer.declareStream(getStreamId(), new Fields("measurement"));
 +    }
 +
 +    @Override
 +    public void emit(ProfileMeasurement measurement, OutputCollector collector) {
 +      collector.emit(getStreamId(), new Values(measurement));
 +    }
 +  }
 +}
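
For context, the ManualFlushSignal test double used above is defined elsewhere in this module and does not appear in this diff. Below is a minimal sketch of what such a flush signal could look like, assuming a FlushSignal interface with update/isTimeToFlush/reset methods; the interface shape is inferred from how the tests use it and is not confirmed by this diff.

/**
 * A sketch only; the real ManualFlushSignal in Metron may differ.
 */
public class ManualFlushSignal implements FlushSignal {

  private boolean flushNow = false;

  // the tests call this to turn flushing on or off explicitly
  public void setFlushNow(boolean flushNow) {
    this.flushNow = flushNow;
  }

  @Override
  public boolean isTimeToFlush() {
    // flush only when a test has explicitly asked for it
    return flushNow;
  }

  @Override
  public void update(long timestamp) {
    // no-op; message timestamps do not drive flushes in this test double
  }

  @Override
  public void reset() {
    // no-op; there is no time-based state to reset
  }
}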

http://git-wip-us.apache.org/repos/asf/metron/blob/cad2f408/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java
----------------------------------------------------------------------
diff --cc metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java
index 182600a,0000000..4389d42
mode 100644,000000..100644
--- a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java
@@@ -1,421 -1,0 +1,478 @@@
 +/*
 + *
 + *  Licensed to the Apache Software Foundation (ASF) under one
 + *  or more contributor license agreements.  See the NOTICE file
 + *  distributed with this work for additional information
 + *  regarding copyright ownership.  The ASF licenses this file
 + *  to you under the Apache License, Version 2.0 (the
 + *  "License"); you may not use this file except in compliance
 + *  with the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *  Unless required by applicable law or agreed to in writing, software
 + *  distributed under the License is distributed on an "AS IS" BASIS,
 + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + *  See the License for the specific language governing permissions and
 + *  limitations under the License.
 + *
 + */
 +
 +package org.apache.metron.profiler.storm.integration;
 +
 +import org.adrianwalker.multilinestring.Multiline;
- import org.apache.hadoop.hbase.Cell;
- import org.apache.hadoop.hbase.client.Put;
- import org.apache.hadoop.hbase.util.Bytes;
++import org.apache.commons.io.FileUtils;
 +import org.apache.metron.common.Constants;
- import org.apache.metron.common.utils.SerDeUtils;
 +import org.apache.metron.hbase.mock.MockHBaseTableProvider;
 +import org.apache.metron.hbase.mock.MockHTable;
 +import org.apache.metron.integration.BaseIntegrationTest;
 +import org.apache.metron.integration.ComponentRunner;
 +import org.apache.metron.integration.UnableToStartException;
 +import org.apache.metron.integration.components.FluxTopologyComponent;
 +import org.apache.metron.integration.components.KafkaComponent;
 +import org.apache.metron.integration.components.ZKServerComponent;
- import org.apache.metron.profiler.ProfileMeasurement;
- import org.apache.metron.profiler.hbase.ColumnBuilder;
- import org.apache.metron.profiler.hbase.RowKeyBuilder;
- import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
- import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
++import org.apache.metron.profiler.client.stellar.FixedLookback;
++import org.apache.metron.profiler.client.stellar.GetProfile;
++import org.apache.metron.profiler.client.stellar.WindowLookback;
++import org.apache.metron.statistics.OnlineStatisticsProvider;
++import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
++import org.apache.metron.stellar.common.StellarStatefulExecutor;
++import org.apache.metron.stellar.dsl.Context;
++import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
 +import org.apache.storm.Config;
 +import org.junit.After;
 +import org.junit.AfterClass;
 +import org.junit.Before;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
 +
 +import java.io.File;
 +import java.io.UnsupportedEncodingException;
- import java.util.ArrayList;
++import java.lang.invoke.MethodHandles;
 +import java.util.Arrays;
++import java.util.Collections;
++import java.util.HashMap;
 +import java.util.List;
++import java.util.Map;
 +import java.util.Properties;
 +import java.util.concurrent.TimeUnit;
 +
 +import static com.google.code.tempusfugit.temporal.Duration.seconds;
 +import static com.google.code.tempusfugit.temporal.Timeout.timeout;
 +import static com.google.code.tempusfugit.temporal.WaitFor.waitOrTimeout;
- import static org.junit.Assert.assertArrayEquals;
++import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_COLUMN_FAMILY;
++import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE;
++import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE_PROVIDER;
++import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD;
++import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
++import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertTrue;
 +
 +/**
 + * An integration test of the Profiler topology.
 + */
 +public class ProfilerIntegrationTest extends BaseIntegrationTest {
 +
++  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 +  private static final String TEST_RESOURCES = "../../metron-analytics/metron-profiler-storm/src/test";
 +  private static final String FLUX_PATH = "src/main/flux/profiler/remote.yaml";
 +
 +  public static final long startAt = 10;
 +  public static final String entity = "10.0.0.1";
 +
 +  private static final String tableName = "profiler";
 +  private static final String columnFamily = "P";
 +  private static final String inputTopic = Constants.INDEXING_TOPIC;
 +  private static final String outputTopic = "profiles";
 +  private static final int saltDivisor = 10;
 +
-   private static final long windowLagMillis = TimeUnit.SECONDS.toMillis(1);
-   private static final long windowDurationMillis = TimeUnit.SECONDS.toMillis(5);
-   private static final long periodDurationMillis = TimeUnit.SECONDS.toMillis(10);
-   private static final long profileTimeToLiveMillis = TimeUnit.SECONDS.toMillis(15);
++  private static final long periodDurationMillis = TimeUnit.SECONDS.toMillis(20);
++  private static final long windowLagMillis = TimeUnit.SECONDS.toMillis(10);
++  private static final long windowDurationMillis = TimeUnit.SECONDS.toMillis(10);
++  private static final long profileTimeToLiveMillis = TimeUnit.SECONDS.toMillis(20);
++
 +  private static final long maxRoutesPerBolt = 100000;
 +
-   private static ColumnBuilder columnBuilder;
 +  private static ZKServerComponent zkComponent;
 +  private static FluxTopologyComponent fluxComponent;
 +  private static KafkaComponent kafkaComponent;
 +  private static ConfigUploadComponent configUploadComponent;
 +  private static ComponentRunner runner;
 +  private static MockHTable profilerTable;
 +
 +  private static String message1;
 +  private static String message2;
 +  private static String message3;
 +
++  private StellarStatefulExecutor executor;
++
 +  /**
 +   * [
 +   *    org.apache.metron.profiler.ProfileMeasurement,
 +   *    org.apache.metron.profiler.ProfilePeriod,
 +   *    org.apache.metron.common.configuration.profiler.ProfileResult,
 +   *    org.apache.metron.common.configuration.profiler.ProfileResultExpressions,
 +   *    org.apache.metron.common.configuration.profiler.ProfileTriageExpressions,
 +   *    org.apache.metron.common.configuration.profiler.ProfilerConfig,
 +   *    org.apache.metron.common.configuration.profiler.ProfileConfig,
 +   *    org.json.simple.JSONObject,
++   *    org.json.simple.JSONArray,
 +   *    java.util.LinkedHashMap,
 +   *    org.apache.metron.statistics.OnlineStatisticsProvider
 +   *  ]
 +   */
 +  @Multiline
 +  private static String kryoSerializers;
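 +  // note: the @Multiline annotation populates 'kryoSerializers' with the text of the
 +  // comment block above; that list of classes is then passed to Storm via the
 +  // TOPOLOGY_KRYO_REGISTER property in setupBeforeClass()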
 +
-   /**
-    * The Profiler can generate profiles based on processing time.  With processing time,
-    * the Profiler builds profiles based on when the telemetry was processed.
-    *
-    * <p>Not defining a 'timestampField' within the Profiler configuration tells the Profiler
-    * to use processing time.
-    */
 +  @Test
 +  public void testProcessingTime() throws Exception {
++    uploadConfigToZookeeper(TEST_RESOURCES + "/config/zookeeper/processing-time-test");
 +
-     // upload the config to zookeeper
-     uploadConfig(TEST_RESOURCES + "/config/zookeeper/processing-time-test");
- 
-     // start the topology and write test messages to kafka
++    // start the topology and write 3 test messages to kafka
 +    fluxComponent.submitTopology();
- 
-     // the messages that will be applied to the profile
 +    kafkaComponent.writeMessages(inputTopic, message1);
 +    kafkaComponent.writeMessages(inputTopic, message2);
 +    kafkaComponent.writeMessages(inputTopic, message3);
 +
-     // storm needs at least one message to close its event window
++    // retrieve the profile measurement using PROFILE_GET
++    String profileGetExpression = "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('5', 'MINUTES'))";
++    List<Integer> measurements = execute(profileGetExpression, List.class);
++
++    // need to keep checking for measurements until the profiler has flushed one out
 +    int attempt = 0;
-     while(profilerTable.getPutLog().size() == 0 && attempt++ < 10) {
++    while(measurements.size() == 0 && attempt++ < 10) {
 +
-       // sleep, at least beyond the current window
-       Thread.sleep(windowDurationMillis + windowLagMillis);
++      // wait for the profiler to flush
++      long sleep = windowDurationMillis;
++      LOG.debug("Waiting {} millis for profiler to flush", sleep);
++      Thread.sleep(sleep);
 +
-       // send another message to help close the current event window
++      // write another message to advance time. this ensures we are testing the 'normal' flush mechanism.
++      // if we do not send additional messages to advance time, then it is the profile TTL mechanism which
++      // will ultimately flush the profile
 +      kafkaComponent.writeMessages(inputTopic, message2);
++
++      // try again to retrieve the profile measurement using PROFILE_GET
++      measurements = execute(profileGetExpression, List.class);
 +    }
 +
-     // validate what was flushed
-     List<Integer> actuals = read(
-             profilerTable.getPutLog(),
-             columnFamily,
-             columnBuilder.getColumnQualifier("value"),
-             Integer.class);
-     assertEquals(1, actuals.size());
-     assertTrue(actuals.get(0) >= 3);
++    // expect to see only 1 measurement, but could be more (one for each period) depending on
++    // how long we waited for the flush to occur
++    assertTrue(measurements.size() > 0);
++
++    // the profile should have counted at least 3 messages; the 3 test messages that were sent.
++    // the count could be higher due to the test messages we sent to advance time.
++    assertTrue(measurements.get(0) >= 3);
 +  }
 +
-   /**
-    * The Profiler can generate profiles using event time.  With event time processing,
-    * the Profiler uses timestamps contained in the source telemetry.
-    *
-    * <p>Defining a 'timestampField' within the Profiler configuration tells the Profiler
-    * from which field the timestamp should be extracted.
-    */
 +  @Test
-   public void testEventTime() throws Exception {
- 
-     // upload the profiler config to zookeeper
-     uploadConfig(TEST_RESOURCES + "/config/zookeeper/event-time-test");
++  public void testProcessingTimeWithTimeToLiveFlush() throws Exception {
++    uploadConfigToZookeeper(TEST_RESOURCES + "/config/zookeeper/processing-time-test");
 +
-     // start the topology and write test messages to kafka
++    // start the topology and write 3 test messages to kafka
 +    fluxComponent.submitTopology();
 +    kafkaComponent.writeMessages(inputTopic, message1);
 +    kafkaComponent.writeMessages(inputTopic, message2);
 +    kafkaComponent.writeMessages(inputTopic, message3);
 +
-     // wait until the profile is flushed
-     waitOrTimeout(() -> profilerTable.getPutLog().size() > 0, timeout(seconds(90)));
++    // wait a bit beyond the window lag before writing another message.  this allows storm's window manager to close
++    // the event window, which then lets the profiler process the previous messages.
++    long sleep = windowLagMillis + periodDurationMillis;
++    LOG.debug("Waiting {} millis before sending message to close window", 
sleep);
++    Thread.sleep(sleep);
++    kafkaComponent.writeMessages(inputTopic, message3);
++
++    // retrieve the profile measurement using PROFILE_GET
++    String profileGetExpression = "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('5', 'MINUTES'))";
++    List<Integer> measurements = execute(profileGetExpression, List.class);
 +
-     List<Put> puts = profilerTable.getPutLog();
-     assertEquals(1, puts.size());
++    // need to keep checking for measurements until the profiler has flushed one out
++    int attempt = 0;
++    while(measurements.size() == 0 && attempt++ < 10) {
++
++      // wait for the profiler to flush
++      sleep = windowDurationMillis;
++      LOG.debug("Waiting {} millis for profiler to flush", sleep);
++      Thread.sleep(sleep);
++
++      // do not write additional messages to advance time. this ensures that we are testing the "time to live"
++      // flush mechanism. the TTL setting defines when the profile will be flushed
++
++      // try again to retrieve the profile measurement
++      measurements = execute(profileGetExpression, List.class);
++    }
++
++    // expect to see only 1 measurement, but could be more (one for each period) depending on
++    // how long we waited for the flush to occur
++    assertTrue(measurements.size() > 0);
 +
-     // inspect the row key to ensure the profiler used event time correctly.  the timestamp
-     // embedded in the row key should match those in the source telemetry
-     byte[] expectedRowKey = generateExpectedRowKey("event-time-test", entity, startAt);
-     byte[] actualRowKey = puts.get(0).getRow();
-     assertArrayEquals(failMessage(expectedRowKey, actualRowKey), expectedRowKey, actualRowKey);
++    // the profile should have counted 3 messages; the 3 test messages that were sent
++    assertEquals(3, measurements.get(0).intValue());
++  }
++
++  @Test
++  public void testEventTime() throws Exception {
++    uploadConfigToZookeeper(TEST_RESOURCES + "/config/zookeeper/event-time-test");
++
++    // start the topology and write test messages to kafka
++    fluxComponent.submitTopology();
++    List<String> messages = FileUtils.readLines(new File("src/test/resources/telemetry.json"));
++    kafkaComponent.writeMessages(inputTopic, messages);
++
++    long timestamp = System.currentTimeMillis();
++    LOG.debug("Attempting to close window period by sending message with 
timestamp = {}", timestamp);
++    kafkaComponent.writeMessages(inputTopic, getMessage("192.168.66.1", 
timestamp));
++    kafkaComponent.writeMessages(inputTopic, getMessage("192.168.138.158", 
timestamp));
++
++    // create the 'window' that looks up to 5 hours before the max timestamp contained in the test data
++    assign("maxTimestamp", "1530978728982L");
++    assign("window", "PROFILE_WINDOW('from 5 hours ago', maxTimestamp)");
++
++    // wait until the profile flushes both periods.  the first period will flush immediately as subsequent messages
++    // advance time.  the next period contains all of the remaining messages, so there are no other messages to
++    // advance time.  because of this the next period only flushes after the time-to-live expires
++    waitOrTimeout(() -> profilerTable.getPutLog().size() >= 6, timeout(seconds(90)));
++    {
++      // there are 14 messages in the first period and 12 in the next where ip_src_addr = 192.168.66.1
++      List results = execute("PROFILE_GET('count-by-ip', '192.168.66.1', window)", List.class);
++      assertEquals(14, results.get(0));
++      assertEquals(12, results.get(1));
++    }
++    {
++      // there are 36 messages in the first period and 38 in the next where ip_src_addr = 192.168.138.158
++      List results = execute("PROFILE_GET('count-by-ip', '192.168.138.158', window)", List.class);
++      assertEquals(36, results.get(0));
++      assertEquals(38, results.get(1));
++    }
++    {
++      // in all there are 50 messages in the first period and 50 messages in the next
++      List results = execute("PROFILE_GET('total-count', 'total', window)", List.class);
++      assertEquals(50, results.get(0));
++      assertEquals(50, results.get(1));
++    }
 +  }
 +
 +  /**
 +   * The result produced by a Profile has to be serializable within Storm. If the result is not
 +   * serializable, the topology will crash and burn.
 +   *
 +   * This test ensures that if a profile returns a STATS object created using the STATS_INIT and
 +   * STATS_ADD functions, it can be correctly serialized and persisted.
 +   */
 +  @Test
 +  public void testProfileWithStatsObject() throws Exception {
- 
-     // upload the profiler config to zookeeper
-     uploadConfig(TEST_RESOURCES + "/config/zookeeper/profile-with-stats");
++    uploadConfigToZookeeper(TEST_RESOURCES + "/config/zookeeper/profile-with-stats");
 +
 +    // start the topology and write test messages to kafka
 +    fluxComponent.submitTopology();
-     kafkaComponent.writeMessages(inputTopic, message1);
-     kafkaComponent.writeMessages(inputTopic, message2);
-     kafkaComponent.writeMessages(inputTopic, message3);
++    List<String> messages = FileUtils.readLines(new File("src/test/resources/telemetry.json"));
++    kafkaComponent.writeMessages(inputTopic, messages);
 +
 +    // wait until the profile is flushed
 +    waitOrTimeout(() -> profilerTable.getPutLog().size() > 0, timeout(seconds(90)));
 +
-     // ensure that a value was persisted in HBase
-     List<Put> puts = profilerTable.getPutLog();
-     assertEquals(1, puts.size());
- 
-     // generate the expected row key. only the profile name, entity, and period are used to generate the row key
-     ProfileMeasurement measurement = new ProfileMeasurement()
-             .withProfileName("profile-with-stats")
-             .withEntity("global")
-             .withPeriod(startAt, periodDurationMillis, TimeUnit.MILLISECONDS);
-     RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodDurationMillis, TimeUnit.MILLISECONDS);
-     byte[] expectedRowKey = rowKeyBuilder.rowKey(measurement);
- 
-     // ensure the correct row key was generated
-     byte[] actualRowKey = puts.get(0).getRow();
-     assertArrayEquals(failMessage(expectedRowKey, actualRowKey), expectedRowKey, actualRowKey);
++    // validate the measurements written by the profiler using `PROFILE_GET`
++    // the 'window' looks up to 5 hours before the max timestamp contained in the test data
++    assign("maxTimestamp", "1530978728982L");
++    assign("window", "PROFILE_WINDOW('from 5 hours ago', maxTimestamp)");
++
++    // retrieve the stats stored by the profiler
++    List results = execute("PROFILE_GET('profile-with-stats', 'global', window)", List.class);
++    assertTrue(results.size() > 0);
++    assertTrue(results.get(0) instanceof OnlineStatisticsProvider);
 +  }
 +
 +  /**
 +   * Generates an error message used when a byte comparison fails.
 +   *
 +   * @param expected The expected value.
 +   * @param actual The actual value.
 +   * @return A failure message containing both the expected and actual values.
 +   * @throws UnsupportedEncodingException If the bytes cannot be decoded as UTF-8.
 +   */
 +  private String failMessage(byte[] expected, byte[] actual) throws UnsupportedEncodingException {
 +    return String.format("expected '%s', got '%s'",
 +              new String(expected, "UTF-8"),
 +              new String(actual, "UTF-8"));
 +  }
 +
-   /**
-    * Generates the expected row key.
-    *
-    * @param profileName The name of the profile.
-    * @param entity The entity.
-    * @param whenMillis A timestamp in epoch milliseconds.
-    * @return A row key.
-    */
-   private byte[] generateExpectedRowKey(String profileName, String entity, long whenMillis) {
- 
-     // only the profile name, entity, and period are used to generate the row key
-     ProfileMeasurement measurement = new ProfileMeasurement()
-             .withProfileName(profileName)
-             .withEntity(entity)
-             .withPeriod(whenMillis, periodDurationMillis, TimeUnit.MILLISECONDS);
- 
-     // build the row key
-     RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodDurationMillis, TimeUnit.MILLISECONDS);
-     return rowKeyBuilder.rowKey(measurement);
-   }
- 
-   /**
-    * Reads a value written by the Profiler.
-    *
-    * @param family The column family.
-    * @param qualifier The column qualifier.
-    * @param clazz The expected type of the value.
-    * @param <T> The expected type of the value.
-    * @return The value written by the Profiler.
-    */
-   private <T> List<T> read(List<Put> puts, String family, byte[] qualifier, Class<T> clazz) {
-     List<T> results = new ArrayList<>();
- 
-     for(Put put: puts) {
-       List<Cell> cells = put.get(Bytes.toBytes(family), qualifier);
-       for(Cell cell : cells) {
-         T value = SerDeUtils.fromBytes(cell.getValue(), clazz);
-         results.add(value);
-       }
-     }
- 
-     return results;
++  private static String getMessage(String ipSource, long timestamp) {
++    return new MessageBuilder()
++            .withField("ip_src_addr", ipSource)
++            .withField("timestamp", timestamp)
++            .build()
++            .toJSONString();
 +  }
 +
 +  @BeforeClass
 +  public static void setupBeforeClass() throws UnableToStartException {
 +
 +    // create some messages that contain a timestamp - a really old timestamp; close to 1970
-     message1 = new MessageBuilder()
-             .withField("ip_src_addr", entity)
-             .withField("timestamp", startAt)
-             .build()
-             .toJSONString();
- 
-     message2 = new MessageBuilder()
-             .withField("ip_src_addr", entity)
-             .withField("timestamp", startAt + 100)
-             .build()
-             .toJSONString();
- 
-     message3 = new MessageBuilder()
-             .withField("ip_src_addr", entity)
-             .withField("timestamp", startAt + (windowDurationMillis * 2))
-             .build()
-             .toJSONString();
- 
-     columnBuilder = new ValueOnlyColumnBuilder(columnFamily);
++    message1 = getMessage(entity, startAt);
++    message2 = getMessage(entity, startAt + 100);
++    message3 = getMessage(entity, startAt + (windowDurationMillis * 2));
 +
 +    // storm topology properties
 +    final Properties topologyProperties = new Properties() {{
 +
 +      // storm settings
 +      setProperty("profiler.workers", "1");
 +      setProperty("profiler.executors", "0");
++
 +      setProperty(Config.TOPOLOGY_AUTO_CREDENTIALS, "[]");
 +      setProperty(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, "60");
 +      setProperty(Config.TOPOLOGY_MAX_SPOUT_PENDING, "100000");
 +
 +      // ensure tuples are serialized during the test, otherwise serialization problems
 +      // will not be found until the topology is run on a cluster with multiple workers
 +      setProperty(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE, "true");
 +      setProperty(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION, "false");
 +      setProperty(Config.TOPOLOGY_KRYO_REGISTER, kryoSerializers);
 +
 +      // kafka settings
 +      setProperty("profiler.input.topic", inputTopic);
 +      setProperty("profiler.output.topic", outputTopic);
-       setProperty("kafka.start", "UNCOMMITTED_EARLIEST");
++      setProperty("kafka.start", "EARLIEST");
 +      setProperty("kafka.security.protocol", "PLAINTEXT");
 +
 +      // hbase settings
 +      setProperty("profiler.hbase.salt.divisor", 
Integer.toString(saltDivisor));
 +      setProperty("profiler.hbase.table", tableName);
 +      setProperty("profiler.hbase.column.family", columnFamily);
 +      setProperty("profiler.hbase.batch", "10");
 +      setProperty("profiler.hbase.flush.interval.seconds", "1");
 +      setProperty("hbase.provider.impl", "" + 
MockHBaseTableProvider.class.getName());
 +
 +      // profile settings
 +      setProperty("profiler.period.duration", 
Long.toString(periodDurationMillis));
 +      setProperty("profiler.period.duration.units", "MILLISECONDS");
 +      setProperty("profiler.ttl", Long.toString(profileTimeToLiveMillis));
 +      setProperty("profiler.ttl.units", "MILLISECONDS");
 +      setProperty("profiler.window.duration", 
Long.toString(windowDurationMillis));
 +      setProperty("profiler.window.duration.units", "MILLISECONDS");
 +      setProperty("profiler.window.lag", Long.toString(windowLagMillis));
 +      setProperty("profiler.window.lag.units", "MILLISECONDS");
 +      setProperty("profiler.max.routes.per.bolt", 
Long.toString(maxRoutesPerBolt));
 +    }};
 +
 +    // create the mock table
 +    profilerTable = (MockHTable) MockHBaseTableProvider.addToCache(tableName, columnFamily);
 +
 +    zkComponent = getZKServerComponent(topologyProperties);
 +
 +    // create the input and output topics
 +    kafkaComponent = getKafkaComponent(topologyProperties, Arrays.asList(
 +            new KafkaComponent.Topic(inputTopic, 1),
 +            new KafkaComponent.Topic(outputTopic, 1)));
 +
 +    // upload profiler configuration to zookeeper
 +    configUploadComponent = new ConfigUploadComponent()
 +            .withTopologyProperties(topologyProperties);
 +
 +    // load flux definition for the profiler topology
 +    fluxComponent = new FluxTopologyComponent.Builder()
 +            .withTopologyLocation(new File(FLUX_PATH))
 +            .withTopologyName("profiler")
 +            .withTopologyProperties(topologyProperties)
 +            .build();
 +
 +    // start all components
 +    runner = new ComponentRunner.Builder()
 +            .withComponent("zk",zkComponent)
 +            .withComponent("kafka", kafkaComponent)
 +            .withComponent("config", configUploadComponent)
 +            .withComponent("storm", fluxComponent)
 +            .withMillisecondsBetweenAttempts(15000)
 +            .withNumRetries(10)
 +            .withCustomShutdownOrder(new String[] {"storm","config","kafka","zk"})
 +            .build();
 +    runner.start();
 +  }
 +
 +  @AfterClass
 +  public static void tearDownAfterClass() throws Exception {
 +    MockHBaseTableProvider.clear();
 +    if (runner != null) {
 +      runner.stop();
 +    }
 +  }
 +
 +  @Before
 +  public void setup() {
 +    // create the mock table
 +    profilerTable = (MockHTable) MockHBaseTableProvider.addToCache(tableName, columnFamily);
++
++    // global properties
++    Map<String, Object> global = new HashMap<String, Object>() {{
++      put(PROFILER_HBASE_TABLE.getKey(), tableName);
++      put(PROFILER_COLUMN_FAMILY.getKey(), columnFamily);
++      put(PROFILER_HBASE_TABLE_PROVIDER.getKey(), MockHBaseTableProvider.class.getName());
++
++      // client needs to use the same period duration
++      put(PROFILER_PERIOD.getKey(), Long.toString(periodDurationMillis));
++      put(PROFILER_PERIOD_UNITS.getKey(), "MILLISECONDS");
++
++      // client needs to use the same salt divisor
++      put(PROFILER_SALT_DIVISOR.getKey(), saltDivisor);
++    }};
++
++    // create the stellar execution environment
++    executor = new DefaultStellarStatefulExecutor(
++            new SimpleFunctionResolver()
++                    .withClass(GetProfile.class)
++                    .withClass(FixedLookback.class)
++                    .withClass(WindowLookback.class),
++            new Context.Builder()
++                    .with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
++                    .build());
 +  }
 +
 +  @After
 +  public void tearDown() throws Exception {
 +    MockHBaseTableProvider.clear();
 +    profilerTable.clear();
 +    if (runner != null) {
 +      runner.reset();
 +    }
 +  }
 +
 +  /**
 +   * Uploads config values to Zookeeper.
 +   * @param path The path on the local filesystem to the config values.
 +   * @throws Exception If the configuration cannot be uploaded.
 +   */
-   public void uploadConfig(String path) throws Exception {
++  public void uploadConfigToZookeeper(String path) throws Exception {
 +    configUploadComponent
 +            .withGlobalConfiguration(path)
 +            .withProfilerConfiguration(path)
 +            .update();
 +  }
++
++  /**
++   * Assign a value to the result of an expression.
++   *
++   * @param var The variable to assign.
++   * @param expression The expression to execute.
++   */
++  private void assign(String var, String expression) {
++    executor.assign(var, expression, Collections.emptyMap());
++  }
++
++  /**
++   * Execute a Stellar expression.
++   *
++   * @param expression The Stellar expression to execute.
++   * @param clazz The expected type of the result.
++   * @param <T> The expected type of the result.
++   * @return The result of executing the Stellar expression.
++   */
++  private <T> T execute(String expression, Class<T> clazz) {
++    T results = executor.execute(expression, Collections.emptyMap(), clazz);
++
++    LOG.debug("{} = {}", expression, results);
++    return results;
++  }
 +}
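
For reference, the MessageBuilder helper used throughout these tests lives in the same integration package but is not shown in this diff. Below is a minimal sketch of its likely shape, inferred from the withField(...).build().toJSONString() call pattern above; the exact API is an assumption.

import org.json.simple.JSONObject;

/**
 * A sketch only; the real MessageBuilder in Metron may differ.
 */
public class MessageBuilder {

  // accumulates fields into a simple JSON message
  private final JSONObject message = new JSONObject();

  @SuppressWarnings("unchecked")
  public MessageBuilder withField(String key, Object value) {
    message.put(key, value);
    return this;
  }

  public JSONObject build() {
    return message;
  }
}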

http://git-wip-us.apache.org/repos/asf/metron/blob/cad2f408/metron-analytics/metron-profiler-storm/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --cc metron-analytics/metron-profiler-storm/src/test/resources/log4j.properties
index 541f368,0000000..1c2359a
mode 100644,000000..100644
--- a/metron-analytics/metron-profiler-storm/src/test/resources/log4j.properties
+++ b/metron-analytics/metron-profiler-storm/src/test/resources/log4j.properties
@@@ -1,34 -1,0 +1,32 @@@
 +#
 +#
 +#  Licensed to the Apache Software Foundation (ASF) under one
 +#  or more contributor license agreements.  See the NOTICE file
 +#  distributed with this work for additional information
 +#  regarding copyright ownership.  The ASF licenses this file
 +#  to you under the Apache License, Version 2.0 (the
 +#  "License"); you may not use this file except in compliance
 +#  with the License.  You may obtain a copy of the License at
 +#
 +#      http://www.apache.org/licenses/LICENSE-2.0
 +#
 +#  Unless required by applicable law or agreed to in writing, software
 +#  distributed under the License is distributed on an "AS IS" BASIS,
 +#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 +#  See the License for the specific language governing permissions and
 +#  limitations under the License.
 +#
 +#
 +
 +# Root logger option
 +log4j.rootLogger=ERROR, stdout
 +
 +# Direct log messages to stdout
 +log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 +log4j.appender.stdout.Target=System.out
 +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
- log4j.appender.stdout.filter.1=org.apache.log4j.varia.StringMatchFilter
- log4j.appender.stdout.filter.1.StringToMatch=Connection timed out
- log4j.appender.stdout.filter.1.AcceptOnMatch=false
- log4j.appender.stdout.filter.2=org.apache.log4j.varia.StringMatchFilter
- log4j.appender.stdout.filter.2.StringToMatch=Background
- log4j.appender.stdout.filter.2.AcceptOnMatch=false
++
++# uncomment below to help debug tests
++#log4j.logger.org.apache.metron.profiler=ALL
++#log4j.logger.org.apache.storm.windowing=ALL
