This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new e270362  Add stringLast and stringFirst aggregators extension (#5789)
e270362 is described below

commit e2703627676366655001900091282c83d31315a4
Author: Andrés Gómez <andresgomez...@gmail.com>
AuthorDate: Wed Aug 1 19:52:54 2018 +0200

    Add stringLast and stringFirst aggregators extension (#5789)
    
    * Add lastString and firstString aggregators extension
    
    * Remove duplicated class
    
    * Move first-last-string doc page to extensions-contrib
    
    * Fix ObjectStrategy compare method
    
    * Fix doc bad aggregatos type name
    
    * Create FoldingAggregatorFactory classes to fix SegmentMetadataQuery
    
    * Add getMaxStringBytes() method to support JSON serialization
    
    * Fix null pointer exception at segment creation phase when the string 
value is null
    
    * Control the valueSelector object class on BufferAggregators
    
    * Perform all improvements
    
    * Add java doc on SerializablePairLongStringSerde
    
    * Refactor ObjectStraty compare method
    
    * Remove unused ;
    
    * Add aggregateCombiner unit tests. Rename BufferAggregators unit tests
    
    * Remove unused imports
    
    * Add license header
    
    * Add class name to java doc class serde
    
    * Throw exception if value is unsupported class type
    
    * Move first-last-string extension into druid core
    
    * Update druid core docs
    
    * Fix null pointer exception when pair->string is null
    
    * Add null control unit tests
    
    * Remove unused imports
    
    * Add first/last string folding aggregator on AggregatorsModule to support 
segment metadata query
    
    * Change SerializablePairLongString to extend SerializablePair
    
    * Change vars from public to private
    
    * Convert vars to primitive type
    
    * Clarify compare comment
    
    * Change IllegalStateException to ISE
    
    * Remove TODO comments
    
    * Control possible null pointer exception
    
    * Add @Nullable annotation
    
    * Remove empty line
    
    * Remove unused parameter type
    
    * Improve AggregatorCombiner javadocs
    
    * Add filterNullValues option at StringLast and StringFirst aggregators
    
    * Add filterNullValues option at agg documentation
    
    * Fix checkstyle
    
    * Update header license
    
    * Fix StringFirstAggregatorFactory.VALUE_COMPARATOR
    
    * Fix StringFirstAggregatorCombiner
    
    * Fix if condition at StringFirstAggregateCombiner
    
    * Remove filterNullValues from string first/last aggregators
    
    * Add isReset flag in FirstAggregatorCombiner
    
    * Change Arrays.asList to Collections.singletonList
---
 docs/content/querying/aggregations.md              |  32 +++-
 .../java/io/druid/jackson/AggregatorsModule.java   |  20 ++-
 .../io/druid/query/aggregation/AggregatorUtil.java |   4 +
 .../aggregation/SerializablePairLongString.java    |  35 ++++
 .../SerializablePairLongStringSerde.java           | 146 ++++++++++++++++
 .../first/LongFirstAggregatorFactory.java          |   2 +-
 .../first/StringFirstAggregateCombiner.java        |  60 +++++++
 .../aggregation/first/StringFirstAggregator.java   | 110 ++++++++++++
 ...tory.java => StringFirstAggregatorFactory.java} | 163 ++++++++---------
 .../first/StringFirstBufferAggregator.java         | 157 +++++++++++++++++
 .../first/StringFirstFoldingAggregatorFactory.java | 105 +++++++++++
 .../last/LongLastAggregatorFactory.java            |   2 +-
 .../last/StringLastAggregateCombiner.java          |  55 ++++++
 .../aggregation/last/StringLastAggregator.java     | 110 ++++++++++++
 ...ctory.java => StringLastAggregatorFactory.java} | 122 +++++--------
 .../last/StringLastBufferAggregator.java           | 157 +++++++++++++++++
 .../last/StringLastFoldingAggregatorFactory.java   | 102 +++++++++++
 .../first/StringFirstAggregationTest.java          | 194 +++++++++++++++++++++
 .../first/StringFirstBufferAggregatorTest.java     | 171 ++++++++++++++++++
 .../first/StringFirstTimeseriesQueryTest.java      | 123 +++++++++++++
 .../last/StringLastAggregationTest.java            | 194 +++++++++++++++++++++
 .../last/StringLastBufferAggregatorTest.java       | 171 ++++++++++++++++++
 .../last/StringLastTimeseriesQueryTest.java        | 126 +++++++++++++
 23 files changed, 2197 insertions(+), 164 deletions(-)

diff --git a/docs/content/querying/aggregations.md 
b/docs/content/querying/aggregations.md
index b0ce5cc..3f6b5e7 100644
--- a/docs/content/querying/aggregations.md
+++ b/docs/content/querying/aggregations.md
@@ -102,7 +102,7 @@ Computes and stores the sum of values as 32-bit floating 
point value. Similar to
 
 ### First / Last aggregator
 
-First and Last aggregator cannot be used in ingestion spec, and should only be 
specified as part of queries.
+(Double/Float/Long) First and Last aggregator cannot be used in ingestion 
spec, and should only be specified as part of queries.
 
 Note that queries with first/last aggregators on a segment created with rollup 
enabled will return the rolled up value, and not the last value within the raw 
ingested data.
 
@@ -178,6 +178,36 @@ Note that queries with first/last aggregators on a segment 
created with rollup e
 }
 ```
 
+#### `stringFirst` aggregator
+
+`stringFirst` computes the metric value with the minimum timestamp or `null` 
if no row exist
+
+```json
+{
+  "type" : "stringFirst",
+  "name" : <output_name>,
+  "fieldName" : <metric_name>,
+  "maxStringBytes" : <integer> # (optional, defaults to 1024),
+  "filterNullValues" : <boolean> # (optional, defaults to false)
+}
+```
+
+
+
+#### `stringLast` aggregator
+
+`stringLast` computes the metric value with the maximum timestamp or `null` if 
no row exist
+
+```json
+{
+  "type" : "stringLast",
+  "name" : <output_name>,
+  "fieldName" : <metric_name>,
+  "maxStringBytes" : <integer> # (optional, defaults to 1024),
+  "filterNullValues" : <boolean> # (optional, defaults to false)
+}
+```
+
 ### JavaScript aggregator
 
 Computes an arbitrary JavaScript function over a set of columns (both metrics 
and dimensions are allowed). Your
diff --git a/processing/src/main/java/io/druid/jackson/AggregatorsModule.java 
b/processing/src/main/java/io/druid/jackson/AggregatorsModule.java
index 94deda0..d3cc9a7 100644
--- a/processing/src/main/java/io/druid/jackson/AggregatorsModule.java
+++ b/processing/src/main/java/io/druid/jackson/AggregatorsModule.java
@@ -38,10 +38,13 @@ import io.druid.query.aggregation.LongMaxAggregatorFactory;
 import io.druid.query.aggregation.LongMinAggregatorFactory;
 import io.druid.query.aggregation.LongSumAggregatorFactory;
 import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.aggregation.SerializablePairLongStringSerde;
 import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
 import io.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
 import io.druid.query.aggregation.first.FloatFirstAggregatorFactory;
 import io.druid.query.aggregation.first.LongFirstAggregatorFactory;
+import io.druid.query.aggregation.first.StringFirstAggregatorFactory;
+import io.druid.query.aggregation.first.StringFirstFoldingAggregatorFactory;
 import 
io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
 import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
 import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
@@ -49,6 +52,8 @@ import 
io.druid.query.aggregation.hyperloglog.PreComputedHyperUniquesSerde;
 import io.druid.query.aggregation.last.DoubleLastAggregatorFactory;
 import io.druid.query.aggregation.last.FloatLastAggregatorFactory;
 import io.druid.query.aggregation.last.LongLastAggregatorFactory;
+import io.druid.query.aggregation.last.StringLastAggregatorFactory;
+import io.druid.query.aggregation.last.StringLastFoldingAggregatorFactory;
 import io.druid.query.aggregation.post.ArithmeticPostAggregator;
 import io.druid.query.aggregation.post.ConstantPostAggregator;
 import io.druid.query.aggregation.post.DoubleGreatestPostAggregator;
@@ -74,7 +79,14 @@ public class AggregatorsModule extends SimpleModule
     }
 
     if (ComplexMetrics.getSerdeForType("preComputedHyperUnique") == null) {
-      ComplexMetrics.registerSerde("preComputedHyperUnique", new 
PreComputedHyperUniquesSerde(HyperLogLogHash.getDefault()));
+      ComplexMetrics.registerSerde(
+          "preComputedHyperUnique",
+          new PreComputedHyperUniquesSerde(HyperLogLogHash.getDefault())
+      );
+    }
+
+    if (ComplexMetrics.getSerdeForType("serializablePairLongString") == null) {
+      ComplexMetrics.registerSerde("serializablePairLongString", new 
SerializablePairLongStringSerde());
     }
 
     setMixInAnnotation(AggregatorFactory.class, AggregatorFactoryMixin.class);
@@ -101,9 +113,13 @@ public class AggregatorsModule extends SimpleModule
       @JsonSubTypes.Type(name = "longFirst", value = 
LongFirstAggregatorFactory.class),
       @JsonSubTypes.Type(name = "doubleFirst", value = 
DoubleFirstAggregatorFactory.class),
       @JsonSubTypes.Type(name = "floatFirst", value = 
FloatFirstAggregatorFactory.class),
+      @JsonSubTypes.Type(name = "stringFirst", value = 
StringFirstAggregatorFactory.class),
+      @JsonSubTypes.Type(name = "stringFirstFold", value = 
StringFirstFoldingAggregatorFactory.class),
       @JsonSubTypes.Type(name = "longLast", value = 
LongLastAggregatorFactory.class),
       @JsonSubTypes.Type(name = "doubleLast", value = 
DoubleLastAggregatorFactory.class),
-      @JsonSubTypes.Type(name = "floatLast", value = 
FloatLastAggregatorFactory.class)
+      @JsonSubTypes.Type(name = "floatLast", value = 
FloatLastAggregatorFactory.class),
+      @JsonSubTypes.Type(name = "stringLast", value = 
StringLastAggregatorFactory.class),
+      @JsonSubTypes.Type(name = "stringLastFold", value = 
StringLastFoldingAggregatorFactory.class)
   })
   public interface AggregatorFactoryMixin
   {
diff --git 
a/processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java 
b/processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java
index 610e6ba..eedd095 100644
--- a/processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java
+++ b/processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java
@@ -94,6 +94,10 @@ public class AggregatorUtil
   public static final byte ARRAY_OF_DOUBLES_SKETCH_T_TEST_CACHE_TYPE_ID = 0x29;
   public static final byte ARRAY_OF_DOUBLES_SKETCH_TO_STRING_CACHE_TYPE_ID = 
0x2A;
 
+  // StringFirst, StringLast aggregator
+  public static final byte STRING_FIRST_CACHE_TYPE_ID = 0x2B;
+  public static final byte STRING_LAST_CACHE_TYPE_ID = 0x2C;
+
   /**
    * returns the list of dependent postAggregators that should be calculated 
in order to calculate given postAgg
    *
diff --git 
a/processing/src/main/java/io/druid/query/aggregation/SerializablePairLongString.java
 
b/processing/src/main/java/io/druid/query/aggregation/SerializablePairLongString.java
new file mode 100644
index 0000000..91f9b26
--- /dev/null
+++ 
b/processing/src/main/java/io/druid/query/aggregation/SerializablePairLongString.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.druid.collections.SerializablePair;
+
+public class SerializablePairLongString extends SerializablePair<Long, String>
+{
+  @JsonCreator
+  public SerializablePairLongString(@JsonProperty("lhs") Long lhs, 
@JsonProperty("rhs") String rhs)
+  {
+    super(lhs, rhs);
+  }
+}
+
+
diff --git 
a/processing/src/main/java/io/druid/query/aggregation/SerializablePairLongStringSerde.java
 
b/processing/src/main/java/io/druid/query/aggregation/SerializablePairLongStringSerde.java
new file mode 100644
index 0000000..ca245fa
--- /dev/null
+++ 
b/processing/src/main/java/io/druid/query/aggregation/SerializablePairLongStringSerde.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation;
+
+import io.druid.data.input.InputRow;
+import io.druid.java.util.common.StringUtils;
+import io.druid.query.aggregation.first.StringFirstAggregatorFactory;
+import io.druid.segment.GenericColumnSerializer;
+import io.druid.segment.column.ColumnBuilder;
+import io.druid.segment.data.GenericIndexed;
+import io.druid.segment.data.ObjectStrategy;
+import io.druid.segment.serde.ComplexColumnPartSupplier;
+import io.druid.segment.serde.ComplexMetricExtractor;
+import io.druid.segment.serde.ComplexMetricSerde;
+import io.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer;
+import io.druid.segment.writeout.SegmentWriteOutMedium;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+/**
+ * The SerializablePairLongStringSerde serializes a Long-String pair 
(SerializablePairLongString).
+ * The serialization structure is: Long:Integer:String
+ * <p>
+ * The class is used on first/last String aggregators to store the time and 
the first/last string.
+ * Long:Integer:String -> Timestamp:StringSize:StringData
+ */
+public class SerializablePairLongStringSerde extends ComplexMetricSerde
+{
+
+  private static final String TYPE_NAME = "serializablePairLongString";
+
+  @Override
+  public String getTypeName()
+  {
+    return TYPE_NAME;
+  }
+
+  @Override
+  public ComplexMetricExtractor getExtractor()
+  {
+    return new ComplexMetricExtractor()
+    {
+      @Override
+      public Class<SerializablePairLongString> extractedClass()
+      {
+        return SerializablePairLongString.class;
+      }
+
+      @Override
+      public Object extractValue(InputRow inputRow, String metricName)
+      {
+        return inputRow.getRaw(metricName);
+      }
+    };
+  }
+
+  @Override
+  public void deserializeColumn(ByteBuffer buffer, ColumnBuilder columnBuilder)
+  {
+    final GenericIndexed column = GenericIndexed.read(buffer, 
getObjectStrategy(), columnBuilder.getFileMapper());
+    columnBuilder.setComplexColumn(new 
ComplexColumnPartSupplier(getTypeName(), column));
+  }
+
+  @Override
+  public ObjectStrategy getObjectStrategy()
+  {
+    return new ObjectStrategy<SerializablePairLongString>()
+    {
+      @Override
+      public int compare(@Nullable SerializablePairLongString o1, @Nullable 
SerializablePairLongString o2)
+      {
+        return StringFirstAggregatorFactory.VALUE_COMPARATOR.compare(o1, o2);
+      }
+
+      @Override
+      public Class<? extends SerializablePairLongString> getClazz()
+      {
+        return SerializablePairLongString.class;
+      }
+
+      @Override
+      public SerializablePairLongString fromByteBuffer(ByteBuffer buffer, int 
numBytes)
+      {
+        final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
+
+        long lhs = readOnlyBuffer.getLong();
+        int stringSize = readOnlyBuffer.getInt();
+
+        String lastString = null;
+        if (stringSize > 0) {
+          byte[] stringBytes = new byte[stringSize];
+          readOnlyBuffer.get(stringBytes, 0, stringSize);
+          lastString = StringUtils.fromUtf8(stringBytes);
+        }
+
+        return new SerializablePairLongString(lhs, lastString);
+      }
+
+      @Override
+      public byte[] toBytes(SerializablePairLongString val)
+      {
+        String rhsString = val.rhs;
+        ByteBuffer bbuf;
+
+        if (rhsString != null) {
+          byte[] rhsBytes = StringUtils.toUtf8(rhsString);
+          bbuf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES + 
rhsBytes.length);
+          bbuf.putLong(val.lhs);
+          bbuf.putInt(Long.BYTES, rhsBytes.length);
+          bbuf.position(Long.BYTES + Integer.BYTES);
+          bbuf.put(rhsBytes);
+        } else {
+          bbuf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES);
+          bbuf.putLong(val.lhs);
+          bbuf.putInt(Long.BYTES, 0);
+        }
+
+        return bbuf.array();
+      }
+    };
+  }
+
+  @Override
+  public GenericColumnSerializer getSerializer(SegmentWriteOutMedium 
segmentWriteOutMedium, String column)
+  {
+    return 
LargeColumnSupportedComplexColumnSerializer.create(segmentWriteOutMedium, 
column, this.getObjectStrategy());
+  }
+}
diff --git 
a/processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregatorFactory.java
 
b/processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregatorFactory.java
index 56d8aed..32b575f 100644
--- 
a/processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregatorFactory.java
+++ 
b/processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregatorFactory.java
@@ -23,8 +23,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.Longs;
-import io.druid.java.util.common.StringUtils;
 import io.druid.collections.SerializablePair;
+import io.druid.java.util.common.StringUtils;
 import io.druid.java.util.common.UOE;
 import io.druid.query.aggregation.AggregateCombiner;
 import io.druid.query.aggregation.Aggregator;
diff --git 
a/processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregateCombiner.java
 
b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregateCombiner.java
new file mode 100644
index 0000000..20487f6
--- /dev/null
+++ 
b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregateCombiner.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.first;
+
+import io.druid.query.aggregation.ObjectAggregateCombiner;
+import io.druid.segment.ColumnValueSelector;
+
+import javax.annotation.Nullable;
+
+public class StringFirstAggregateCombiner extends 
ObjectAggregateCombiner<String>
+{
+  private String firstString;
+  private boolean isReset = false;
+
+  @Override
+  public void reset(ColumnValueSelector selector)
+  {
+    firstString = (String) selector.getObject();
+    isReset = true;
+  }
+
+  @Override
+  public void fold(ColumnValueSelector selector)
+  {
+    if (!isReset) {
+      firstString = (String) selector.getObject();
+      isReset = true;
+    }
+  }
+
+  @Nullable
+  @Override
+  public String getObject()
+  {
+    return firstString;
+  }
+
+  @Override
+  public Class<String> classOfObject()
+  {
+    return String.class;
+  }
+}
diff --git 
a/processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregator.java
 
b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregator.java
new file mode 100644
index 0000000..5710a61
--- /dev/null
+++ 
b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregator.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.first;
+
+import io.druid.java.util.common.ISE;
+import io.druid.query.aggregation.Aggregator;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.segment.BaseLongColumnValueSelector;
+import io.druid.segment.BaseObjectColumnValueSelector;
+
+public class StringFirstAggregator implements Aggregator
+{
+
+  private final BaseObjectColumnValueSelector valueSelector;
+  private final BaseLongColumnValueSelector timeSelector;
+  private final int maxStringBytes;
+
+  protected long firstTime;
+  protected String firstValue;
+
+  public StringFirstAggregator(
+      BaseLongColumnValueSelector timeSelector,
+      BaseObjectColumnValueSelector valueSelector,
+      int maxStringBytes
+  )
+  {
+    this.valueSelector = valueSelector;
+    this.timeSelector = timeSelector;
+    this.maxStringBytes = maxStringBytes;
+
+    firstTime = Long.MAX_VALUE;
+    firstValue = null;
+  }
+
+  @Override
+  public void aggregate()
+  {
+    long time = timeSelector.getLong();
+    if (time < firstTime) {
+      firstTime = time;
+      Object value = valueSelector.getObject();
+
+      if (value != null) {
+        if (value instanceof String) {
+          firstValue = (String) value;
+        } else if (value instanceof SerializablePairLongString) {
+          firstValue = ((SerializablePairLongString) value).rhs;
+        } else {
+          throw new ISE(
+              "Try to aggregate unsuported class type [%s].Supported class 
types: String or SerializablePairLongString",
+              value.getClass().getCanonicalName()
+          );
+        }
+
+        if (firstValue != null && firstValue.length() > maxStringBytes) {
+          firstValue = firstValue.substring(0, maxStringBytes);
+        }
+      } else {
+        firstValue = null;
+      }
+    }
+  }
+
+  @Override
+  public Object get()
+  {
+    return new SerializablePairLongString(firstTime, firstValue);
+  }
+
+  @Override
+  public float getFloat()
+  {
+    throw new UnsupportedOperationException("StringFirstAggregator does not 
support getFloat()");
+  }
+
+  @Override
+  public long getLong()
+  {
+    throw new UnsupportedOperationException("StringFirstAggregator does not 
support getLong()");
+  }
+
+  @Override
+  public double getDouble()
+  {
+    throw new UnsupportedOperationException("StringFirstAggregator does not 
support getDouble()");
+  }
+
+  @Override
+  public void close()
+  {
+    // no resources to cleanup
+  }
+}
diff --git 
a/processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregatorFactory.java
 
b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregatorFactory.java
similarity index 52%
copy from 
processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregatorFactory.java
copy to 
processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregatorFactory.java
index 56d8aed..187e891 100644
--- 
a/processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregatorFactory.java
+++ 
b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregatorFactory.java
@@ -21,66 +21,108 @@ package io.druid.query.aggregation.first;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.Longs;
-import io.druid.java.util.common.StringUtils;
-import io.druid.collections.SerializablePair;
-import io.druid.java.util.common.UOE;
 import io.druid.query.aggregation.AggregateCombiner;
 import io.druid.query.aggregation.Aggregator;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.AggregatorUtil;
 import io.druid.query.aggregation.BufferAggregator;
-import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
-import io.druid.segment.BaseObjectColumnValueSelector;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.cache.CacheKeyBuilder;
 import io.druid.segment.ColumnSelectorFactory;
 import io.druid.segment.column.Column;
 
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
-public class LongFirstAggregatorFactory extends AggregatorFactory
+@JsonTypeName("stringFirst")
+public class StringFirstAggregatorFactory extends AggregatorFactory
 {
-  public static final Comparator VALUE_COMPARATOR = (o1, o2) -> Longs.compare(
-      ((SerializablePair<Long, Long>) o1).rhs,
-      ((SerializablePair<Long, Long>) o2).rhs
+  public static final int DEFAULT_MAX_STRING_SIZE = 1024;
+
+  public static final Comparator TIME_COMPARATOR = (o1, o2) -> Longs.compare(
+      ((SerializablePairLongString) o1).lhs,
+      ((SerializablePairLongString) o2).lhs
   );
 
+  public static final Comparator<SerializablePairLongString> VALUE_COMPARATOR 
= (o1, o2) -> {
+    int comparation;
+
+    // First we check if the objects are null
+    if (o1 == null && o2 == null) {
+      comparation = 0;
+    } else if (o1 == null) {
+      comparation = -1;
+    } else if (o2 == null) {
+      comparation = 1;
+    } else {
+
+      // If the objects are not null, we will try to compare using timestamp
+      comparation = o1.lhs.compareTo(o2.lhs);
+
+      // If both timestamp are the same, we try to compare the Strings
+      if (comparation == 0) {
+
+        // First we check if the strings are null
+        if (o1.rhs == null && o2.rhs == null) {
+          comparation = 0;
+        } else if (o1.rhs == null) {
+          comparation = -1;
+        } else if (o2.rhs == null) {
+          comparation = 1;
+        } else {
+
+          // If the strings are not null, we will compare them
+          // Note: This comparation maybe doesn't make sense to first/last 
aggregators
+          comparation = o1.rhs.compareTo(o2.rhs);
+        }
+      }
+    }
+
+    return comparation;
+  };
+
   private final String fieldName;
   private final String name;
+  protected final int maxStringBytes;
 
   @JsonCreator
-  public LongFirstAggregatorFactory(
+  public StringFirstAggregatorFactory(
       @JsonProperty("name") String name,
-      @JsonProperty("fieldName") final String fieldName
+      @JsonProperty("fieldName") final String fieldName,
+      @JsonProperty("maxStringBytes") Integer maxStringBytes
   )
   {
     Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator 
name");
     Preconditions.checkNotNull(fieldName, "Must have a valid, non-null 
fieldName");
-
     this.name = name;
     this.fieldName = fieldName;
+    this.maxStringBytes = maxStringBytes == null ? DEFAULT_MAX_STRING_SIZE : 
maxStringBytes;
   }
 
   @Override
   public Aggregator factorize(ColumnSelectorFactory metricFactory)
   {
-    return new LongFirstAggregator(
+    return new StringFirstAggregator(
         metricFactory.makeColumnValueSelector(Column.TIME_COLUMN_NAME),
-        metricFactory.makeColumnValueSelector(fieldName)
+        metricFactory.makeColumnValueSelector(fieldName),
+        maxStringBytes
     );
   }
 
   @Override
   public BufferAggregator factorizeBuffered(ColumnSelectorFactory 
metricFactory)
   {
-    return new LongFirstBufferAggregator(
+    return new StringFirstBufferAggregator(
         metricFactory.makeColumnValueSelector(Column.TIME_COLUMN_NAME),
-        metricFactory.makeColumnValueSelector(fieldName)
+        metricFactory.makeColumnValueSelector(fieldName),
+        maxStringBytes
     );
   }
 
@@ -93,82 +135,38 @@ public class LongFirstAggregatorFactory extends 
AggregatorFactory
   @Override
   public Object combine(Object lhs, Object rhs)
   {
-    return DoubleFirstAggregatorFactory.TIME_COMPARATOR.compare(lhs, rhs) <= 0 
? lhs : rhs;
+    return TIME_COMPARATOR.compare(lhs, rhs) > 0 ? lhs : rhs;
   }
 
   @Override
   public AggregateCombiner makeAggregateCombiner()
   {
-    throw new UOE("LongFirstAggregatorFactory is not supported during 
ingestion for rollup");
+    return new StringFirstAggregateCombiner();
   }
 
   @Override
   public AggregatorFactory getCombiningFactory()
   {
-    return new LongFirstAggregatorFactory(name, name)
-    {
-      @Override
-      public Aggregator factorize(ColumnSelectorFactory metricFactory)
-      {
-        final BaseObjectColumnValueSelector selector = 
metricFactory.makeColumnValueSelector(name);
-        return new LongFirstAggregator(null, null)
-        {
-          @Override
-          public void aggregate()
-          {
-            SerializablePair<Long, Long> pair = (SerializablePair<Long, Long>) 
selector.getObject();
-            if (pair.lhs < firstTime) {
-              firstTime = pair.lhs;
-              firstValue = pair.rhs;
-            }
-          }
-        };
-      }
-
-      @Override
-      public BufferAggregator factorizeBuffered(ColumnSelectorFactory 
metricFactory)
-      {
-        final BaseObjectColumnValueSelector selector = 
metricFactory.makeColumnValueSelector(name);
-        return new LongFirstBufferAggregator(null, null)
-        {
-          @Override
-          public void aggregate(ByteBuffer buf, int position)
-          {
-            SerializablePair<Long, Long> pair = (SerializablePair<Long, Long>) 
selector.getObject();
-            long firstTime = buf.getLong(position);
-            if (pair.lhs < firstTime) {
-              buf.putLong(position, pair.lhs);
-              buf.putLong(position + Long.BYTES, pair.rhs);
-            }
-          }
-
-          @Override
-          public void inspectRuntimeShape(RuntimeShapeInspector inspector)
-          {
-            inspector.visit("selector", selector);
-          }
-        };
-      }
-    };
+    return new StringFirstFoldingAggregatorFactory(name, name, maxStringBytes);
   }
 
   @Override
   public List<AggregatorFactory> getRequiredColumns()
   {
-    return Collections.singletonList(new LongFirstAggregatorFactory(fieldName, 
fieldName));
+    return Collections.singletonList(new 
StringFirstAggregatorFactory(fieldName, fieldName, maxStringBytes));
   }
 
   @Override
   public Object deserialize(Object object)
   {
     Map map = (Map) object;
-    return new SerializablePair<>(((Number) map.get("lhs")).longValue(), 
((Number) map.get("rhs")).longValue());
+    return new SerializablePairLongString(((Number) 
map.get("lhs")).longValue(), ((String) map.get("rhs")));
   }
 
   @Override
   public Object finalizeComputation(Object object)
   {
-    return ((SerializablePair<Long, Long>) object).rhs;
+    return ((SerializablePairLongString) object).rhs;
   }
 
   @Override
@@ -184,6 +182,12 @@ public class LongFirstAggregatorFactory extends 
AggregatorFactory
     return fieldName;
   }
 
+  @JsonProperty
+  public Integer getMaxStringBytes()
+  {
+    return maxStringBytes;
+  }
+
   @Override
   public List<String> requiredFields()
   {
@@ -193,24 +197,22 @@ public class LongFirstAggregatorFactory extends 
AggregatorFactory
   @Override
   public byte[] getCacheKey()
   {
-    byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
-
-    return ByteBuffer.allocate(1 + fieldNameBytes.length)
-                     .put(AggregatorUtil.LONG_FIRST_CACHE_TYPE_ID)
-                     .put(fieldNameBytes)
-                     .array();
+    return new CacheKeyBuilder(AggregatorUtil.STRING_FIRST_CACHE_TYPE_ID)
+        .appendString(fieldName)
+        .appendInt(maxStringBytes)
+        .build();
   }
 
   @Override
   public String getTypeName()
   {
-    return "long";
+    return "serializablePairLongString";
   }
 
   @Override
   public int getMaxIntermediateSize()
   {
-    return Long.BYTES * 2;
+    return Long.BYTES + Integer.BYTES + maxStringBytes;
   }
 
   @Override
@@ -223,25 +225,24 @@ public class LongFirstAggregatorFactory extends 
AggregatorFactory
       return false;
     }
 
-    LongFirstAggregatorFactory that = (LongFirstAggregatorFactory) o;
+    StringFirstAggregatorFactory that = (StringFirstAggregatorFactory) o;
 
-    return fieldName.equals(that.fieldName) && name.equals(that.name);
+    return fieldName.equals(that.fieldName) && name.equals(that.name) && 
maxStringBytes == that.maxStringBytes;
   }
 
   @Override
   public int hashCode()
   {
-    int result = name.hashCode();
-    result = 31 * result + fieldName.hashCode();
-    return result;
+    return Objects.hash(name, fieldName, maxStringBytes);
   }
 
   @Override
   public String toString()
   {
-    return "LongFirstAggregatorFactory{" +
+    return "StringFirstAggregatorFactory{" +
            "name='" + name + '\'' +
            ", fieldName='" + fieldName + '\'' +
+           ", maxStringBytes=" + maxStringBytes + '\'' +
            '}';
   }
 }
diff --git 
a/processing/src/main/java/io/druid/query/aggregation/first/StringFirstBufferAggregator.java
 
b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstBufferAggregator.java
new file mode 100644
index 0000000..c71cfbf
--- /dev/null
+++ 
b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstBufferAggregator.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.first;
+
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.StringUtils;
+import io.druid.query.aggregation.BufferAggregator;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import io.druid.segment.BaseLongColumnValueSelector;
+import io.druid.segment.BaseObjectColumnValueSelector;
+
+import java.nio.ByteBuffer;
+
+public class StringFirstBufferAggregator implements BufferAggregator
+{
+  private final BaseLongColumnValueSelector timeSelector;
+  private final BaseObjectColumnValueSelector valueSelector;
+  private final int maxStringBytes;
+
+  public StringFirstBufferAggregator(
+      BaseLongColumnValueSelector timeSelector,
+      BaseObjectColumnValueSelector valueSelector,
+      int maxStringBytes
+  )
+  {
+    this.timeSelector = timeSelector;
+    this.valueSelector = valueSelector;
+    this.maxStringBytes = maxStringBytes;
+  }
+
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    buf.putLong(position, Long.MAX_VALUE);
+    buf.putInt(position + Long.BYTES, 0);
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position)
+  {
+    ByteBuffer mutationBuffer = buf.duplicate();
+    mutationBuffer.position(position);
+
+    Object value = valueSelector.getObject();
+
+    long time = timeSelector.getLong();
+    String firstString = null;
+
+    if (value != null) {
+      if (value instanceof SerializablePairLongString) {
+        SerializablePairLongString serializablePair = 
(SerializablePairLongString) value;
+        time = serializablePair.lhs;
+        firstString = serializablePair.rhs;
+      } else if (value instanceof String) {
+        firstString = (String) value;
+      } else {
+        throw new ISE(
+            "Try to aggregate unsuported class type [%s].Supported class 
types: String or SerializablePairLongString",
+            value.getClass().getCanonicalName()
+        );
+      }
+    }
+
+    long lastTime = mutationBuffer.getLong(position);
+
+    if (time < lastTime) {
+      if (firstString != null) {
+        if (firstString.length() > maxStringBytes) {
+          firstString = firstString.substring(0, maxStringBytes);
+        }
+
+        byte[] valueBytes = StringUtils.toUtf8(firstString);
+
+        mutationBuffer.putLong(position, time);
+        mutationBuffer.putInt(position + Long.BYTES, valueBytes.length);
+
+        mutationBuffer.position(position + Long.BYTES + Integer.BYTES);
+        mutationBuffer.put(valueBytes);
+      } else {
+        mutationBuffer.putLong(position, time);
+        mutationBuffer.putInt(position + Long.BYTES, 0);
+      }
+    }
+  }
+
+  @Override
+  public Object get(ByteBuffer buf, int position)
+  {
+    ByteBuffer mutationBuffer = buf.duplicate();
+    mutationBuffer.position(position);
+
+    Long timeValue = mutationBuffer.getLong(position);
+    int stringSizeBytes = mutationBuffer.getInt(position + Long.BYTES);
+
+    SerializablePairLongString serializablePair;
+
+    if (stringSizeBytes > 0) {
+      byte[] valueBytes = new byte[stringSizeBytes];
+      mutationBuffer.position(position + Long.BYTES + Integer.BYTES);
+      mutationBuffer.get(valueBytes, 0, stringSizeBytes);
+      serializablePair = new SerializablePairLongString(timeValue, 
StringUtils.fromUtf8(valueBytes));
+    } else {
+      serializablePair = new SerializablePairLongString(timeValue, null);
+    }
+
+    return serializablePair;
+  }
+
+  @Override
+  public float getFloat(ByteBuffer buf, int position)
+  {
+    throw new UnsupportedOperationException("StringFirstAggregator does not 
support getFloat()");
+  }
+
+  @Override
+  public long getLong(ByteBuffer buf, int position)
+  {
+    throw new UnsupportedOperationException("StringFirstAggregator does not 
support getLong()");
+  }
+
+  @Override
+  public double getDouble(ByteBuffer buf, int position)
+  {
+    throw new UnsupportedOperationException("StringFirstAggregator does not 
support getDouble()");
+  }
+
+  @Override
+  public void close()
+  {
+    // no resources to cleanup
+  }
+
+  @Override
+  public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+  {
+    inspector.visit("timeSelector", timeSelector);
+    inspector.visit("valueSelector", valueSelector);
+  }
+}
diff --git 
a/processing/src/main/java/io/druid/query/aggregation/first/StringFirstFoldingAggregatorFactory.java
 
b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstFoldingAggregatorFactory.java
new file mode 100644
index 0000000..b268baf
--- /dev/null
+++ 
b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstFoldingAggregatorFactory.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.first;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import io.druid.java.util.common.StringUtils;
+import io.druid.query.aggregation.Aggregator;
+import io.druid.query.aggregation.BufferAggregator;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import io.druid.segment.BaseObjectColumnValueSelector;
+import io.druid.segment.ColumnSelectorFactory;
+
+import java.nio.ByteBuffer;
+
+@JsonTypeName("stringFirstFold")
+public class StringFirstFoldingAggregatorFactory extends 
StringFirstAggregatorFactory
+{
+  public StringFirstFoldingAggregatorFactory(
+      @JsonProperty("name") String name,
+      @JsonProperty("fieldName") final String fieldName,
+      @JsonProperty("maxStringBytes") Integer maxStringBytes
+  )
+  {
+    super(name, fieldName, maxStringBytes);
+  }
+
+  @Override
+  public Aggregator factorize(ColumnSelectorFactory metricFactory)
+  {
+    final BaseObjectColumnValueSelector selector = 
metricFactory.makeColumnValueSelector(getName());
+    return new StringFirstAggregator(null, null, maxStringBytes)
+    {
+      @Override
+      public void aggregate()
+      {
+        SerializablePairLongString pair = (SerializablePairLongString) 
selector.getObject();
+        if (pair != null && pair.lhs < firstTime) {
+          firstTime = pair.lhs;
+          firstValue = pair.rhs;
+        }
+      }
+    };
+  }
+
+  @Override
+  public BufferAggregator factorizeBuffered(ColumnSelectorFactory 
metricFactory)
+  {
+    final BaseObjectColumnValueSelector selector = 
metricFactory.makeColumnValueSelector(getName());
+    return new StringFirstBufferAggregator(null, null, maxStringBytes)
+    {
+      @Override
+      public void aggregate(ByteBuffer buf, int position)
+      {
+        SerializablePairLongString pair = (SerializablePairLongString) 
selector.getObject();
+
+        if (pair != null && pair.lhs != null) {
+          ByteBuffer mutationBuffer = buf.duplicate();
+          mutationBuffer.position(position);
+
+          long lastTime = mutationBuffer.getLong(position);
+
+          if (pair.lhs < lastTime) {
+            mutationBuffer.putLong(position, pair.lhs);
+
+            if (pair.rhs != null) {
+              byte[] valueBytes = StringUtils.toUtf8(pair.rhs);
+
+              mutationBuffer.putInt(position + Long.BYTES, valueBytes.length);
+              mutationBuffer.position(position + Long.BYTES + Integer.BYTES);
+              mutationBuffer.put(valueBytes);
+            } else {
+              mutationBuffer.putInt(position + Long.BYTES, 0);
+            }
+          }
+        }
+      }
+
+      @Override
+      public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+      {
+        inspector.visit("selector", selector);
+      }
+    };
+  }
+
+}
diff --git 
a/processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregatorFactory.java
 
b/processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregatorFactory.java
index ff33195..dc186a9 100644
--- 
a/processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregatorFactory.java
+++ 
b/processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregatorFactory.java
@@ -22,8 +22,8 @@ package io.druid.query.aggregation.last;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
-import io.druid.java.util.common.StringUtils;
 import io.druid.collections.SerializablePair;
+import io.druid.java.util.common.StringUtils;
 import io.druid.java.util.common.UOE;
 import io.druid.query.aggregation.AggregateCombiner;
 import io.druid.query.aggregation.Aggregator;
diff --git 
a/processing/src/main/java/io/druid/query/aggregation/last/StringLastAggregateCombiner.java
 
b/processing/src/main/java/io/druid/query/aggregation/last/StringLastAggregateCombiner.java
new file mode 100644
index 0000000..6625f08
--- /dev/null
+++ 
b/processing/src/main/java/io/druid/query/aggregation/last/StringLastAggregateCombiner.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.last;
+
+import io.druid.query.aggregation.ObjectAggregateCombiner;
+import io.druid.segment.ColumnValueSelector;
+
+import javax.annotation.Nullable;
+
+public class StringLastAggregateCombiner extends 
ObjectAggregateCombiner<String>
+{
+  private String lastString;
+
+  @Override
+  public void reset(ColumnValueSelector selector)
+  {
+    lastString = (String) selector.getObject();
+  }
+
+  @Override
+  public void fold(ColumnValueSelector selector)
+  {
+    lastString = (String) selector.getObject();
+  }
+
+  @Nullable
+  @Override
+  public String getObject()
+  {
+    return lastString;
+  }
+
+  @Override
+  public Class<String> classOfObject()
+  {
+    return String.class;
+  }
+}
diff --git 
a/processing/src/main/java/io/druid/query/aggregation/last/StringLastAggregator.java
 
b/processing/src/main/java/io/druid/query/aggregation/last/StringLastAggregator.java
new file mode 100644
index 0000000..85cd0dd
--- /dev/null
+++ 
b/processing/src/main/java/io/druid/query/aggregation/last/StringLastAggregator.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.last;
+
+import io.druid.java.util.common.ISE;
+import io.druid.query.aggregation.Aggregator;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.segment.BaseLongColumnValueSelector;
+import io.druid.segment.BaseObjectColumnValueSelector;
+
+public class StringLastAggregator implements Aggregator
+{
+
+  private final BaseObjectColumnValueSelector valueSelector;
+  private final BaseLongColumnValueSelector timeSelector;
+  private final int maxStringBytes;
+
+  protected long lastTime;
+  protected String lastValue;
+
+  public StringLastAggregator(
+      BaseLongColumnValueSelector timeSelector,
+      BaseObjectColumnValueSelector valueSelector,
+      int maxStringBytes
+  )
+  {
+    this.valueSelector = valueSelector;
+    this.timeSelector = timeSelector;
+    this.maxStringBytes = maxStringBytes;
+
+    lastTime = Long.MIN_VALUE;
+    lastValue = null;
+  }
+
+  @Override
+  public void aggregate()
+  {
+    long time = timeSelector.getLong();
+    if (time >= lastTime) {
+      lastTime = time;
+      Object value = valueSelector.getObject();
+
+      if (value != null) {
+        if (value instanceof String) {
+          lastValue = (String) value;
+        } else if (value instanceof SerializablePairLongString) {
+          lastValue = ((SerializablePairLongString) value).rhs;
+        } else {
+          throw new ISE(
+              "Try to aggregate unsuported class type [%s].Supported class 
types: String or SerializablePairLongString",
+              value.getClass().getCanonicalName()
+          );
+        }
+
+        if (lastValue != null && lastValue.length() > maxStringBytes) {
+          lastValue = lastValue.substring(0, maxStringBytes);
+        }
+      } else {
+        lastValue = null;
+      }
+    }
+  }
+
+  @Override
+  public Object get()
+  {
+    return new SerializablePairLongString(lastTime, lastValue);
+  }
+
+  @Override
+  public float getFloat()
+  {
+    throw new UnsupportedOperationException("StringFirstAggregator does not 
support getFloat()");
+  }
+
+  @Override
+  public long getLong()
+  {
+    throw new UnsupportedOperationException("StringFirstAggregator does not 
support getLong()");
+  }
+
+  @Override
+  public double getDouble()
+  {
+    throw new UnsupportedOperationException("StringFirstAggregator does not 
support getDouble()");
+  }
+
+  @Override
+  public void close()
+  {
+    // no resources to cleanup
+  }
+}
diff --git 
a/processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregatorFactory.java
 
b/processing/src/main/java/io/druid/query/aggregation/last/StringLastAggregatorFactory.java
similarity index 52%
copy from 
processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregatorFactory.java
copy to 
processing/src/main/java/io/druid/query/aggregation/last/StringLastAggregatorFactory.java
index ff33195..cb4f363 100644
--- 
a/processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregatorFactory.java
+++ 
b/processing/src/main/java/io/druid/query/aggregation/last/StringLastAggregatorFactory.java
@@ -21,23 +21,19 @@ package io.druid.query.aggregation.last;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.base.Preconditions;
-import io.druid.java.util.common.StringUtils;
-import io.druid.collections.SerializablePair;
-import io.druid.java.util.common.UOE;
 import io.druid.query.aggregation.AggregateCombiner;
 import io.druid.query.aggregation.Aggregator;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.AggregatorUtil;
 import io.druid.query.aggregation.BufferAggregator;
-import io.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
-import io.druid.query.aggregation.first.LongFirstAggregatorFactory;
-import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
-import io.druid.segment.BaseObjectColumnValueSelector;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.aggregation.first.StringFirstAggregatorFactory;
+import io.druid.query.cache.CacheKeyBuilder;
 import io.druid.segment.ColumnSelectorFactory;
 import io.druid.segment.column.Column;
 
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
@@ -45,126 +41,91 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
-public class LongLastAggregatorFactory extends AggregatorFactory
+
+@JsonTypeName("stringLast")
+public class StringLastAggregatorFactory extends AggregatorFactory
 {
   private final String fieldName;
   private final String name;
+  protected final int maxStringBytes;
 
   @JsonCreator
-  public LongLastAggregatorFactory(
+  public StringLastAggregatorFactory(
       @JsonProperty("name") String name,
-      @JsonProperty("fieldName") final String fieldName
+      @JsonProperty("fieldName") final String fieldName,
+      @JsonProperty("maxStringBytes") Integer maxStringBytes
   )
   {
     Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator 
name");
     Preconditions.checkNotNull(fieldName, "Must have a valid, non-null 
fieldName");
     this.name = name;
     this.fieldName = fieldName;
+    this.maxStringBytes = maxStringBytes == null
+                          ? 
StringFirstAggregatorFactory.DEFAULT_MAX_STRING_SIZE
+                          : maxStringBytes;
   }
 
   @Override
   public Aggregator factorize(ColumnSelectorFactory metricFactory)
   {
-    return new LongLastAggregator(
+    return new StringLastAggregator(
         metricFactory.makeColumnValueSelector(Column.TIME_COLUMN_NAME),
-        metricFactory.makeColumnValueSelector(fieldName)
+        metricFactory.makeColumnValueSelector(fieldName),
+        maxStringBytes
     );
   }
 
   @Override
   public BufferAggregator factorizeBuffered(ColumnSelectorFactory 
metricFactory)
   {
-    return new LongLastBufferAggregator(
+    return new StringLastBufferAggregator(
         metricFactory.makeColumnValueSelector(Column.TIME_COLUMN_NAME),
-        metricFactory.makeColumnValueSelector(fieldName)
+        metricFactory.makeColumnValueSelector(fieldName),
+        maxStringBytes
     );
   }
 
   @Override
   public Comparator getComparator()
   {
-    return LongFirstAggregatorFactory.VALUE_COMPARATOR;
+    return StringFirstAggregatorFactory.VALUE_COMPARATOR;
   }
 
   @Override
   public Object combine(Object lhs, Object rhs)
   {
-    return DoubleFirstAggregatorFactory.TIME_COMPARATOR.compare(lhs, rhs) > 0 
? lhs : rhs;
+    return StringFirstAggregatorFactory.TIME_COMPARATOR.compare(lhs, rhs) > 0 
? lhs : rhs;
   }
 
   @Override
   public AggregateCombiner makeAggregateCombiner()
   {
-    throw new UOE("LongLastAggregatorFactory is not supported during ingestion 
for rollup");
+    return new StringLastAggregateCombiner();
   }
 
   @Override
   public AggregatorFactory getCombiningFactory()
   {
-    return new LongLastAggregatorFactory(name, name)
-    {
-      @Override
-      public Aggregator factorize(ColumnSelectorFactory metricFactory)
-      {
-        final BaseObjectColumnValueSelector selector = 
metricFactory.makeColumnValueSelector(name);
-        return new LongLastAggregator(null, null)
-        {
-          @Override
-          public void aggregate()
-          {
-            SerializablePair<Long, Long> pair = (SerializablePair<Long, Long>) 
selector.getObject();
-            if (pair.lhs >= lastTime) {
-              lastTime = pair.lhs;
-              lastValue = pair.rhs;
-            }
-          }
-        };
-      }
-
-      @Override
-      public BufferAggregator factorizeBuffered(ColumnSelectorFactory 
metricFactory)
-      {
-        final BaseObjectColumnValueSelector selector = 
metricFactory.makeColumnValueSelector(name);
-        return new LongLastBufferAggregator(null, null)
-        {
-          @Override
-          public void aggregate(ByteBuffer buf, int position)
-          {
-            SerializablePair<Long, Long> pair = (SerializablePair<Long, Long>) 
selector.getObject();
-            long lastTime = buf.getLong(position);
-            if (pair.lhs >= lastTime) {
-              buf.putLong(position, pair.lhs);
-              buf.putLong(position + Long.BYTES, pair.rhs);
-            }
-          }
-
-          @Override
-          public void inspectRuntimeShape(RuntimeShapeInspector inspector)
-          {
-            inspector.visit("selector", selector);
-          }
-        };
-      }
-    };
+    return new StringLastFoldingAggregatorFactory(name, name, maxStringBytes);
   }
 
   @Override
   public List<AggregatorFactory> getRequiredColumns()
   {
-    return Collections.singletonList(new LongLastAggregatorFactory(fieldName, 
fieldName));
+    return Collections.singletonList(new 
StringLastAggregatorFactory(fieldName, fieldName, maxStringBytes));
   }
 
   @Override
   public Object deserialize(Object object)
   {
     Map map = (Map) object;
-    return new SerializablePair<>(((Number) map.get("lhs")).longValue(), 
((Number) map.get("rhs")).longValue());
+    return new SerializablePairLongString(((Number) 
map.get("lhs")).longValue(), ((String) map.get("rhs")));
   }
 
   @Override
   public Object finalizeComputation(Object object)
   {
-    return ((SerializablePair<Long, Long>) object).rhs;
+    return ((SerializablePairLongString) object).rhs;
   }
 
   @Override
@@ -180,6 +141,12 @@ public class LongLastAggregatorFactory extends 
AggregatorFactory
     return fieldName;
   }
 
+  @JsonProperty
+  public Integer getMaxStringBytes()
+  {
+    return maxStringBytes;
+  }
+
   @Override
   public List<String> requiredFields()
   {
@@ -189,24 +156,22 @@ public class LongLastAggregatorFactory extends 
AggregatorFactory
   @Override
   public byte[] getCacheKey()
   {
-    byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
-
-    return ByteBuffer.allocate(1 + fieldNameBytes.length)
-                     .put(AggregatorUtil.LONG_LAST_CACHE_TYPE_ID)
-                     .put(fieldNameBytes)
-                     .array();
+    return new CacheKeyBuilder(AggregatorUtil.STRING_LAST_CACHE_TYPE_ID)
+        .appendString(fieldName)
+        .appendInt(maxStringBytes)
+        .build();
   }
 
   @Override
   public String getTypeName()
   {
-    return "long";
+    return "serializablePairLongString";
   }
 
   @Override
   public int getMaxIntermediateSize()
   {
-    return Long.BYTES * 2;
+    return Long.BYTES + Integer.BYTES + maxStringBytes;
   }
 
   @Override
@@ -219,23 +184,24 @@ public class LongLastAggregatorFactory extends 
AggregatorFactory
       return false;
     }
 
-    LongLastAggregatorFactory that = (LongLastAggregatorFactory) o;
+    StringLastAggregatorFactory that = (StringLastAggregatorFactory) o;
 
-    return name.equals(that.name) && fieldName.equals(that.fieldName);
+    return fieldName.equals(that.fieldName) && name.equals(that.name) && 
maxStringBytes == that.maxStringBytes;
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(name, fieldName);
+    return Objects.hash(name, fieldName, maxStringBytes);
   }
 
   @Override
   public String toString()
   {
-    return "LongLastAggregatorFactory{" +
+    return "StringFirstAggregatorFactory{" +
            "name='" + name + '\'' +
            ", fieldName='" + fieldName + '\'' +
+           ", maxStringBytes=" + maxStringBytes + '\'' +
            '}';
   }
 }
diff --git 
a/processing/src/main/java/io/druid/query/aggregation/last/StringLastBufferAggregator.java
 
b/processing/src/main/java/io/druid/query/aggregation/last/StringLastBufferAggregator.java
new file mode 100644
index 0000000..12c9948
--- /dev/null
+++ 
b/processing/src/main/java/io/druid/query/aggregation/last/StringLastBufferAggregator.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.last;
+
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.StringUtils;
+import io.druid.query.aggregation.BufferAggregator;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import io.druid.segment.BaseLongColumnValueSelector;
+import io.druid.segment.BaseObjectColumnValueSelector;
+
+import java.nio.ByteBuffer;
+
+public class StringLastBufferAggregator implements BufferAggregator
+{
+  private final BaseLongColumnValueSelector timeSelector;
+  private final BaseObjectColumnValueSelector valueSelector;
+  private final int maxStringBytes;
+
+  public StringLastBufferAggregator(
+      BaseLongColumnValueSelector timeSelector,
+      BaseObjectColumnValueSelector valueSelector,
+      int maxStringBytes
+  )
+  {
+    this.timeSelector = timeSelector;
+    this.valueSelector = valueSelector;
+    this.maxStringBytes = maxStringBytes;
+  }
+
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    buf.putLong(position, Long.MIN_VALUE);
+    buf.putInt(position + Long.BYTES, 0);
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position)
+  {
+    ByteBuffer mutationBuffer = buf.duplicate();
+    mutationBuffer.position(position);
+
+    Object value = valueSelector.getObject();
+
+    long time = timeSelector.getLong();
+    String lastString = null;
+
+    if (value != null) {
+      if (value instanceof SerializablePairLongString) {
+        SerializablePairLongString serializablePair = 
(SerializablePairLongString) value;
+        time = serializablePair.lhs;
+        lastString = serializablePair.rhs;
+      } else if (value instanceof String) {
+        lastString = (String) value;
+      } else {
+        throw new ISE(
+            "Try to aggregate unsuported class type [%s].Supported class 
types: String or SerializablePairLongString",
+            value.getClass().getCanonicalName()
+        );
+      }
+    }
+
+    long lastTime = mutationBuffer.getLong(position);
+
+    if (time >= lastTime) {
+      if (lastString != null) {
+        if (lastString.length() > maxStringBytes) {
+          lastString = lastString.substring(0, maxStringBytes);
+        }
+
+        byte[] valueBytes = StringUtils.toUtf8(lastString);
+
+        mutationBuffer.putLong(position, time);
+        mutationBuffer.putInt(position + Long.BYTES, valueBytes.length);
+
+        mutationBuffer.position(position + Long.BYTES + Integer.BYTES);
+        mutationBuffer.put(valueBytes);
+      } else {
+        mutationBuffer.putLong(position, time);
+        mutationBuffer.putInt(position + Long.BYTES, 0);
+      }
+    }
+  }
+
+  @Override
+  public Object get(ByteBuffer buf, int position)
+  {
+    ByteBuffer mutationBuffer = buf.duplicate();
+    mutationBuffer.position(position);
+
+    Long timeValue = mutationBuffer.getLong(position);
+    Integer stringSizeBytes = mutationBuffer.getInt(position + Long.BYTES);
+
+    SerializablePairLongString serializablePair;
+
+    if (stringSizeBytes > 0) {
+      byte[] valueBytes = new byte[stringSizeBytes];
+      mutationBuffer.position(position + Long.BYTES + Integer.BYTES);
+      mutationBuffer.get(valueBytes, 0, stringSizeBytes);
+      serializablePair = new SerializablePairLongString(timeValue, 
StringUtils.fromUtf8(valueBytes));
+    } else {
+      serializablePair = new SerializablePairLongString(timeValue, null);
+    }
+
+    return serializablePair;
+  }
+
+  @Override
+  public float getFloat(ByteBuffer buf, int position)
+  {
+    throw new UnsupportedOperationException("StringFirstAggregator does not 
support getFloat()");
+  }
+
+  @Override
+  public long getLong(ByteBuffer buf, int position)
+  {
+    throw new UnsupportedOperationException("StringFirstAggregator does not 
support getLong()");
+  }
+
+  @Override
+  public double getDouble(ByteBuffer buf, int position)
+  {
+    throw new UnsupportedOperationException("StringFirstAggregator does not 
support getDouble()");
+  }
+
+  @Override
+  public void close()
+  {
+    // no resources to cleanup
+  }
+
+  @Override
+  public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+  {
+    inspector.visit("timeSelector", timeSelector);
+    inspector.visit("valueSelector", valueSelector);
+  }
+}
diff --git 
a/processing/src/main/java/io/druid/query/aggregation/last/StringLastFoldingAggregatorFactory.java
 
b/processing/src/main/java/io/druid/query/aggregation/last/StringLastFoldingAggregatorFactory.java
new file mode 100644
index 0000000..9bd6a64
--- /dev/null
+++ 
b/processing/src/main/java/io/druid/query/aggregation/last/StringLastFoldingAggregatorFactory.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.last;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import io.druid.java.util.common.StringUtils;
+import io.druid.query.aggregation.Aggregator;
+import io.druid.query.aggregation.BufferAggregator;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import io.druid.segment.BaseObjectColumnValueSelector;
+import io.druid.segment.ColumnSelectorFactory;
+
+import java.nio.ByteBuffer;
+
+@JsonTypeName("stringLastFold")
+public class StringLastFoldingAggregatorFactory extends 
StringLastAggregatorFactory
+{
+  public StringLastFoldingAggregatorFactory(
+      @JsonProperty("name") String name,
+      @JsonProperty("fieldName") final String fieldName,
+      @JsonProperty("maxStringBytes") Integer maxStringBytes
+  )
+  {
+    super(name, fieldName, maxStringBytes);
+  }
+
+  @Override
+  public Aggregator factorize(ColumnSelectorFactory metricFactory)
+  {
+    final BaseObjectColumnValueSelector selector = 
metricFactory.makeColumnValueSelector(getName());
+    return new StringLastAggregator(null, null, maxStringBytes)
+    {
+      @Override
+      public void aggregate()
+      {
+        SerializablePairLongString pair = (SerializablePairLongString) 
selector.getObject();
+        if (pair != null && pair.lhs >= lastTime) {
+          lastTime = pair.lhs;
+          lastValue = pair.rhs;
+        }
+      }
+    };
+  }
+
+  @Override
+  public BufferAggregator factorizeBuffered(ColumnSelectorFactory 
metricFactory)
+  {
+    final BaseObjectColumnValueSelector selector = 
metricFactory.makeColumnValueSelector(getName());
+    return new StringLastBufferAggregator(null, null, maxStringBytes)
+    {
+      @Override
+      public void aggregate(ByteBuffer buf, int position)
+      {
+        SerializablePairLongString pair = (SerializablePairLongString) 
selector.getObject();
+        if (pair != null && pair.lhs != null) {
+          ByteBuffer mutationBuffer = buf.duplicate();
+          mutationBuffer.position(position);
+
+          long lastTime = mutationBuffer.getLong(position);
+
+          if (pair.lhs >= lastTime) {
+            mutationBuffer.putLong(position, pair.lhs);
+            if (pair.rhs != null) {
+              byte[] valueBytes = StringUtils.toUtf8(pair.rhs);
+
+              mutationBuffer.putInt(position + Long.BYTES, valueBytes.length);
+              mutationBuffer.position(position + Long.BYTES + Integer.BYTES);
+              mutationBuffer.put(valueBytes);
+            } else {
+              mutationBuffer.putInt(position + Long.BYTES, 0);
+            }
+          }
+        }
+      }
+
+      @Override
+      public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+      {
+        inspector.visit("selector", selector);
+      }
+    };
+  }
+}
diff --git 
a/processing/src/test/java/io/druid/query/aggregation/first/StringFirstAggregationTest.java
 
b/processing/src/test/java/io/druid/query/aggregation/first/StringFirstAggregationTest.java
new file mode 100644
index 0000000..8f523c0
--- /dev/null
+++ 
b/processing/src/test/java/io/druid/query/aggregation/first/StringFirstAggregationTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.first;
+
+import io.druid.java.util.common.Pair;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.aggregation.TestLongColumnSelector;
+import io.druid.query.aggregation.TestObjectColumnSelector;
+import io.druid.segment.ColumnSelectorFactory;
+import io.druid.segment.column.Column;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+public class StringFirstAggregationTest
+{
+  private final Integer MAX_STRING_SIZE = 1024;
+  private StringFirstAggregatorFactory stringLastAggFactory;
+  private StringFirstAggregatorFactory combiningAggFactory;
+  private ColumnSelectorFactory colSelectorFactory;
+  private TestLongColumnSelector timeSelector;
+  private TestObjectColumnSelector<String> valueSelector;
+  private TestObjectColumnSelector objectSelector;
+
+  private String[] strings = {"1111", "2222", "3333", null, "4444"};
+  private long[] times = {8224, 6879, 2436, 3546, 7888};
+  private SerializablePairLongString[] pairs = {
+      new SerializablePairLongString(52782L, "AAAA"),
+      new SerializablePairLongString(65492L, "BBBB"),
+      new SerializablePairLongString(69134L, "CCCC"),
+      new SerializablePairLongString(11111L, "DDDD"),
+      new SerializablePairLongString(51223L, null)
+  };
+
+  @Before
+  public void setup()
+  {
+    stringLastAggFactory = new StringFirstAggregatorFactory("billy", "nilly", 
MAX_STRING_SIZE);
+    combiningAggFactory = (StringFirstAggregatorFactory) 
stringLastAggFactory.getCombiningFactory();
+    timeSelector = new TestLongColumnSelector(times);
+    valueSelector = new TestObjectColumnSelector<>(strings);
+    objectSelector = new TestObjectColumnSelector<>(pairs);
+    colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
+    
EasyMock.expect(colSelectorFactory.makeColumnValueSelector(Column.TIME_COLUMN_NAME)).andReturn(timeSelector);
+    
EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector);
+    
EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector);
+    EasyMock.replay(colSelectorFactory);
+  }
+
+  @Test
+  public void testStringLastAggregator()
+  {
+    StringFirstAggregator agg = (StringFirstAggregator) 
stringLastAggFactory.factorize(colSelectorFactory);
+
+    aggregate(agg);
+    aggregate(agg);
+    aggregate(agg);
+    aggregate(agg);
+
+    Pair<Long, String> result = (Pair<Long, String>) agg.get();
+
+    Assert.assertEquals(strings[2], result.rhs);
+  }
+
+  @Test
+  public void testStringLastBufferAggregator()
+  {
+    StringFirstBufferAggregator agg = (StringFirstBufferAggregator) 
stringLastAggFactory.factorizeBuffered(
+        colSelectorFactory);
+
+    ByteBuffer buffer = ByteBuffer.wrap(new 
byte[stringLastAggFactory.getMaxIntermediateSize()]);
+    agg.init(buffer, 0);
+
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+
+    Pair<Long, String> result = (Pair<Long, String>) agg.get(buffer, 0);
+
+    Assert.assertEquals(strings[2], result.rhs);
+  }
+
+  @Test
+  public void testCombine()
+  {
+    SerializablePairLongString pair1 = new 
SerializablePairLongString(1467225000L, "AAAA");
+    SerializablePairLongString pair2 = new 
SerializablePairLongString(1467240000L, "BBBB");
+    Assert.assertEquals(pair2, stringLastAggFactory.combine(pair1, pair2));
+  }
+
+  @Test
+  public void testStringLastCombiningAggregator()
+  {
+    StringFirstAggregator agg = (StringFirstAggregator) 
combiningAggFactory.factorize(colSelectorFactory);
+
+    aggregate(agg);
+    aggregate(agg);
+    aggregate(agg);
+    aggregate(agg);
+
+    Pair<Long, String> result = (Pair<Long, String>) agg.get();
+    Pair<Long, String> expected = pairs[3];
+
+    Assert.assertEquals(expected.lhs, result.lhs);
+    Assert.assertEquals(expected.rhs, result.rhs);
+  }
+
+  @Test
+  public void testStringFirstCombiningBufferAggregator()
+  {
+    StringFirstBufferAggregator agg = (StringFirstBufferAggregator) 
combiningAggFactory.factorizeBuffered(
+        colSelectorFactory);
+
+    ByteBuffer buffer = ByteBuffer.wrap(new 
byte[stringLastAggFactory.getMaxIntermediateSize()]);
+    agg.init(buffer, 0);
+
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+
+    Pair<Long, String> result = (Pair<Long, String>) agg.get(buffer, 0);
+    Pair<Long, String> expected = pairs[3];
+
+    Assert.assertEquals(expected.lhs, result.lhs);
+    Assert.assertEquals(expected.rhs, result.rhs);
+  }
+
+  @Test
+  public void testStringFirstAggregateCombiner()
+  {
+    final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"};
+    TestObjectColumnSelector columnSelector = new 
TestObjectColumnSelector<>(strings);
+
+    StringFirstAggregateCombiner stringFirstAggregateCombiner =
+        (StringFirstAggregateCombiner) 
combiningAggFactory.makeAggregateCombiner();
+
+    stringFirstAggregateCombiner.reset(columnSelector);
+
+    Assert.assertEquals(strings[0], stringFirstAggregateCombiner.getObject());
+
+    columnSelector.increment();
+    stringFirstAggregateCombiner.fold(columnSelector);
+
+    Assert.assertEquals(strings[0], stringFirstAggregateCombiner.getObject());
+
+    stringFirstAggregateCombiner.reset(columnSelector);
+
+    Assert.assertEquals(strings[1], stringFirstAggregateCombiner.getObject());
+  }
+
+  private void aggregate(
+      StringFirstAggregator agg
+  )
+  {
+    agg.aggregate();
+    timeSelector.increment();
+    valueSelector.increment();
+    objectSelector.increment();
+  }
+
+  private void aggregate(
+      StringFirstBufferAggregator agg,
+      ByteBuffer buff,
+      int position
+  )
+  {
+    agg.aggregate(buff, position);
+    timeSelector.increment();
+    valueSelector.increment();
+    objectSelector.increment();
+  }
+}
diff --git 
a/processing/src/test/java/io/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java
 
b/processing/src/test/java/io/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java
new file mode 100644
index 0000000..8a4a0de
--- /dev/null
+++ 
b/processing/src/test/java/io/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.first;
+
+import io.druid.query.aggregation.BufferAggregator;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.aggregation.TestLongColumnSelector;
+import io.druid.query.aggregation.TestObjectColumnSelector;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+public class StringFirstBufferAggregatorTest
+{
+  private void aggregateBuffer(
+      TestLongColumnSelector timeSelector,
+      TestObjectColumnSelector valueSelector,
+      BufferAggregator agg,
+      ByteBuffer buf,
+      int position
+  )
+  {
+    agg.aggregate(buf, position);
+    timeSelector.increment();
+    valueSelector.increment();
+  }
+
+  @Test
+  public void testBufferAggregate() throws Exception
+  {
+
+    final long[] timestamps = {1526724600L, 1526724700L, 1526724800L, 
1526725900L, 1526725000L};
+    final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"};
+    Integer maxStringBytes = 1024;
+
+    TestLongColumnSelector longColumnSelector = new 
TestLongColumnSelector(timestamps);
+    TestObjectColumnSelector<String> objectColumnSelector = new 
TestObjectColumnSelector<>(strings);
+
+    StringFirstAggregatorFactory factory = new StringFirstAggregatorFactory(
+        "billy", "billy", maxStringBytes
+    );
+
+    StringFirstBufferAggregator agg = new StringFirstBufferAggregator(
+        longColumnSelector,
+        objectColumnSelector,
+        maxStringBytes
+    );
+
+    String testString = "ZZZZ";
+
+    ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
+    buf.putLong(1526728500L);
+    buf.putInt(testString.length());
+    buf.put(testString.getBytes(StandardCharsets.UTF_8));
+
+    int position = 0;
+
+    agg.init(buf, position);
+    //noinspection ForLoopReplaceableByForEach
+    for (int i = 0; i < timestamps.length; i++) {
+      aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, 
position);
+    }
+
+    SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, 
position));
+
+
+    Assert.assertEquals("expectec last string value", strings[0], sp.rhs);
+    Assert.assertEquals("last string timestamp is the biggest", new 
Long(timestamps[0]), new Long(sp.lhs));
+
+  }
+
+  @Test
+  public void testNullBufferAggregate() throws Exception
+  {
+
+    final long[] timestamps = {2222L, 1111L, 3333L, 4444L, 5555L};
+    final String[] strings = {null, "AAAA", "BBBB", "DDDD", "EEEE"};
+    Integer maxStringBytes = 1024;
+
+    TestLongColumnSelector longColumnSelector = new 
TestLongColumnSelector(timestamps);
+    TestObjectColumnSelector<String> objectColumnSelector = new 
TestObjectColumnSelector<>(strings);
+
+    StringFirstAggregatorFactory factory = new StringFirstAggregatorFactory(
+        "billy", "billy", maxStringBytes
+    );
+
+    StringFirstBufferAggregator agg = new StringFirstBufferAggregator(
+        longColumnSelector,
+        objectColumnSelector,
+        maxStringBytes
+    );
+
+    String testString = "ZZZZ";
+
+    ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
+    buf.putLong(1526728500L);
+    buf.putInt(testString.length());
+    buf.put(testString.getBytes(StandardCharsets.UTF_8));
+
+    int position = 0;
+
+    agg.init(buf, position);
+    //noinspection ForLoopReplaceableByForEach
+    for (int i = 0; i < timestamps.length; i++) {
+      aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, 
position);
+    }
+
+    SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, 
position));
+
+
+    Assert.assertEquals("expectec last string value", strings[1], sp.rhs);
+    Assert.assertEquals("last string timestamp is the biggest", new 
Long(timestamps[1]), new Long(sp.lhs));
+
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testNoStringValue()
+  {
+
+    final long[] timestamps = {1526724000L, 1526724600L};
+    final Double[] doubles = {null, 2.00};
+    Integer maxStringBytes = 1024;
+
+    TestLongColumnSelector longColumnSelector = new 
TestLongColumnSelector(timestamps);
+    TestObjectColumnSelector<Double> objectColumnSelector = new 
TestObjectColumnSelector<>(doubles);
+
+    StringFirstAggregatorFactory factory = new StringFirstAggregatorFactory(
+        "billy", "billy", maxStringBytes
+    );
+
+    StringFirstBufferAggregator agg = new StringFirstBufferAggregator(
+        longColumnSelector,
+        objectColumnSelector,
+        maxStringBytes
+    );
+
+    String testString = "ZZZZ";
+
+    ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
+    buf.putLong(1526728500L);
+    buf.putInt(testString.length());
+    buf.put(testString.getBytes(StandardCharsets.UTF_8));
+
+    int position = 0;
+
+    agg.init(buf, position);
+    //noinspection ForLoopReplaceableByForEach
+    for (int i = 0; i < timestamps.length; i++) {
+      aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, 
position);
+    }
+  }
+}
diff --git 
a/processing/src/test/java/io/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java
 
b/processing/src/test/java/io/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java
new file mode 100644
index 0000000..bac9a6d
--- /dev/null
+++ 
b/processing/src/test/java/io/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.first;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import io.druid.data.input.MapBasedInputRow;
+import io.druid.java.util.common.DateTimes;
+import io.druid.java.util.common.granularity.Granularities;
+import io.druid.query.Druids;
+import io.druid.query.QueryRunnerTestHelper;
+import io.druid.query.Result;
+import io.druid.query.aggregation.CountAggregatorFactory;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.timeseries.TimeseriesQuery;
+import io.druid.query.timeseries.TimeseriesQueryEngine;
+import io.druid.query.timeseries.TimeseriesResultValue;
+import io.druid.segment.TestHelper;
+import io.druid.segment.incremental.IncrementalIndex;
+import io.druid.segment.incremental.IncrementalIndexSchema;
+import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
+import org.joda.time.DateTime;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+public class StringFirstTimeseriesQueryTest
+{
+
+  @Test
+  public void testTopNWithDistinctCountAgg() throws Exception
+  {
+    TimeseriesQueryEngine engine = new TimeseriesQueryEngine();
+
+    String visitor_id = "visitor_id";
+    String client_type = "client_type";
+
+    IncrementalIndex index = new IncrementalIndex.Builder()
+        .setIndexSchema(
+            new IncrementalIndexSchema.Builder()
+                .withQueryGranularity(Granularities.SECOND)
+                .withMetrics(new CountAggregatorFactory("cnt"))
+                .withMetrics(new StringFirstAggregatorFactory(
+                    "last_client_type", "client_type", 1024)
+                )
+                .build()
+        )
+        .setMaxRowCount(1000)
+        .buildOnheap();
+
+
+    DateTime time = DateTimes.of("2016-03-04T00:00:00.000Z");
+    long timestamp = time.getMillis();
+
+    DateTime time1 = DateTimes.of("2016-03-04T01:00:00.000Z");
+    long timestamp1 = time1.getMillis();
+    index.add(
+        new MapBasedInputRow(
+            timestamp,
+            Lists.newArrayList(visitor_id, client_type),
+            ImmutableMap.<String, Object>of(visitor_id, "0", client_type, 
"iphone")
+        )
+    );
+    index.add(
+        new MapBasedInputRow(
+            timestamp,
+            Lists.newArrayList(visitor_id, client_type),
+            ImmutableMap.<String, Object>of(visitor_id, "1", client_type, 
"iphone")
+        )
+    );
+    index.add(
+        new MapBasedInputRow(
+            timestamp1,
+            Lists.newArrayList(visitor_id, client_type),
+            ImmutableMap.<String, Object>of(visitor_id, "0", client_type, 
"android")
+        )
+    );
+
+    TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+                                  .dataSource(QueryRunnerTestHelper.dataSource)
+                                  .granularity(QueryRunnerTestHelper.allGran)
+                                  
.intervals(QueryRunnerTestHelper.fullOnInterval)
+                                  .aggregators(
+                                      Lists.newArrayList(
+                                          new StringFirstAggregatorFactory(
+                                              "last_client_type", client_type, 
1024
+                                          )
+                                      )
+                                  )
+                                  .build();
+
+    final Iterable<Result<TimeseriesResultValue>> results =
+        engine.process(query, new 
IncrementalIndexStorageAdapter(index)).toList();
+
+    List<Result<TimeseriesResultValue>> expectedResults = 
Collections.singletonList(
+        new Result<>(
+            time,
+            new TimeseriesResultValue(
+                ImmutableMap.<String, Object>of("last_client_type", new 
SerializablePairLongString(timestamp, "iphone"))
+            )
+        )
+    );
+    TestHelper.assertExpectedResults(expectedResults, results);
+  }
+}
diff --git 
a/processing/src/test/java/io/druid/query/aggregation/last/StringLastAggregationTest.java
 
b/processing/src/test/java/io/druid/query/aggregation/last/StringLastAggregationTest.java
new file mode 100644
index 0000000..1f2ecc4
--- /dev/null
+++ 
b/processing/src/test/java/io/druid/query/aggregation/last/StringLastAggregationTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.last;
+
+import io.druid.java.util.common.Pair;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.aggregation.TestLongColumnSelector;
+import io.druid.query.aggregation.TestObjectColumnSelector;
+import io.druid.segment.ColumnSelectorFactory;
+import io.druid.segment.column.Column;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+public class StringLastAggregationTest
+{
+  private final Integer MAX_STRING_SIZE = 1024;
+  private StringLastAggregatorFactory stringLastAggFactory;
+  private StringLastAggregatorFactory combiningAggFactory;
+  private ColumnSelectorFactory colSelectorFactory;
+  private TestLongColumnSelector timeSelector;
+  private TestObjectColumnSelector<String> valueSelector;
+  private TestObjectColumnSelector objectSelector;
+
+  private String[] strings = {"1111", "2222", "3333", null, "4444"};
+  private long[] times = {8224, 6879, 2436, 3546, 7888};
+  private SerializablePairLongString[] pairs = {
+      new SerializablePairLongString(52782L, "AAAA"),
+      new SerializablePairLongString(65492L, "BBBB"),
+      new SerializablePairLongString(69134L, "CCCC"),
+      new SerializablePairLongString(11111L, "DDDD"),
+      new SerializablePairLongString(51223L, null)
+  };
+
+  @Before
+  public void setup()
+  {
+    stringLastAggFactory = new StringLastAggregatorFactory("billy", "nilly", 
MAX_STRING_SIZE);
+    combiningAggFactory = (StringLastAggregatorFactory) 
stringLastAggFactory.getCombiningFactory();
+    timeSelector = new TestLongColumnSelector(times);
+    valueSelector = new TestObjectColumnSelector<>(strings);
+    objectSelector = new TestObjectColumnSelector<>(pairs);
+    colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
+    
EasyMock.expect(colSelectorFactory.makeColumnValueSelector(Column.TIME_COLUMN_NAME)).andReturn(timeSelector);
+    
EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector);
+    
EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector);
+    EasyMock.replay(colSelectorFactory);
+  }
+
+  @Test
+  public void testStringLastAggregator()
+  {
+    StringLastAggregator agg = (StringLastAggregator) 
stringLastAggFactory.factorize(colSelectorFactory);
+
+    aggregate(agg);
+    aggregate(agg);
+    aggregate(agg);
+    aggregate(agg);
+
+    Pair<Long, String> result = (Pair<Long, String>) agg.get();
+
+    Assert.assertEquals(strings[0], result.rhs);
+  }
+
+  @Test
+  public void testStringLastBufferAggregator()
+  {
+    StringLastBufferAggregator agg = (StringLastBufferAggregator) 
stringLastAggFactory.factorizeBuffered(
+        colSelectorFactory);
+
+    ByteBuffer buffer = ByteBuffer.wrap(new 
byte[stringLastAggFactory.getMaxIntermediateSize()]);
+    agg.init(buffer, 0);
+
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+
+    Pair<Long, String> result = (Pair<Long, String>) agg.get(buffer, 0);
+
+    Assert.assertEquals(strings[0], result.rhs);
+  }
+
+  @Test
+  public void testCombine()
+  {
+    SerializablePairLongString pair1 = new 
SerializablePairLongString(1467225000L, "AAAA");
+    SerializablePairLongString pair2 = new 
SerializablePairLongString(1467240000L, "BBBB");
+    Assert.assertEquals(pair2, stringLastAggFactory.combine(pair1, pair2));
+  }
+
+  @Test
+  public void testStringLastCombiningAggregator()
+  {
+    StringLastAggregator agg = (StringLastAggregator) 
combiningAggFactory.factorize(colSelectorFactory);
+
+    aggregate(agg);
+    aggregate(agg);
+    aggregate(agg);
+    aggregate(agg);
+
+    Pair<Long, String> result = (Pair<Long, String>) agg.get();
+    Pair<Long, String> expected = (Pair<Long, String>) pairs[2];
+
+    Assert.assertEquals(expected.lhs, result.lhs);
+    Assert.assertEquals(expected.rhs, result.rhs);
+  }
+
+  @Test
+  public void testStringLastCombiningBufferAggregator()
+  {
+    StringLastBufferAggregator agg = (StringLastBufferAggregator) 
combiningAggFactory.factorizeBuffered(
+        colSelectorFactory);
+
+    ByteBuffer buffer = ByteBuffer.wrap(new 
byte[stringLastAggFactory.getMaxIntermediateSize()]);
+    agg.init(buffer, 0);
+
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+
+    Pair<Long, String> result = (Pair<Long, String>) agg.get(buffer, 0);
+    Pair<Long, String> expected = (Pair<Long, String>) pairs[2];
+
+    Assert.assertEquals(expected.lhs, result.lhs);
+    Assert.assertEquals(expected.rhs, result.rhs);
+  }
+
+  @Test
+  public void testStringLastAggregateCombiner()
+  {
+    final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"};
+    TestObjectColumnSelector columnSelector = new 
TestObjectColumnSelector<>(strings);
+
+    StringLastAggregateCombiner stringFirstAggregateCombiner =
+        (StringLastAggregateCombiner) 
combiningAggFactory.makeAggregateCombiner();
+
+    stringFirstAggregateCombiner.reset(columnSelector);
+
+    Assert.assertEquals(strings[0], stringFirstAggregateCombiner.getObject());
+
+    columnSelector.increment();
+    stringFirstAggregateCombiner.fold(columnSelector);
+
+    Assert.assertEquals(strings[1], stringFirstAggregateCombiner.getObject());
+
+    stringFirstAggregateCombiner.reset(columnSelector);
+
+    Assert.assertEquals(strings[1], stringFirstAggregateCombiner.getObject());
+  }
+
+  private void aggregate(
+      StringLastAggregator agg
+  )
+  {
+    agg.aggregate();
+    timeSelector.increment();
+    valueSelector.increment();
+    objectSelector.increment();
+  }
+
+  private void aggregate(
+      StringLastBufferAggregator agg,
+      ByteBuffer buff,
+      int position
+  )
+  {
+    agg.aggregate(buff, position);
+    timeSelector.increment();
+    valueSelector.increment();
+    objectSelector.increment();
+  }
+}
diff --git 
a/processing/src/test/java/io/druid/query/aggregation/last/StringLastBufferAggregatorTest.java
 
b/processing/src/test/java/io/druid/query/aggregation/last/StringLastBufferAggregatorTest.java
new file mode 100644
index 0000000..c7c125b
--- /dev/null
+++ 
b/processing/src/test/java/io/druid/query/aggregation/last/StringLastBufferAggregatorTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.last;
+
+import io.druid.query.aggregation.BufferAggregator;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.aggregation.TestLongColumnSelector;
+import io.druid.query.aggregation.TestObjectColumnSelector;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+public class StringLastBufferAggregatorTest
+{
+  private void aggregateBuffer(
+      TestLongColumnSelector timeSelector,
+      TestObjectColumnSelector valueSelector,
+      BufferAggregator agg,
+      ByteBuffer buf,
+      int position
+  )
+  {
+    agg.aggregate(buf, position);
+    timeSelector.increment();
+    valueSelector.increment();
+  }
+
+  @Test
+  public void testBufferAggregate() throws Exception
+  {
+
+    final long[] timestamps = {1526724600L, 1526724700L, 1526724800L, 
1526725900L, 1526725000L};
+    final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"};
+    Integer maxStringBytes = 1024;
+
+    TestLongColumnSelector longColumnSelector = new 
TestLongColumnSelector(timestamps);
+    TestObjectColumnSelector<String> objectColumnSelector = new 
TestObjectColumnSelector<>(strings);
+
+    StringLastAggregatorFactory factory = new StringLastAggregatorFactory(
+        "billy", "billy", maxStringBytes
+    );
+
+    StringLastBufferAggregator agg = new StringLastBufferAggregator(
+        longColumnSelector,
+        objectColumnSelector,
+        maxStringBytes
+    );
+
+    String testString = "ZZZZ";
+
+    ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
+    buf.putLong(1526728500L);
+    buf.putInt(testString.length());
+    buf.put(testString.getBytes(StandardCharsets.UTF_8));
+
+    int position = 0;
+
+    agg.init(buf, position);
+    //noinspection ForLoopReplaceableByForEach
+    for (int i = 0; i < timestamps.length; i++) {
+      aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, 
position);
+    }
+
+    SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, 
position));
+
+
+    Assert.assertEquals("expectec last string value", "DDDD", sp.rhs);
+    Assert.assertEquals("last string timestamp is the biggest", new 
Long(1526725900L), new Long(sp.lhs));
+
+  }
+
+  @Test
+  public void testNullBufferAggregate() throws Exception
+  {
+
+    final long[] timestamps = {1111L, 2222L, 6666L, 4444L, 5555L};
+    final String[] strings = {"CCCC", "AAAA", "BBBB", null, "EEEE"};
+    Integer maxStringBytes = 1024;
+
+    TestLongColumnSelector longColumnSelector = new 
TestLongColumnSelector(timestamps);
+    TestObjectColumnSelector<String> objectColumnSelector = new 
TestObjectColumnSelector<>(strings);
+
+    StringLastAggregatorFactory factory = new StringLastAggregatorFactory(
+        "billy", "billy", maxStringBytes
+    );
+
+    StringLastBufferAggregator agg = new StringLastBufferAggregator(
+        longColumnSelector,
+        objectColumnSelector,
+        maxStringBytes
+    );
+
+    String testString = "ZZZZ";
+
+    ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
+    buf.putLong(1526728500L);
+    buf.putInt(testString.length());
+    buf.put(testString.getBytes(StandardCharsets.UTF_8));
+
+    int position = 0;
+
+    agg.init(buf, position);
+    //noinspection ForLoopReplaceableByForEach
+    for (int i = 0; i < timestamps.length; i++) {
+      aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, 
position);
+    }
+
+    SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, 
position));
+
+
+    Assert.assertEquals("expectec last string value", strings[2], sp.rhs);
+    Assert.assertEquals("last string timestamp is the biggest", new 
Long(timestamps[2]), new Long(sp.lhs));
+
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testNoStringValue()
+  {
+
+    final long[] timestamps = {1526724000L, 1526724600L};
+    final Double[] doubles = {null, 2.00};
+    Integer maxStringBytes = 1024;
+
+    TestLongColumnSelector longColumnSelector = new 
TestLongColumnSelector(timestamps);
+    TestObjectColumnSelector<Double> objectColumnSelector = new 
TestObjectColumnSelector<>(doubles);
+
+    StringLastAggregatorFactory factory = new StringLastAggregatorFactory(
+        "billy", "billy", maxStringBytes
+    );
+
+    StringLastBufferAggregator agg = new StringLastBufferAggregator(
+        longColumnSelector,
+        objectColumnSelector,
+        maxStringBytes
+    );
+
+    String testString = "ZZZZ";
+
+    ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
+    buf.putLong(1526728500L);
+    buf.putInt(testString.length());
+    buf.put(testString.getBytes(StandardCharsets.UTF_8));
+
+    int position = 0;
+
+    agg.init(buf, position);
+    //noinspection ForLoopReplaceableByForEach
+    for (int i = 0; i < timestamps.length; i++) {
+      aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, 
position);
+    }
+  }
+}
diff --git 
a/processing/src/test/java/io/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java
 
b/processing/src/test/java/io/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java
new file mode 100644
index 0000000..a68798e
--- /dev/null
+++ 
b/processing/src/test/java/io/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.last;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import io.druid.data.input.MapBasedInputRow;
+import io.druid.java.util.common.DateTimes;
+import io.druid.java.util.common.granularity.Granularities;
+import io.druid.query.Druids;
+import io.druid.query.QueryRunnerTestHelper;
+import io.druid.query.Result;
+import io.druid.query.aggregation.CountAggregatorFactory;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.timeseries.TimeseriesQuery;
+import io.druid.query.timeseries.TimeseriesQueryEngine;
+import io.druid.query.timeseries.TimeseriesResultValue;
+import io.druid.segment.TestHelper;
+import io.druid.segment.incremental.IncrementalIndex;
+import io.druid.segment.incremental.IncrementalIndexSchema;
+import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
+import org.joda.time.DateTime;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+public class StringLastTimeseriesQueryTest
+{
+
+  @Test
+  public void testTopNWithDistinctCountAgg() throws Exception
+  {
+    TimeseriesQueryEngine engine = new TimeseriesQueryEngine();
+
+    String visitor_id = "visitor_id";
+    String client_type = "client_type";
+
+    IncrementalIndex index = new IncrementalIndex.Builder()
+        .setIndexSchema(
+            new IncrementalIndexSchema.Builder()
+                .withQueryGranularity(Granularities.SECOND)
+                .withMetrics(new CountAggregatorFactory("cnt"))
+                .withMetrics(new StringLastAggregatorFactory(
+                    "last_client_type", "client_type", 1024)
+                )
+                .build()
+        )
+        .setMaxRowCount(1000)
+        .buildOnheap();
+
+
+    DateTime time = DateTimes.of("2016-03-04T00:00:00.000Z");
+    long timestamp = time.getMillis();
+
+    DateTime time1 = DateTimes.of("2016-03-04T01:00:00.000Z");
+    long timestamp1 = time1.getMillis();
+    index.add(
+        new MapBasedInputRow(
+            timestamp,
+            Lists.newArrayList(visitor_id, client_type),
+            ImmutableMap.<String, Object>of(visitor_id, "0", client_type, 
"iphone")
+        )
+    );
+    index.add(
+        new MapBasedInputRow(
+            timestamp,
+            Lists.newArrayList(visitor_id, client_type),
+            ImmutableMap.<String, Object>of(visitor_id, "1", client_type, 
"iphone")
+        )
+    );
+    index.add(
+        new MapBasedInputRow(
+            timestamp1,
+            Lists.newArrayList(visitor_id, client_type),
+            ImmutableMap.<String, Object>of(visitor_id, "0", client_type, 
"android")
+        )
+    );
+
+    TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+                                  .dataSource(QueryRunnerTestHelper.dataSource)
+                                  .granularity(QueryRunnerTestHelper.allGran)
+                                  
.intervals(QueryRunnerTestHelper.fullOnInterval)
+                                  .aggregators(
+                                      Lists.newArrayList(
+                                          new StringLastAggregatorFactory(
+                                              "last_client_type", client_type, 
1024
+                                          )
+                                      )
+                                  )
+                                  .build();
+
+    final Iterable<Result<TimeseriesResultValue>> results =
+        engine.process(query, new 
IncrementalIndexStorageAdapter(index)).toList();
+
+    List<Result<TimeseriesResultValue>> expectedResults = 
Collections.singletonList(
+        new Result<>(
+            time,
+            new TimeseriesResultValue(
+                ImmutableMap.<String, Object>of(
+                    "last_client_type",
+                    new SerializablePairLongString(timestamp1, "android")
+                )
+            )
+        )
+    );
+    TestHelper.assertExpectedResults(expectedResults, results);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to