pgaref commented on a change in pull request #1736:
URL: https://github.com/apache/hive/pull/1736#discussion_r544256032
##########
File path: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
##########
@@ -1790,6 +1790,10 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
HIVEALIAS("hive.alias", "", ""),
HIVEMAPSIDEAGGREGATE("hive.map.aggr", true, "Whether to use map-side aggregation in Hive Group By queries"),
HIVEGROUPBYSKEW("hive.groupby.skewindata", false, "Whether there is skew in data to optimize group by queries"),
+
+ HIVE_ENABLE_COMBINER_FOR_GROUP_BY("hive.enable.combiner.for.groupby", true,
+ "Whether to enable tez combiner to aggregate the records after sorting is done"),
Review comment:
Maybe clarify that it is only used for map-side aggregation? Is there any case where this would not be beneficial?
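For example, something along these lines would make the scope explicit (wording is just a suggestion, not from the patch):

```java
// Suggested wording only -- adjust as you see fit.
HIVE_ENABLE_COMBINER_FOR_GROUP_BY("hive.enable.combiner.for.groupby", true,
    "Whether to enable the Tez combiner for map-side group by aggregation. The combiner " +
    "re-aggregates the sorted records that map-side hash aggregation could not fold, " +
    "e.g. when the hash table exceeded its memory limit."),
```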
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByCombiner.java
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByCombiner;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.runtime.api.TaskContext;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_REDUCE_WORK;
+import static org.apache.hadoop.hive.ql.exec.Utilities.REDUCE_PLAN_NAME;
+
+// Combiner for normal group by operator. In case of map side aggregate, the partially
+// aggregated records are sorted based on group by key. If because of some reasons, like hash
+// table memory exceeded the limit or the first few batches of records have less ndvs, the
+// aggregation is not done, then here the aggregation can be done cheaply as the records
+// are sorted based on group by key.
+public class GroupByCombiner extends VectorGroupByCombiner {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ org.apache.hadoop.hive.ql.exec.GroupByCombiner.class.getName());
+
+ private transient GenericUDAFEvaluator[] aggregationEvaluators;
+ Deserializer valueDeserializer;
+ GenericUDAFEvaluator.AggregationBuffer[] aggregationBuffers;
+ GroupByOperator groupByOperator;
+ Serializer valueSerializer;
+ ObjectInspector aggrObjectInspector;
+ DataInputBuffer valueBuffer;
+ Object[] cachedValues;
+
+ public GroupByCombiner(TaskContext taskContext) throws HiveException, IOException {
+ super(taskContext);
+ if (rw != null) {
+ try {
+ groupByOperator = (GroupByOperator) rw.getReducer();
+
+ ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+ ois.add(keyObjectInspector);
+ ois.add(valueObjectInspector);
+ ObjectInspector[] rowObjectInspector = new ObjectInspector[1];
+ rowObjectInspector[0] =
+ ObjectInspectorFactory.getStandardStructObjectInspector(Utilities.reduceFieldNameList,
+ ois);
+ groupByOperator.setInputObjInspectors(rowObjectInspector);
+ groupByOperator.initializeOp(conf);
+ aggregationBuffers = groupByOperator.getAggregationBuffers();
+ aggregationEvaluators = groupByOperator.getAggregationEvaluator();
+
+ TableDesc valueTableDesc = rw.getTagToValueDesc().get(0);
+ if ((aggregationEvaluators == null) || (aggregationEvaluators.length != numValueCol)) {
+ //TODO : Need to support distinct. The logic has to be changed to extract only
+ // those aggregates which are not part of distinct.
+ LOG.info(" Combiner is disabled as the number of value columns does" +
+     " not match with number of aggregators");
+ numValueCol = 0;
+ rw = null;
+ return;
+ }
+ valueSerializer = (Serializer) valueTableDesc.getDeserializerClass()
+ .newInstance();
+ valueSerializer.initialize(null, valueTableDesc.getProperties());
+
+ valueDeserializer = (AbstractSerDe) ReflectionUtils.newInstance(
+ valueTableDesc.getDeserializerClass(), null);
+ SerDeUtils.initializeSerDe(valueDeserializer, null,
+ valueTableDesc.getProperties(), null);
+
+ aggrObjectInspector = groupByOperator.getAggrObjInspector();
+ valueBuffer = new DataInputBuffer();
+ cachedValues = new Object[aggregationEvaluators.length];
+ } catch (Exception e) {
+ LOG.error(" GroupByCombiner failed", e);
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+ }
+
+ private void processAggregation(IFile.Writer writer, DataInputBuffer key)
+ throws Exception {
+ for (int i = 0; i < aggregationEvaluators.length; i++) {
+ cachedValues[i] = aggregationEvaluators[i].evaluate(aggregationBuffers[i]);
+ }
+ BytesWritable result = (BytesWritable) valueSerializer.serialize(cachedValues,
+ aggrObjectInspector);
+ valueBuffer.reset(result.getBytes(), result.getLength());
+ writer.append(key, valueBuffer);
+ combineOutputRecordsCounter.increment(1);
+ for (int i = 0; i < aggregationEvaluators.length; i++) {
+ aggregationEvaluators[i].reset(aggregationBuffers[i]);
+ }
+ }
+
+ private void updateAggregation(BytesWritable valWritable, DataInputBuffer value)
+ throws HiveException, SerDeException {
+ valWritable.set(value.getData(), value.getPosition(),
+ value.getLength() - value.getPosition());
+ Object row = valueDeserializer.deserialize(valWritable);
+ groupByOperator.updateAggregation(row);
+ }
+
+ private void processRows(TezRawKeyValueIterator rawIter, IFile.Writer writer) {
+ long numRows = 0;
+ try {
+ DataInputBuffer key = rawIter.getKey();
+ DataInputBuffer prevKey = new DataInputBuffer();
+ prevKey.reset(key.getData(), key.getPosition(), key.getLength() - key.getPosition());
+ BytesWritable valWritable = new BytesWritable();
+ DataInputBuffer prevVal = new DataInputBuffer();
+ prevVal.reset(rawIter.getValue().getData(), rawIter.getValue().getPosition(),
+     rawIter.getValue().getLength() - rawIter.getValue().getPosition());
+ int numSameKey = 0;
+ while (rawIter.next()) {
+ key = rawIter.getKey();
+ if (!VectorGroupByCombiner.compare(key, prevKey)) {
+ // if current key is not equal to the previous key then we have to emit the
+ // record. In case only one record was present for this key, then no need to
+ // do aggregation, We can directly append the key and value. For key with more
+ // than one record, we have to update the aggregation for the current value only
+ // as for previous values (records) aggregation is already done in previous
+ // iteration of loop.
+ if (numSameKey != 0) {
+ updateAggregation(valWritable, prevVal);
+ processAggregation(writer, prevKey);
+ } else {
+ writer.append(prevKey, prevVal);
+ }
+ prevKey.reset(key.getData(), key.getPosition(),
+ key.getLength() - key.getPosition());
+ numSameKey = 0;
+ } else {
+ // If there are more than one record with same key then update the aggregation.
+ updateAggregation(valWritable, prevVal);
+ numSameKey++;
+ }
+ prevVal.reset(rawIter.getValue().getData(), rawIter.getValue().getPosition(),
+     rawIter.getValue().getLength() - rawIter.getValue().getPosition());
+ numRows++;
+ }
+ if (numSameKey != 0) {
+ updateAggregation(valWritable, prevVal);
+ processAggregation(writer, prevKey);
+ } else {
+ writer.append(prevKey, prevVal);
+ }
+ combineInputRecordsCounter.increment(numRows);
+ } catch(Exception e) {
+ LOG.error("processRows failed", e);
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+
+ @Override
+ public void combine(TezRawKeyValueIterator rawIter, IFile.Writer writer) {
+ try {
+ if (!rawIter.next()) {
+ return;
+ }
+ if (numValueCol == 0) {
+ // For no aggregation, RLE in writer will take care of reduction.
+ appendDirectlyToWriter(rawIter, writer);
+ } else {
+ processRows(rawIter, writer);
+ }
+ } catch (IOException e) {
+ LOG.error("Failed to combine rows", e);
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+
+ public static JobConf setCombinerInConf(BaseWork dest, JobConf conf, JobConf destConf) {
+ //TODO need to change it to a proper config
+ if (conf == null || !conf.get(HiveConf.ConfVars.HIVE_ENABLE_COMBINER_FOR_GROUP_BY.varname).
Review comment:
HiveConf.getBoolVar ?
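i.e. something like this (sketch only, keeping the rest of the condition as in the patch):

```java
// Sketch: typed accessor instead of the raw string lookup.
if (conf == null
    || !HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ENABLE_COMBINER_FOR_GROUP_BY)) {
  // ... bail out early, as the patch does
}
```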
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByCombiner.java
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByCombiner;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.runtime.api.TaskContext;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_REDUCE_WORK;
+import static org.apache.hadoop.hive.ql.exec.Utilities.REDUCE_PLAN_NAME;
+
+// Combiner for normal group by operator. In case of map side aggregate, the partially
+// aggregated records are sorted based on group by key. If because of some reasons, like hash
+// table memory exceeded the limit or the first few batches of records have less ndvs, the
+// aggregation is not done, then here the aggregation can be done cheaply as the records
+// are sorted based on group by key.
+public class GroupByCombiner extends VectorGroupByCombiner {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ org.apache.hadoop.hive.ql.exec.GroupByCombiner.class.getName());
+
+ private transient GenericUDAFEvaluator[] aggregationEvaluators;
+ Deserializer valueDeserializer;
+ GenericUDAFEvaluator.AggregationBuffer[] aggregationBuffers;
+ GroupByOperator groupByOperator;
+ Serializer valueSerializer;
+ ObjectInspector aggrObjectInspector;
+ DataInputBuffer valueBuffer;
+ Object[] cachedValues;
+
+ public GroupByCombiner(TaskContext taskContext) throws HiveException, IOException {
+ super(taskContext);
+ if (rw != null) {
+ try {
+ groupByOperator = (GroupByOperator) rw.getReducer();
+
+ ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+ ois.add(keyObjectInspector);
+ ois.add(valueObjectInspector);
+ ObjectInspector[] rowObjectInspector = new ObjectInspector[1];
+ rowObjectInspector[0] =
+ ObjectInspectorFactory.getStandardStructObjectInspector(Utilities.reduceFieldNameList,
+ ois);
+ groupByOperator.setInputObjInspectors(rowObjectInspector);
+ groupByOperator.initializeOp(conf);
+ aggregationBuffers = groupByOperator.getAggregationBuffers();
+ aggregationEvaluators = groupByOperator.getAggregationEvaluator();
+
+ TableDesc valueTableDesc = rw.getTagToValueDesc().get(0);
+ if ((aggregationEvaluators == null) || (aggregationEvaluators.length != numValueCol)) {
+ //TODO : Need to support distinct. The logic has to be changed to extract only
+ // those aggregates which are not part of distinct.
+ LOG.info(" Combiner is disabled as the number of value columns does" +
+     " not match with number of aggregators");
+ numValueCol = 0;
+ rw = null;
+ return;
+ }
+ valueSerializer = (Serializer) valueTableDesc.getDeserializerClass()
+ .newInstance();
+ valueSerializer.initialize(null, valueTableDesc.getProperties());
+
+ valueDeserializer = (AbstractSerDe) ReflectionUtils.newInstance(
+ valueTableDesc.getDeserializerClass(), null);
+ SerDeUtils.initializeSerDe(valueDeserializer, null,
+ valueTableDesc.getProperties(), null);
+
+ aggrObjectInspector = groupByOperator.getAggrObjInspector();
+ valueBuffer = new DataInputBuffer();
+ cachedValues = new Object[aggregationEvaluators.length];
+ } catch (Exception e) {
+ LOG.error(" GroupByCombiner failed", e);
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+ }
+
+ private void processAggregation(IFile.Writer writer, DataInputBuffer key)
+ throws Exception {
+ for (int i = 0; i < aggregationEvaluators.length; i++) {
+ cachedValues[i] = aggregationEvaluators[i].evaluate(aggregationBuffers[i]);
+ }
+ BytesWritable result = (BytesWritable) valueSerializer.serialize(cachedValues,
+ aggrObjectInspector);
+ valueBuffer.reset(result.getBytes(), result.getLength());
+ writer.append(key, valueBuffer);
+ combineOutputRecordsCounter.increment(1);
+ for (int i = 0; i < aggregationEvaluators.length; i++) {
+ aggregationEvaluators[i].reset(aggregationBuffers[i]);
+ }
+ }
+
+ private void updateAggregation(BytesWritable valWritable, DataInputBuffer value)
+ throws HiveException, SerDeException {
+ valWritable.set(value.getData(), value.getPosition(),
+ value.getLength() - value.getPosition());
+ Object row = valueDeserializer.deserialize(valWritable);
+ groupByOperator.updateAggregation(row);
+ }
+
+ private void processRows(TezRawKeyValueIterator rawIter, IFile.Writer writer) {
+ long numRows = 0;
+ try {
+ DataInputBuffer key = rawIter.getKey();
+ DataInputBuffer prevKey = new DataInputBuffer();
Review comment:
Shall we move prevKey, valWritable, and prevVal to class fields to avoid repeated instantiation?
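Rough sketch of what I mean (field names taken from the patch; processRows would then only reset them per call instead of allocating new buffers):

```java
// Sketch only: hold the reusable buffers as instance fields.
private final DataInputBuffer prevKey = new DataInputBuffer();
private final DataInputBuffer prevVal = new DataInputBuffer();
private final BytesWritable valWritable = new BytesWritable();
```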
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
##########
@@ -515,6 +516,10 @@ DAG build(JobConf conf, TezWork tezWork, Path scratchDir, Context ctx,
Edge e = null;
TezEdgeProperty edgeProp = tezWork.getEdgeProperty(workUnit, v);
+
+        //Add the reducer plan to config to create the combiner object in case of group by.
+        wxConf = GroupByCombiner.setCombinerInConf(v, wxConf, workToConf.get(v));
Review comment:
Looks like all the combiner creation logic should live a level higher -- would it make sense to have this as part of the createVertex method, for example? This looks hacky as is.
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByCombiner.java
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByCombiner;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.runtime.api.TaskContext;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_REDUCE_WORK;
+import static org.apache.hadoop.hive.ql.exec.Utilities.REDUCE_PLAN_NAME;
+
+// Combiner for normal group by operator. In case of map side aggregate, the partially
+// aggregated records are sorted based on group by key. If because of some reasons, like hash
+// table memory exceeded the limit or the first few batches of records have less ndvs, the
+// aggregation is not done, then here the aggregation can be done cheaply as the records
+// are sorted based on group by key.
+public class GroupByCombiner extends VectorGroupByCombiner {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ org.apache.hadoop.hive.ql.exec.GroupByCombiner.class.getName());
+
+ private transient GenericUDAFEvaluator[] aggregationEvaluators;
+ Deserializer valueDeserializer;
+ GenericUDAFEvaluator.AggregationBuffer[] aggregationBuffers;
+ GroupByOperator groupByOperator;
+ Serializer valueSerializer;
+ ObjectInspector aggrObjectInspector;
+ DataInputBuffer valueBuffer;
+ Object[] cachedValues;
+
+ public GroupByCombiner(TaskContext taskContext) throws HiveException, IOException {
+ super(taskContext);
+ if (rw != null) {
+ try {
+ groupByOperator = (GroupByOperator) rw.getReducer();
+
+ ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+ ois.add(keyObjectInspector);
+ ois.add(valueObjectInspector);
+ ObjectInspector[] rowObjectInspector = new ObjectInspector[1];
+ rowObjectInspector[0] =
+ ObjectInspectorFactory.getStandardStructObjectInspector(Utilities.reduceFieldNameList,
+ ois);
+ groupByOperator.setInputObjInspectors(rowObjectInspector);
+ groupByOperator.initializeOp(conf);
+ aggregationBuffers = groupByOperator.getAggregationBuffers();
+ aggregationEvaluators = groupByOperator.getAggregationEvaluator();
+
+ TableDesc valueTableDesc = rw.getTagToValueDesc().get(0);
+ if ((aggregationEvaluators == null) || (aggregationEvaluators.length != numValueCol)) {
+ //TODO : Need to support distinct. The logic has to be changed to extract only
Review comment:
Shall we open a follow-up JIRA for this? What is the challenge here?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]