Github user srinathshankar commented on a diff in the pull request: https://github.com/apache/spark/pull/15959#discussion_r88958613 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala --- @@ -58,60 +61,127 @@ case class Statistics( } } + /** - * Statistics for a column. + * Statistics collected for a column. + * + * 1. Supported data types are defined in `ColumnStat.supportsType`. + * 2. The JVM data type stored in min/max is the external data type (used in Row) for the + * corresponding Catalyst data type. For example, for DateType we store java.sql.Date, and for + * TimestampType we store java.sql.Timestamp. + * 3. For integral types, they are all upcasted to longs, i.e. shorts are stored as longs. + * + * @param ndv number of distinct values + * @param min minimum value + * @param max maximum value + * @param numNulls number of nulls + * @param avgLen average length of the values. For fixed-length types, this should be a constant. + * @param maxLen maximum length of the values. For fixed-length types, this should be a constant. */ -case class ColumnStat(statRow: InternalRow) { +// TODO: decide if we want to use bigint to represent ndv and numNulls. +case class ColumnStat( + ndv: Long, + min: Any, + max: Any, + numNulls: Long, + avgLen: Long, + maxLen: Long) { - def forNumeric[T <: AtomicType](dataType: T): NumericColumnStat[T] = { - NumericColumnStat(statRow, dataType) - } - def forString: StringColumnStat = StringColumnStat(statRow) - def forBinary: BinaryColumnStat = BinaryColumnStat(statRow) - def forBoolean: BooleanColumnStat = BooleanColumnStat(statRow) + /** + * Returns a map from string to string that can be used to serialize the column stats. + * The key is the name of the field (e.g. "ndv" or "min"), and the value is the string + * representation for the value. The deserialization side is defined in [[ColumnStat.fromMap]]. 
+ * + * As part of the protocol, the returned map always contains a key called "version". + */ + def toMap: Map[String, String] = Map( + "version" -> "1", + "ndv" -> ndv.toString, + "min" -> min.toString, + "max" -> max.toString, + "numNulls" -> numNulls.toString, + "avgLen" -> avgLen.toString, + "maxLen" -> maxLen.toString + ) +} + + +object ColumnStat extends Logging { - override def toString: String = { - // use Base64 for encoding - Base64.encodeBase64String(statRow.asInstanceOf[UnsafeRow].getBytes) + /** Returns true iff the we support gathering column statistics on column of the given type. */ + def supportsType(dataType: DataType): Boolean = dataType match { + case _: NumericType | TimestampType | DateType | BooleanType => true + case StringType | BinaryType => true + case _ => false } -} -object ColumnStat { - def apply(numFields: Int, str: String): ColumnStat = { - // use Base64 for decoding - val bytes = Base64.decodeBase64(str) - val unsafeRow = new UnsafeRow(numFields) - unsafeRow.pointTo(bytes, bytes.length) - ColumnStat(unsafeRow) + /** + * Creates a [[ColumnStat]] object from the given map. This is used to deserialize column stats + * from some external storage. The serialization side is defined in [[ColumnStat.toMap]]. 
+ */ + def fromMap(dataType: DataType, map: Map[String, String]): Option[ColumnStat] = { + val str2val: (String => Any) = dataType match { + case _: IntegralType => _.toLong + case _: DecimalType => Decimal(_) + case DoubleType | FloatType => _.toDouble + case BooleanType => _.toBoolean + case _ => identity + } + + try { + Some(ColumnStat( + ndv = map("ndv").toLong, + min = str2val(map.get("min").orNull), + max = str2val(map.get("max").orNull), + numNulls = map("numNulls").toLong, + avgLen = map.getOrElse("avgLen", "1").toLong, + maxLen = map.getOrElse("maxLen", "1").toLong + )) + } catch { + case NonFatal(e) => + logWarning("Failed to parse column statistics", e) + None + } } -} -case class NumericColumnStat[T <: AtomicType](statRow: InternalRow, dataType: T) { - // The indices here must be consistent with `ColumnStatStruct.numericColumnStat`. - val numNulls: Long = statRow.getLong(0) - val max: T#InternalType = statRow.get(1, dataType).asInstanceOf[T#InternalType] - val min: T#InternalType = statRow.get(2, dataType).asInstanceOf[T#InternalType] - val ndv: Long = statRow.getLong(3) -} + /** + * ndv: Long, min: T, max: T, numNulls: Long, avgLen: Long, maxLen: Long + */ + def statExprs(col: Attribute, relativeSD: Double): CreateNamedStruct = { + def struct(exprs: Expression*): CreateNamedStruct = CreateStruct(exprs.map { expr => + expr.transformUp { case af: AggregateFunction => af.toAggregateExpression() } + }) + val zero = Literal(0, LongType) + val one = Literal(1, LongType) -case class StringColumnStat(statRow: InternalRow) { - // The indices here must be consistent with `ColumnStatStruct.stringColumnStat`. 
- val numNulls: Long = statRow.getLong(0) - val avgColLen: Double = statRow.getDouble(1) - val maxColLen: Long = statRow.getInt(2) - val ndv: Long = statRow.getLong(3) -} + // the approximate ndv should never be larger than the number of rows + val ndv = Least(Seq(HyperLogLogPlusPlus(col, relativeSD), Count(one))) --- End diff -- I'm assuming `HyperLogLogPlusPlus` ignores nulls like other aggregates. In that case shouldn't you use `Count(col)` instead of `Count(one)` as an upper bound?
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org