Ali Alsuliman has submitted this change and it was merged.

Change subject: [ASTERIXDB-2516][RT] Modify range map function of parallel sort
......................................................................


[ASTERIXDB-2516][RT] Modify range map function of parallel sort

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
Currently, the range map function of parallel sort passes "null"
to get a comparator to sort the samples and produce the range map.
The comparator provider will enforce providing a type and prevent
passing "null". The range map function needs to be updated to get
the types of the sort fields and use them to get comparators.
- changed the output type of the local sampling function from
a list of lists of ANY to binary. The old type computer was removed.
- added a null writer aggregate function that just produces null as
its aggregate value. This is needed in order to propagate the types
of the sort fields from the local step to the global step so that
the range map function can know the types of the sort fields.
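
As an illustration of the new binary output of the local sampling step, here
is a minimal sketch of the layout the diff below appears to write: a 4-byte
sample count (written as a placeholder first, then patched at the end),
followed by each sampled field as a 4-byte length plus its raw bytes. The
class and method names (SampleBufferSketch, addSample, finish, decode) are
hypothetical and use only the JDK; the decode side is an assumption that
simply mirrors the writer and is not taken from the change itself.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the local sample buffer; not AsterixDB code.
class SampleBufferSketch {
    private final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    private int numSamples;

    SampleBufferSketch() {
        // Reserve 4 bytes for the sample count; finish() patches in the real value.
        writeInt(0);
    }

    // Appends one sample: each field is a 4-byte length followed by the field bytes.
    void addSample(byte[]... fields) {
        for (byte[] field : fields) {
            writeInt(field.length);
            bytes.write(field, 0, field.length);
        }
        numSamples++;
    }

    // Returns a copy of the buffer with the leading placeholder set to the sample count.
    byte[] finish() {
        byte[] result = bytes.toByteArray();
        ByteBuffer.wrap(result).putInt(0, numSamples);
        return result;
    }

    // Assumed reader: mirrors the writer, given the number of fields per sample.
    static List<List<byte[]>> decode(byte[] data, int numFields) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        int count = buf.getInt();
        List<List<byte[]>> samples = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            List<byte[]> sample = new ArrayList<>();
            for (int j = 0; j < numFields; j++) {
                byte[] field = new byte[buf.getInt()];
                buf.get(field);
                sample.add(field);
            }
            samples.add(sample);
        }
        return samples;
    }

    // Big-endian 4-byte integer, the same byte order java.io.DataOutput.writeInt uses.
    private void writeInt(int value) {
        bytes.write(value >>> 24);
        bytes.write(value >>> 16);
        bytes.write(value >>> 8);
        bytes.write(value);
    }
}
```

Because the fields travel as opaque length-prefixed bytes, the type of each
sort field has to reach the global step some other way, which is the role the
commit message gives to the null writer aggregate.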

Change-Id: I7edbb10906cc4464210af87a5b1630ba3aecbde0
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3333
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lycha...@couchbase.com>
---
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/BTreeAccessMethod.java
M asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
D asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/ListOfSamplesTypeComputer.java
M asterixdb/asterix-om/src/test/java/org/apache/asterix/om/typecomputer/TypeComputerTest.java
A asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/collections/NullWriterAggregateDescriptor.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSamplingAggregateDescriptor.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionTypeInferers.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/VariableReferenceExpression.java
M hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ForwardOperatorDescriptor.java
M hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IntSerDeUtils.java
14 files changed, 294 insertions(+), 237 deletions(-)

Approvals:
  Jenkins: Verified; ; Verified
  Dmitry Lychagin: Looks good to me, approved

Objections:
  Anon. E. Moose #1000171: Violations found
  Jenkins: Violations found



diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index f42f0d5..a7d40b4 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -363,8 +363,8 @@
         physicalRewritesAllLevels.add(new SetAsterixPhysicalOperatorsRule());
         physicalRewritesAllLevels.add(new AddEquivalenceClassForRecordConstructorRule());
         physicalRewritesAllLevels.add(new CheckFullParallelSortRule());
-        physicalRewritesAllLevels
-                .add(new EnforceStructuralPropertiesRule(BuiltinFunctions.RANGE_MAP, BuiltinFunctions.LOCAL_SAMPLING));
+        physicalRewritesAllLevels.add(new EnforceStructuralPropertiesRule(BuiltinFunctions.RANGE_MAP,
+                BuiltinFunctions.LOCAL_SAMPLING, BuiltinFunctions.NULL_WRITER));
         physicalRewritesAllLevels.add(new RemoveSortInFeedIngestionRule());
         physicalRewritesAllLevels.add(new RemoveUnnecessarySortMergeExchange());
         physicalRewritesAllLevels.add(new PushProjectDownRule());
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/BTreeAccessMethod.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/BTreeAccessMethod.java
index c5ff5d1..99412dd 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/BTreeAccessMethod.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/BTreeAccessMethod.java
@@ -742,7 +742,7 @@
                 unnestMapOp.setSourceLocation(dataSourceOp.getSourceLocation());
                 indexSearchOp = unnestMapOp;
             }
-
+            // TODO: shouldn't indexSearchOp execution mode be set to that of the input? the default is UNPARTITIONED
             indexSearchOp.getInputs().add(new MutableObject<>(inputOp));
 
             // Adds equivalence classes --- one equivalent class between a primary key
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
index 382b1b2..e2ba1e1 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
@@ -18,7 +18,11 @@
  */
 package org.apache.asterix.om.functions;
 
-import static org.apache.asterix.om.functions.BuiltinFunctions.WindowFunctionProperty.*;
+import static org.apache.asterix.om.functions.BuiltinFunctions.WindowFunctionProperty.HAS_LIST_ARG;
+import static org.apache.asterix.om.functions.BuiltinFunctions.WindowFunctionProperty.INJECT_ORDER_ARGS;
+import static org.apache.asterix.om.functions.BuiltinFunctions.WindowFunctionProperty.MATERIALIZE_PARTITION;
+import static org.apache.asterix.om.functions.BuiltinFunctions.WindowFunctionProperty.NO_FRAME_CLAUSE;
+import static org.apache.asterix.om.functions.BuiltinFunctions.WindowFunctionProperty.NO_ORDER_CLAUSE;
 
 import java.util.Collections;
 import java.util.EnumSet;
@@ -84,7 +88,6 @@
 import org.apache.asterix.om.typecomputer.impl.IfNanOrInfTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.IfNullTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.InjectFailureTypeComputer;
-import org.apache.asterix.om.typecomputer.impl.ListOfSamplesTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.LocalAvgTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.LocalSingleVarStatisticsTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.MinMaxAggTypeComputer;
@@ -507,7 +510,7 @@
     public static final FunctionIdentifier LOCAL_SAMPLING =
             new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-sampling", FunctionIdentifier.VARARGS);
     public static final FunctionIdentifier RANGE_MAP =
-            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-range-map", 1);
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-range-map", FunctionIdentifier.VARARGS);
     public static final FunctionIdentifier STDDEV_POP =
             new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-stddev_pop", 1);
     public static final FunctionIdentifier GLOBAL_STDDEV_POP =
@@ -548,6 +551,8 @@
             new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-intermediate-kurtosis", 1);
     public static final FunctionIdentifier LOCAL_KURTOSIS =
             new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-kurtosis", 1);
+    public static final FunctionIdentifier NULL_WRITER =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-null-writer", 1);
 
     public static final FunctionIdentifier SCALAR_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "avg", 1);
     public static final FunctionIdentifier SCALAR_COUNT =
@@ -1759,7 +1764,7 @@
         addPrivateFunction(LOCAL_STDDEV_SAMP, LocalSingleVarStatisticsTypeComputer.INSTANCE, true);
         addFunction(STDDEV_SAMP, NullableDoubleTypeComputer.INSTANCE, true);
         addPrivateFunction(GLOBAL_STDDEV_SAMP, NullableDoubleTypeComputer.INSTANCE, true);
-        addPrivateFunction(LOCAL_SAMPLING, ListOfSamplesTypeComputer.INSTANCE, true);
+        addPrivateFunction(LOCAL_SAMPLING, ABinaryTypeComputer.INSTANCE, true);
         addPrivateFunction(RANGE_MAP, ABinaryTypeComputer.INSTANCE, true);
         addPrivateFunction(LOCAL_STDDEV_POP, LocalSingleVarStatisticsTypeComputer.INSTANCE, true);
         addFunction(STDDEV_POP, NullableDoubleTypeComputer.INSTANCE, true);
@@ -1776,6 +1781,7 @@
         addPrivateFunction(LOCAL_KURTOSIS, LocalSingleVarStatisticsTypeComputer.INSTANCE, true);
         addFunction(KURTOSIS, NullableDoubleTypeComputer.INSTANCE, true);
         addPrivateFunction(GLOBAL_KURTOSIS, NullableDoubleTypeComputer.INSTANCE, true);
+        addPrivateFunction(NULL_WRITER, PropagateTypeComputer.INSTANCE_NULLABLE, true);
 
         // SUM
         addFunction(SUM, NumericSumAggTypeComputer.INSTANCE, true);
@@ -2579,6 +2585,11 @@
         addIntermediateAgg(RANGE_MAP, RANGE_MAP);
         addGlobalAgg(RANGE_MAP, RANGE_MAP);
 
+        addAgg(NULL_WRITER);
+        addLocalAgg(NULL_WRITER, NULL_WRITER);
+        addIntermediateAgg(NULL_WRITER, NULL_WRITER);
+        addGlobalAgg(NULL_WRITER, NULL_WRITER);
+
         // MIN
 
         addAgg(MIN);
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/ListOfSamplesTypeComputer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/ListOfSamplesTypeComputer.java
deleted file mode 100644
index 1ae72e4..0000000
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/ListOfSamplesTypeComputer.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.om.typecomputer.impl;
-
-import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
-import org.apache.asterix.om.typecomputer.base.AbstractResultTypeComputer;
-import org.apache.asterix.om.types.AOrderedListType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-
-/**
- * List of samples type: [[ANY], [ANY],...]. Each inner list constitutes one sample. Inside the inner list (the sample),
- * each item (or field) has its type tag.
- */
-public class ListOfSamplesTypeComputer extends AbstractResultTypeComputer {
-
-    public static final AOrderedListType TYPE =
-            new AOrderedListType(DefaultOpenFieldType.NESTED_OPEN_AORDERED_LIST_TYPE, null);
-    public static final ListOfSamplesTypeComputer INSTANCE = new ListOfSamplesTypeComputer();
-
-    private ListOfSamplesTypeComputer() {
-    }
-
-    @Override
-    protected IAType getResultType(ILogicalExpression expr, IAType... strippedInputTypes) throws AlgebricksException {
-        return TYPE;
-    }
-}
diff --git a/asterixdb/asterix-om/src/test/java/org/apache/asterix/om/typecomputer/TypeComputerTest.java b/asterixdb/asterix-om/src/test/java/org/apache/asterix/om/typecomputer/TypeComputerTest.java
index 4733c2f..bd77580 100644
--- a/asterixdb/asterix-om/src/test/java/org/apache/asterix/om/typecomputer/TypeComputerTest.java
+++ b/asterixdb/asterix-om/src/test/java/org/apache/asterix/om/typecomputer/TypeComputerTest.java
@@ -48,6 +48,7 @@
 import org.reflections.scanners.SubTypesScanner;
 
 // Tests if all type computers can handle input type ANY properly.
+// TODO: this test should be fixed/updated/modified/enhanced
 public class TypeComputerTest {
 
     @Test
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/collections/NullWriterAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/collections/NullWriterAggregateDescriptor.java
new file mode 100644
index 0000000..019465f
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/collections/NullWriterAggregateDescriptor.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.aggregates.collections;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import org.apache.asterix.runtime.aggregates.std.AbstractAggregateFunction;
+import org.apache.asterix.runtime.evaluators.functions.PointableHelper;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class NullWriterAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = NullWriterAggregateDescriptor::new;
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return BuiltinFunctions.NULL_WRITER;
+    }
+
+    @Override
+    public IAggregateEvaluatorFactory createAggregateEvaluatorFactory(final IScalarEvaluatorFactory[] args) {
+        return new IAggregateEvaluatorFactory() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public IAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx) {
+                return new AbstractAggregateFunction(sourceLoc) {
+
+                    @Override
+                    public void init() {
+                        // do nothing
+                    }
+
+                    @Override
+                    public void step(IFrameTupleReference tuple) {
+                        // do nothing
+                    }
+
+                    @Override
+                    public void finish(IPointable result) {
+                        PointableHelper.setNull(result);
+                    }
+
+                    @Override
+                    public void finishPartial(IPointable result) {
+                        finish(result);
+                    }
+                };
+            }
+        };
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSamplingAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSamplingAggregateDescriptor.java
index 55d381d..25256c1 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSamplingAggregateDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSamplingAggregateDescriptor.java
@@ -20,16 +20,16 @@
 
 import java.io.IOException;
 
-import org.apache.asterix.builders.IAsterixListBuilder;
-import org.apache.asterix.builders.OrderedListBuilder;
 import org.apache.asterix.common.config.CompilerProperties;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.om.base.ABinary;
+import org.apache.asterix.om.base.AMutableBinary;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.functions.IFunctionDescriptor;
 import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
 import org.apache.asterix.om.functions.IFunctionTypeInferer;
-import org.apache.asterix.om.typecomputer.impl.ListOfSamplesTypeComputer;
 import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.AbstractCollectionType;
+import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
 import org.apache.asterix.runtime.functions.FunctionTypeInferers;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
@@ -38,11 +38,15 @@
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 
 public class LocalSamplingAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
     private static final long serialVersionUID = 1L;
@@ -78,80 +82,76 @@
             @Override
             public IAggregateEvaluator createAggregateEvaluator(final IHyracksTaskContext ctx)
                     throws HyracksDataException {
-                return new LocalSamplingAggregateFunction(args, ctx, numSamples);
+                return new LocalSamplingAggregateFunction(args, ctx, numSamples, sourceLoc);
             }
         };
     }
 
-    private class LocalSamplingAggregateFunction implements IAggregateEvaluator {
+    private static class LocalSamplingAggregateFunction extends AbstractAggregateFunction {
+        @SuppressWarnings("unchecked")
+        private ISerializerDeserializer<ABinary> binarySerde =
+                SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ABINARY);
+        private final AMutableBinary binary = new AMutableBinary(null, 0, 0);
+        private final ArrayBackedValueStorage storage = new ArrayBackedValueStorage();
+        private final ArrayBackedValueStorage rangeMapBits = new ArrayBackedValueStorage();
+        private final IPointable inputFieldValue = new VoidPointable();
         private final int numSamplesRequired;
-        private final ArrayBackedValueStorage storage;
-        private final IAsterixListBuilder listOfSamplesBuilder;
-        private final IAsterixListBuilder oneSampleBuilder;
         private final IScalarEvaluator[] sampledFieldsEval;
-        private final IPointable inputFieldValue;
-        private int numSamplesTaken;
+        private int numSamples;
 
         /**
          * @param args the fields that constitute a sample, e.g., $$1, $$2
          * @param context Hyracks task
+         * @param numSamples number of samples to take
+         * @param srcLoc source location
          * @throws HyracksDataException
          */
         private LocalSamplingAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context,
-                int numSamplesRequired) throws HyracksDataException {
-            storage = new ArrayBackedValueStorage();
-            inputFieldValue = new VoidPointable();
+                int numSamples, SourceLocation srcLoc) throws HyracksDataException {
+            super(srcLoc);
             sampledFieldsEval = new IScalarEvaluator[args.length];
             for (int i = 0; i < args.length; i++) {
                 sampledFieldsEval[i] = args[i].createScalarEvaluator(context);
             }
-            oneSampleBuilder = new OrderedListBuilder();
-            listOfSamplesBuilder = new OrderedListBuilder();
-            listOfSamplesBuilder.reset(ListOfSamplesTypeComputer.TYPE);
-            this.numSamplesRequired = numSamplesRequired > 0 ? numSamplesRequired
-                    : (int) CompilerProperties.Option.COMPILER_SORT_SAMPLES.defaultValue();
+            this.numSamplesRequired =
+                    numSamples > 0 ? numSamples : (int) CompilerProperties.Option.COMPILER_SORT_SAMPLES.defaultValue();
         }
 
         @Override
         public void init() throws HyracksDataException {
-            numSamplesTaken = 0;
-            listOfSamplesBuilder.reset(ListOfSamplesTypeComputer.TYPE);
+            numSamples = 0;
+            rangeMapBits.reset();
+            // write a dummy integer at the beginning to be filled later with the actual number of samples taken
+            IntegerSerializerDeserializer.write(0, rangeMapBits.getDataOutput());
         }
 
         /**
          * Receives data stream one tuple at a time from a data source and records samples.
          * @param tuple one sample
-         * @throws HyracksDataException
+         * @throws HyracksDataException IO exception
          */
         @Override
         public void step(IFrameTupleReference tuple) throws HyracksDataException {
-            if (numSamplesTaken >= numSamplesRequired) {
+            if (numSamples >= numSamplesRequired) {
                 return;
             }
-            // start over for a new sample
-            oneSampleBuilder.reset((AbstractCollectionType) ListOfSamplesTypeComputer.TYPE.getItemType());
-
-            for (IScalarEvaluator fieldEval : sampledFieldsEval) {
-                // add fields to make up one sample
-                fieldEval.evaluate(tuple, inputFieldValue);
-                oneSampleBuilder.addItem(inputFieldValue);
+            for (int i = 0; i < sampledFieldsEval.length; i++) {
+                sampledFieldsEval[i].evaluate(tuple, inputFieldValue);
+                IntegerSerializerDeserializer.write(inputFieldValue.getLength(), rangeMapBits.getDataOutput());
+                rangeMapBits.append(inputFieldValue);
             }
-            // prepare the sample to add it to the list of samples
-            storage.reset();
-            oneSampleBuilder.write(storage.getDataOutput(), true);
-            listOfSamplesBuilder.addItem(storage);
-            numSamplesTaken++;
+            numSamples++;
         }
 
         /**
          * Sends the list of samples to the global range-map generator.
-         * @param result list of samples
-         * @throws HyracksDataException
+         * @param result will store the list of samples
+         * @throws HyracksDataException IO exception
          */
         @Override
         public void finish(IPointable result) throws HyracksDataException {
             storage.reset();
-            if (numSamplesTaken == 0) {
+            if (numSamples == 0) {
                 // empty partition? then send system null as an indication of empty partition.
                 try {
                     storage.getDataOutput().writeByte(ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG);
@@ -160,7 +160,9 @@
                     throw HyracksDataException.create(e);
                 }
             } else {
-                listOfSamplesBuilder.write(storage.getDataOutput(), true);
+                IntegerPointable.setInteger(rangeMapBits.getByteArray(), rangeMapBits.getStartOffset(), numSamples);
+                binary.setValue(rangeMapBits.getByteArray(), rangeMapBits.getStartOffset(), rangeMapBits.getLength());
+                binarySerde.serialize(binary, storage.getDataOutput());
                 result.set(storage);
             }
         }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java
index bf8cf49..b174d07 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java
@@ -26,13 +26,17 @@
 import java.util.List;
 
 import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.om.base.ABinary;
+import org.apache.asterix.om.base.AMutableBinary;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.functions.IFunctionDescriptor;
 import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
 import org.apache.asterix.om.functions.IFunctionTypeInferer;
 import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
-import org.apache.asterix.runtime.evaluators.common.ListAccessor;
 import org.apache.asterix.runtime.functions.FunctionTypeInferers;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator;
@@ -41,8 +45,12 @@
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.ByteArrayPointable;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
@@ -74,9 +82,10 @@
  */
 public class RangeMapAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
     private static final long serialVersionUID = 1L;
-    private boolean[] ascendingFlags;
-    private int numOfPartitions;
+    private boolean[] ascFlags;
+    private int numPartitions;
     private int numOrderFields;
+    private IAType[] argsTypes;
 
     public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
         @Override
@@ -100,13 +109,14 @@
      * determine how many split points to pick out of the samples. It also needs to know the ascending/descending of
      * each sort field so that it can sort the samples accordingly first and then pick the (number of partitions - 1)
      * split points out of the sorted samples.
-     * @param states states[0]: number of partitions, states[1]: ascending flags
+     * @param states states[0]: number of partitions, states[1]: ascending flags, states[2]: inputs types
      */
     @Override
     public void setImmutableStates(Object... states) {
-        numOfPartitions = (int) states[0];
-        ascendingFlags = (boolean[]) states[1];
-        numOrderFields = ascendingFlags.length;
+        numPartitions = (int) states[0];
+        ascFlags = (boolean[]) states[1];
+        numOrderFields = ascFlags.length;
+        argsTypes = (IAType[]) states[2];
     }
 
     @Override
@@ -117,40 +127,34 @@
             @Override
             public IAggregateEvaluator createAggregateEvaluator(final IHyracksTaskContext ctx)
                     throws HyracksDataException {
-                return new RangeMapFunction(args, ctx, ascendingFlags, numOfPartitions, numOrderFields);
+                return new RangeMapFunction(args, ctx, ascFlags, numPartitions, numOrderFields, sourceLoc, argsTypes);
             }
         };
     }
 
-    private class RangeMapFunction implements IAggregateEvaluator {
+    private static class RangeMapFunction extends AbstractAggregateFunction {
+        @SuppressWarnings("unchecked")
+        private ISerializerDeserializer<ABinary> binarySerde =
+                SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ABINARY);
+        private final AMutableBinary binary = new AMutableBinary(null, 0, 0);
+        private final List<List<byte[]>> finalSamples = new ArrayList<>();
+        private final ArrayBackedValueStorage storage = new ArrayBackedValueStorage();
+        private final IPointable input = new VoidPointable();
+        private final ByteArrayPointable rangeMapPointable = new ByteArrayPointable();
         private final IScalarEvaluator localSamplesEval;
-        private final IPointable localSamples;
-        private final List<List<byte[]>> finalSamples;
         private final Comparator<List<byte[]>> comparator;
         private final int numOfPartitions;
         private final int numOrderByFields;
-        private final ListAccessor listOfSamples;
-        private final ListAccessor oneSample;
-        private final IPointable oneSamplePointable;
-        private final ArrayBackedValueStorage oneSampleStorage;
-        private final IPointable field;
-        private final ArrayBackedValueStorage storage;
 
         @SuppressWarnings("unchecked")
         private RangeMapFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, boolean[] ascending,
-                int numOfPartitions, int numOrderByFields) throws HyracksDataException {
-            localSamples = new VoidPointable();
-            localSamplesEval = args[0].createScalarEvaluator(context);
-            finalSamples = new ArrayList<>();
-            comparator = createComparator(ascending);
+                int numOfPartitions, int numOrderByFields, SourceLocation sourceLocation, IAType[] argsTypes)
+                throws HyracksDataException {
+            super(sourceLocation);
+            this.localSamplesEval = args[0].createScalarEvaluator(context);
+            this.comparator = createComparator(ascending, argsTypes);
             this.numOfPartitions = numOfPartitions;
             this.numOrderByFields = numOrderByFields;
-            listOfSamples = new ListAccessor();
-            oneSample = new ListAccessor();
-            oneSamplePointable = new VoidPointable();
-            oneSampleStorage = new ArrayBackedValueStorage();
-            field = new VoidPointable();
-            storage = new ArrayBackedValueStorage();
         }
 
         @Override
@@ -161,41 +165,29 @@
         /**
          * Receives the local samples and appends them to the final list of samples.
          * @param tuple the partition's samples
-         * @throws HyracksDataException
+         * @throws HyracksDataException IO Exception
          */
         @Override
         public void step(IFrameTupleReference tuple) throws HyracksDataException {
             // check if empty stream (system_null), i.e. partition is empty, so no samples
-            localSamplesEval.evaluate(tuple, localSamples);
-            byte tag = localSamples.getByteArray()[localSamples.getStartOffset()];
-            if (tag == ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG) {
+            localSamplesEval.evaluate(tuple, input);
+            if (input.getByteArray()[input.getStartOffset()] == ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG) {
                 return;
             }
-            // deserialize the samples received from the local partition
-            listOfSamples.reset(localSamples.getByteArray(), localSamples.getStartOffset());
-            int numberOfSamples = listOfSamples.size();
-
-            // "sample" & "addedSample" are lists to support multi-column instead of one value, i.e. <3,"dept">
-            List<byte[]> addedSample;
-            int numberOfFields;
-            // add the samples to the final samples
-            try {
-                for (int i = 0; i < numberOfSamples; i++) {
-                    oneSampleStorage.reset();
-                    listOfSamples.getOrWriteItem(i, oneSamplePointable, oneSampleStorage);
-                    oneSample.reset(oneSamplePointable.getByteArray(), oneSamplePointable.getStartOffset());
-                    numberOfFields = oneSample.size();
-                    addedSample = new ArrayList<>(numberOfFields);
-                    for (int j = 0; j < numberOfFields; j++) {
-                        storage.reset();
-                        oneSample.getOrWriteItem(j, field, storage);
-                        addedSample.add(Arrays.copyOfRange(field.getByteArray(), field.getStartOffset(),
-                                field.getStartOffset() + field.getLength()));
-                    }
-                    finalSamples.add(addedSample);
+            rangeMapPointable.set(input.getByteArray(), input.getStartOffset() + 1, input.getLength() - 1);
+            byte[] rangeMapBytes = rangeMapPointable.getByteArray();
+            int pointer = rangeMapPointable.getContentStartOffset();
+            int numSamples = IntegerPointable.getInteger(rangeMapBytes, pointer);
+            pointer += Integer.BYTES; // eat the 4 bytes of the integer (number of samples)
+            for (int i = 0; i < numSamples; i++) {
+                List<byte[]> oneSample = new ArrayList<>(numOrderByFields);
+                for (int j = 0; j < numOrderByFields; j++) {
+                    int valueLength = IntegerPointable.getInteger(rangeMapBytes, pointer);
+                    pointer += Integer.BYTES; // eat the 4 bytes of the integer and move to the value
+                    oneSample.add(Arrays.copyOfRange(rangeMapBytes, pointer, pointer + valueLength));
+                    pointer += valueLength; // eat the length of the value and move to the next pair length:value
                 }
-            } catch (IOException e) {
-                throw HyracksDataException.create(e);
+                finalSamples.add(oneSample);
             }
         }
 
@@ -203,7 +195,7 @@
          * Produces the range map out of the collected samples from each partition. The final list of samples is sorted
          * first. Then, we select the split points by dividing the samples evenly.
          * @param result contains the serialized range map.
-         * @throws HyracksDataException
+         * @throws HyracksDataException IO Exception
          */
         @Override
         public void finish(IPointable result) throws HyracksDataException {
@@ -247,8 +239,7 @@
             } catch (IOException e) {
                 throw HyracksDataException.create(e);
             }
-
-            serializeRangemap(numOrderByFields, storage.getByteArray(), endOffsets, result);
+            serializeRangeMap(numOrderByFields, storage.getByteArray(), endOffsets, result);
         }
 
         @Override
@@ -258,18 +249,17 @@
 
         /**
          * Creates the comparator that sorts all the collected samples before picking split points.
-         * @param ascending ascending or descending flag for each sort column.
+         * @param asc ascending or descending flag for each sort column.
+         * @param types types of inputs to range map function produced by the local step and holding sort fields types
          * @return the described comparator
          */
-        private Comparator<List<byte[]>> createComparator(boolean[] ascending) {
-            // create the generic comparator for each sort field
-            IBinaryComparator[] fieldsComparators = new IBinaryComparator[ascending.length];
-            for (int i = 0; i < ascending.length; i++) {
-                // TODO(ali): this is temporary
+        private static Comparator<List<byte[]>> createComparator(boolean[] asc, IAType[] types) {
+            // create the generic comparator for each sort field, sort fields types start at index 1
+            IBinaryComparator[] fieldsComparators = new IBinaryComparator[asc.length];
+            for (int i = 0, fieldIdx = 1; fieldIdx < types.length; i++, fieldIdx++) {
                 fieldsComparators[i] = BinaryComparatorFactoryProvider.INSTANCE
-                        .getBinaryComparatorFactory(null, null, ascending[i]).createBinaryComparator();
+                        .getBinaryComparatorFactory(types[fieldIdx], types[fieldIdx], asc[i]).createBinaryComparator();
             }
-
             return (splitPoint1, splitPoint2) -> {
                 try {
                     // two split points must have the same num of fields
@@ -299,16 +289,18 @@
          * @param splitValues the serialized split values stored one after the other
          * @param endOffsets the end offsets of each split value
          * @param result where the range map object is serialized
-         * @throws HyracksDataException
+         * @throws HyracksDataException IO Exception
          */
-        private void serializeRangemap(int numberFields, byte[] splitValues, int[] endOffsets, IPointable result)
+        private void serializeRangeMap(int numberFields, byte[] splitValues, int[] endOffsets, IPointable result)
                 throws HyracksDataException {
             ArrayBackedValueStorage serRangeMap = new ArrayBackedValueStorage();
             IntegerSerializerDeserializer.write(numberFields, serRangeMap.getDataOutput());
             ByteArraySerializerDeserializer.write(splitValues, serRangeMap.getDataOutput());
             IntArraySerializerDeserializer.write(endOffsets, serRangeMap.getDataOutput());
-
-            result.set(serRangeMap.getByteArray(), serRangeMap.getStartOffset(), serRangeMap.getLength());
+            binary.setValue(serRangeMap.getByteArray(), serRangeMap.getStartOffset(), serRangeMap.getLength());
+            storage.reset();
+            binarySerde.serialize(binary, storage.getDataOutput());
+            result.set(storage);
         }
     }
 }
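
Note on the new step() logic above: it walks a flat buffer laid out as [numSamples] followed, for every sample, by numOrderByFields pairs of [valueLength][valueBytes]. The following is a minimal stand-alone sketch of that layout, not the AsterixDB code. The class SampleBufferSketch and its encode/decode methods are hypothetical names, plain java.nio.ByteBuffer stands in for IntegerPointable, big-endian integers are assumed, and the leading type tag and ByteArrayPointable length header that the real code skips are left out.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-alone sketch of the length-prefixed sample layout; not the AsterixDB classes. */
final class SampleBufferSketch {
    private SampleBufferSketch() {
    }

    /** Writes [numSamples], then for every field of every sample [valueLength][valueBytes]. */
    static byte[] encode(List<List<byte[]>> samples) {
        int size = Integer.BYTES;
        for (List<byte[]> sample : samples) {
            for (byte[] value : sample) {
                size += Integer.BYTES + value.length;
            }
        }
        ByteBuffer out = ByteBuffer.allocate(size); // ByteBuffer is big-endian by default
        out.putInt(samples.size());
        for (List<byte[]> sample : samples) {
            for (byte[] value : sample) {
                out.putInt(value.length);
                out.put(value);
            }
        }
        return out.array();
    }

    /** Reads the layout back, advancing a pointer the same way the step() loop in the diff does. */
    static List<List<byte[]>> decode(byte[] bytes, int start, int numFields) {
        ByteBuffer in = ByteBuffer.wrap(bytes);
        int pointer = start;
        int numSamples = in.getInt(pointer);
        pointer += Integer.BYTES; // skip the sample count
        List<List<byte[]>> samples = new ArrayList<>(numSamples);
        for (int i = 0; i < numSamples; i++) {
            List<byte[]> oneSample = new ArrayList<>(numFields);
            for (int j = 0; j < numFields; j++) {
                int valueLength = in.getInt(pointer);
                pointer += Integer.BYTES; // skip the length, now at the value
                oneSample.add(Arrays.copyOfRange(bytes, pointer, pointer + valueLength));
                pointer += valueLength; // move to the next length:value pair
            }
            samples.add(oneSample);
        }
        return samples;
    }
}
```

Because every value carries its own length, the decoder needs only the number of sort fields to split the buffer; it never has to interpret the value bytes themselves.
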
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
index 8c884ca..488ee76 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
@@ -32,6 +32,7 @@
 import org.apache.asterix.runtime.aggregates.collections.LastElementAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.collections.ListifyAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.collections.LocalFirstElementAggregateDescriptor;
+import org.apache.asterix.runtime.aggregates.collections.NullWriterAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.scalar.ScalarAvgAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.scalar.ScalarAvgDistinctAggregateDescriptor;
 import org.apache.asterix.runtime.aggregates.scalar.ScalarCountAggregateDescriptor;
@@ -623,6 +624,7 @@
         fc.add(GlobalSkewnessAggregateDescriptor.FACTORY);
         fc.add(EmptyStreamAggregateDescriptor.FACTORY);
         fc.add(NonEmptyStreamAggregateDescriptor.FACTORY);
+        fc.add(NullWriterAggregateDescriptor.FACTORY);
 
         // serializable aggregates
         fc.add(SerializableCountAggregateDescriptor.FACTORY);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionTypeInferers.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionTypeInferers.java
index 44e3eb7..4be71e3 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionTypeInferers.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionTypeInferers.java
@@ -85,23 +85,18 @@
         public void infer(ILogicalExpression expr, IFunctionDescriptor fd, IVariableTypeEnvironment context,
                 CompilerProperties compilerProps) throws AlgebricksException {
             AbstractFunctionCallExpression fce = (AbstractFunctionCallExpression) expr;
-            IAType[] argsTypes = new IAType[fce.getArguments().size()];
-            int i = 0;
-            for (Mutable<ILogicalExpression> arg : fce.getArguments()) {
-                argsTypes[i] = TypeComputeUtils.getActualType((IAType) context.getType(arg.getValue()));
-                i++;
-            }
-            fd.setImmutableStates((Object[]) argsTypes);
+            fd.setImmutableStates((Object[]) getArgumentsTypes(fce, context));
         }
     };
 
     public static final IFunctionTypeInferer SET_SORTING_PARAMETERS = new IFunctionTypeInferer() {
         @Override
-        public void infer(ILogicalExpression expr, IFunctionDescriptor fd, IVariableTypeEnvironment context,
+        public void infer(ILogicalExpression expr, IFunctionDescriptor fd, IVariableTypeEnvironment ctx,
                 CompilerProperties compilerProps) throws AlgebricksException {
-            AbstractFunctionCallExpression funCallExpr = (AbstractFunctionCallExpression) expr;
-            Object[] sortingParameters = funCallExpr.getOpaqueParameters();
-            fd.setImmutableStates(sortingParameters[0], sortingParameters[1]);
+            // sets the type of the input range map produced by the local sampling expression and types of sort fields
+            AbstractFunctionCallExpression funExp = (AbstractFunctionCallExpression) expr;
+            Object[] sortingParameters = funExp.getOpaqueParameters();
+            fd.setImmutableStates(sortingParameters[0], sortingParameters[1], getArgumentsTypes(funExp, ctx));
         }
     };
 
@@ -331,4 +326,15 @@
             fd.setImmutableStates((Object[]) argRecordTypes);
         }
     }
+
+    private static IAType[] getArgumentsTypes(AbstractFunctionCallExpression funExp, IVariableTypeEnvironment ctx)
+            throws AlgebricksException {
+        IAType[] argsTypes = new IAType[funExp.getArguments().size()];
+        int i = 0;
+        for (Mutable<ILogicalExpression> arg : funExp.getArguments()) {
+            argsTypes[i] = TypeComputeUtils.getActualType((IAType) ctx.getType(arg.getValue()));
+            i++;
+        }
+        return argsTypes;
+    }
 }
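
Note on how the propagated argument types are used: the range map function builds one comparator per sort field and combines them into a single comparator over whole split points (lists of field values). A minimal sketch of that composition follows. The names SplitPointComparatorSketch, RAW_BYTES and create are hypothetical, and a raw unsigned byte comparison stands in for the type-aware IBinaryComparator the real code gets from BinaryComparatorFactoryProvider. In the diff the ascending flag is handed to the comparator factory; here it is applied in the composite instead, which is an assumption made only to keep the sketch self-contained.

```java
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch of composing per-field comparators into one split-point comparator. */
final class SplitPointComparatorSketch {
    private SplitPointComparatorSketch() {
    }

    /** Unsigned lexicographic byte order; a stand-in for a type-aware field comparator. */
    static final Comparator<byte[]> RAW_BYTES = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int c = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (c != 0) {
                return c;
            }
        }
        return Integer.compare(a.length, b.length);
    };

    /** Compares field by field; the first non-equal field decides, honoring its asc/desc flag. */
    static Comparator<List<byte[]>> create(List<Comparator<byte[]>> perField, boolean[] asc) {
        if (perField.size() != asc.length) {
            throw new IllegalArgumentException("one comparator is needed per sort field");
        }
        return (left, right) -> {
            for (int i = 0; i < asc.length; i++) {
                int c = perField.get(i).compare(left.get(i), right.get(i));
                if (c != 0) {
                    return asc[i] ? c : (c < 0 ? 1 : -1);
                }
            }
            return 0;
        };
    }
}
```

The sketch assumes both split points carry exactly asc.length fields, mirroring the "two split points must have the same num of fields" comment in the diff.
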
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/VariableReferenceExpression.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/VariableReferenceExpression.java
index 1c7da34..aae4864 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/VariableReferenceExpression.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/VariableReferenceExpression.java
@@ -27,6 +27,7 @@
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionVisitor;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 
 public class VariableReferenceExpression extends AbstractLogicalExpression {
     private int tupleRef;
@@ -44,6 +45,11 @@
         this(0, variable);
     }
 
+    public VariableReferenceExpression(LogicalVariable variable, SourceLocation sourceLoc) {
+        this(variable);
+        this.sourceLoc = sourceLoc;
+    }
+
     public LogicalVariable getVariableReference() {
         return variable;
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index bff7431..f66a6b8 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -114,10 +114,13 @@
     private PhysicalOptimizationConfig physicalOptimizationConfig;
     private final FunctionIdentifier rangeMapFunction;
     private final FunctionIdentifier localSamplingFun;
+    private final FunctionIdentifier typePropagatingFun;
 
-    public EnforceStructuralPropertiesRule(FunctionIdentifier rangeMapFunction, FunctionIdentifier localSamplingFun) {
+    public EnforceStructuralPropertiesRule(FunctionIdentifier rangeMapFunction, FunctionIdentifier localSamplingFun,
+            FunctionIdentifier typePropagatingFun) {
         this.rangeMapFunction = rangeMapFunction;
         this.localSamplingFun = localSamplingFun;
+        this.typePropagatingFun = typePropagatingFun;
     }
 
     @Override
@@ -711,9 +714,9 @@
     private IPhysicalOperator createDynamicRangePartitionExchangePOperator(AbstractLogicalOperator parentOp,
             IOptimizationContext ctx, INodeDomain targetDomain, List<OrderColumn> partitioningColumns, int childIndex)
             throws AlgebricksException {
-        SourceLocation sourceLoc = parentOp.getSourceLocation();
+        SourceLocation srcLoc = parentOp.getSourceLocation();
         // #1. create the replicate operator and add it above the source op feeding parent operator
-        ReplicateOperator replicateOp = createReplicateOperator(parentOp.getInputs().get(childIndex), ctx, sourceLoc);
+        ReplicateOperator replicateOp = createReplicateOperator(parentOp.getInputs().get(childIndex), ctx, srcLoc);
 
         // these two exchange ops are needed so that the parents of replicate stay the same during later optimizations.
         // This is because replicate operator has references to its parents. If any later optimizations add new parents,
@@ -724,40 +727,34 @@
         MutableObject<ILogicalOperator> exchToLocalAggRef = new MutableObject<>(exchToLocalAgg);
         MutableObject<ILogicalOperator> exchToForwardRef = new MutableObject<>(exchToForward);
 
-        // add the exchange--to-forward at output 0, the exchange-to-local-aggregate at output 1
+        // add the exchange-to-forward at output 0, the exchange-to-local-aggregate at output 1
         replicateOp.getOutputs().add(exchToForwardRef);
         replicateOp.getOutputs().add(exchToLocalAggRef);
         // materialize the data to be able to re-read the data again after sampling is done
         replicateOp.getOutputMaterializationFlags()[0] = true;
 
         // #2. create the aggregate operators and their sampling functions
-        // $$samplingResultVar = local_samplingFun($$partitioning_column)
-        // $$rangeMapResultVar = global_rangeMapFun($$samplingResultVar)
-        List<LogicalVariable> samplingResultVar = new ArrayList<>(1);
+        List<LogicalVariable> localVars = new ArrayList<>();
         List<LogicalVariable> rangeMapResultVar = new ArrayList<>(1);
-        List<Mutable<ILogicalExpression>> samplingFun = new ArrayList<>(1);
+        List<Mutable<ILogicalExpression>> localFuns = new ArrayList<>();
         List<Mutable<ILogicalExpression>> rangeMapFun = new ArrayList<>(1);
-
-        createAggregateFunction(ctx, samplingResultVar, samplingFun, rangeMapResultVar, rangeMapFun,
-                targetDomain.cardinality(), partitioningColumns, sourceLoc);
-
-        AggregateOperator localAggOp =
-                createAggregate(samplingResultVar, false, samplingFun, exchToLocalAggRef, ctx, sourceLoc);
+        createAggregateFunction(ctx, localVars, localFuns, rangeMapResultVar, rangeMapFun, targetDomain.cardinality(),
+                partitioningColumns, srcLoc);
+        AggregateOperator localAggOp = createAggregate(localVars, false, localFuns, exchToLocalAggRef, ctx, srcLoc);
         MutableObject<ILogicalOperator> localAgg = new MutableObject<>(localAggOp);
-        AggregateOperator globalAggOp = createAggregate(rangeMapResultVar, true, rangeMapFun, localAgg, ctx, sourceLoc);
+        AggregateOperator globalAggOp = createAggregate(rangeMapResultVar, true, rangeMapFun, localAgg, ctx, srcLoc);
         MutableObject<ILogicalOperator> globalAgg = new MutableObject<>(globalAggOp);
 
         // #3. create the forward operator
         String rangeMapKey = UUID.randomUUID().toString();
         LogicalVariable rangeMapVar = rangeMapResultVar.get(0);
-        ForwardOperator forward = createForward(rangeMapKey, rangeMapVar, exchToForwardRef, globalAgg, ctx, sourceLoc);
+        ForwardOperator forward = createForward(rangeMapKey, rangeMapVar, exchToForwardRef, globalAgg, ctx, srcLoc);
         MutableObject<ILogicalOperator> forwardRef = new MutableObject<>(forward);
 
         // replace the old input of parentOp requiring the range partitioning with the new forward op
         parentOp.getInputs().set(childIndex, forwardRef);
         parentOp.recomputeSchema();
         ctx.computeAndSetTypeEnvironmentForOperator(parentOp);
-
         return new RangePartitionExchangePOperator(partitioningColumns, rangeMapKey, targetDomain);
     }
 
@@ -780,56 +777,61 @@
      * will be used when creating the corresponding aggregate operators.
      * @param context used to get new variables which will be assigned the samples & the range map
      * @param localResultVariables the variable to which the stats (e.g. samples) info is assigned
-     * @param localAggFunctions the local sampling expression is added to this list
-     * @param globalResultVariables the variable to which the range map is assigned
-     * @param globalAggFunctions the expression generating a range map is added to this list
+     * @param localAggFunctions the local sampling expression and columns expressions are added to this list
+     * @param globalResultVariable the variable to which the range map is assigned
+     * @param globalAggFunction the expression generating a range map is added to this list
      * @param numPartitions passed to the expression generating a range map to know how many split points are needed
-     * @param partFields the fields based on which the partitioner partitions the tuples, also sampled fields
+     * @param partitionFields the fields based on which the partitioner partitions the tuples, also sampled fields
      * @param sourceLocation source location
      */
     private void createAggregateFunction(IOptimizationContext context, List<LogicalVariable> localResultVariables,
-            List<Mutable<ILogicalExpression>> localAggFunctions, List<LogicalVariable> globalResultVariables,
-            List<Mutable<ILogicalExpression>> globalAggFunctions, int numPartitions, List<OrderColumn> partFields,
+            List<Mutable<ILogicalExpression>> localAggFunctions, List<LogicalVariable> globalResultVariable,
+            List<Mutable<ILogicalExpression>> globalAggFunction, int numPartitions, List<OrderColumn> partitionFields,
             SourceLocation sourceLocation) {
-        // prepare the arguments of the local sampling function: sampled fields
-        List<Mutable<ILogicalExpression>> sampledFields = new ArrayList<>(partFields.size());
-        partFields.forEach(f -> {
-            AbstractLogicalExpression sampledField = new VariableReferenceExpression(f.getColumn());
-            sampledField.setSourceLocation(sourceLocation);
-            sampledFields.add(new MutableObject<>(sampledField));
-        });
-
-        // local info
+        // prepare the arguments to the local sampling function: sampled fields (e.g. $col1, $col2)
+        // local info: local agg [$1, $2, $3] = [local-sampling-fun($col1, $col2), type_expr($col1), type_expr($col2)]
+        // global info: global agg [$RM] = [global-range-map($1, $2, $3)]
         IFunctionInfo samplingFun = context.getMetadataProvider().lookupFunction(localSamplingFun);
-        AbstractFunctionCallExpression samplingExp =
-                new AggregateFunctionCallExpression(samplingFun, false, sampledFields);
-        samplingExp.setSourceLocation(sourceLocation);
-        LogicalVariable samplingResultVar = context.newVar();
-        localResultVariables.add(samplingResultVar);
-        localAggFunctions.add(new MutableObject<>(samplingExp));
-        Object[] samplingParam = { context.getPhysicalOptimizationConfig().getSortSamples() };
-        samplingExp.setOpaqueParameters(samplingParam);
-
-        // prepare the argument of the global range map generator function: the result of the local function
-        List<Mutable<ILogicalExpression>> arg = new ArrayList<>(1);
-        AbstractLogicalExpression samplingResultVarExp = new VariableReferenceExpression(samplingResultVar);
-        samplingResultVarExp.setSourceLocation(sourceLocation);
-        arg.add(new MutableObject<>(samplingResultVarExp));
-
-        // global info
-        IFunctionInfo rangeMapFun = context.getMetadataProvider().lookupFunction(rangeMapFunction);
-        AbstractFunctionCallExpression rangeMapExp = new AggregateFunctionCallExpression(rangeMapFun, true, arg);
-        rangeMapExp.setSourceLocation(sourceLocation);
-        globalResultVariables.add(context.newVar());
-        globalAggFunctions.add(new MutableObject<>(rangeMapExp));
-
+        List<Mutable<ILogicalExpression>> fields = new ArrayList<>(partitionFields.size());
+        List<Mutable<ILogicalExpression>> argsToRM = new ArrayList<>(1 + partitionFields.size());
+        AbstractFunctionCallExpression expr = new AggregateFunctionCallExpression(samplingFun, false, fields);
+        expr.setSourceLocation(sourceLocation);
+        expr.setOpaqueParameters(new Object[] { context.getPhysicalOptimizationConfig().getSortSamples() });
+        // add the sampling function to the list of the local functions
+        LogicalVariable localOutVariable = context.newVar();
+        localResultVariables.add(localOutVariable);
+        localAggFunctions.add(new MutableObject<>(expr));
+        // add the local result variable as input to the global range map function
+        AbstractLogicalExpression varExprRef = new VariableReferenceExpression(localOutVariable, sourceLocation);
+        argsToRM.add(new MutableObject<>(varExprRef));
         int i = 0;
-        boolean[] ascendingFlags = new boolean[partFields.size()];
-        for (OrderColumn column : partFields) {
-            ascendingFlags[i] = column.getOrder() == OrderOperator.IOrder.OrderKind.ASC;
+        boolean[] ascendingFlags = new boolean[partitionFields.size()];
+        IFunctionInfo typeFun = context.getMetadataProvider().lookupFunction(typePropagatingFun);
+        for (OrderColumn field : partitionFields) {
+            // add the field to the "fields" which is the input to the local sampling function
+            varExprRef = new VariableReferenceExpression(field.getColumn(), sourceLocation);
+            fields.add(new MutableObject<>(varExprRef));
+            // add the same field as input to the corresponding local function propagating the type of the field
+            expr = new AggregateFunctionCallExpression(typeFun, false,
+                    Collections.singletonList(new MutableObject<>(varExprRef)));
+            // add the type propagating function to the list of the local functions
+            localOutVariable = context.newVar();
+            localResultVariables.add(localOutVariable);
+            localAggFunctions.add(new MutableObject<>(expr));
+            // add the local result variable as input to the global range map function
+            varExprRef = new VariableReferenceExpression(localOutVariable, sourceLocation);
+            argsToRM.add(new MutableObject<>(varExprRef));
+            ascendingFlags[i] = field.getOrder() == OrderOperator.IOrder.OrderKind.ASC;
             i++;
         }
+        IFunctionInfo rangeMapFun = context.getMetadataProvider().lookupFunction(rangeMapFunction);
+        AggregateFunctionCallExpression rangeMapExp = new AggregateFunctionCallExpression(rangeMapFun, true, argsToRM);
+        rangeMapExp.setStepOneAggregate(samplingFun);
+        rangeMapExp.setStepTwoAggregate(rangeMapFun);
+        rangeMapExp.setSourceLocation(sourceLocation);
         rangeMapExp.setOpaqueParameters(new Object[] { numPartitions, ascendingFlags });
+        globalResultVariable.add(context.newVar());
+        globalAggFunction.add(new MutableObject<>(rangeMapExp));
     }
 
     /**
@@ -874,11 +876,10 @@
 
     private static ForwardOperator createForward(String rangeMapKey, LogicalVariable rangeMapVariable,
             MutableObject<ILogicalOperator> exchangeOpFromReplicate, MutableObject<ILogicalOperator> globalAggInput,
-            IOptimizationContext context, SourceLocation sourceLocation) throws AlgebricksException {
-        AbstractLogicalExpression rangeMapExpression = new VariableReferenceExpression(rangeMapVariable);
-        rangeMapExpression.setSourceLocation(sourceLocation);
+            IOptimizationContext context, SourceLocation sourceLoc) throws AlgebricksException {
+        AbstractLogicalExpression rangeMapExpression = new VariableReferenceExpression(rangeMapVariable, sourceLoc);
         ForwardOperator forwardOperator = new ForwardOperator(rangeMapKey, new MutableObject<>(rangeMapExpression));
-        forwardOperator.setSourceLocation(sourceLocation);
+        forwardOperator.setSourceLocation(sourceLoc);
         forwardOperator.setPhysicalOperator(new ForwardPOperator());
         forwardOperator.getInputs().add(exchangeOpFromReplicate);
         forwardOperator.getInputs().add(globalAggInput);
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ForwardOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ForwardOperatorDescriptor.java
index 24c5cae..49eea0a 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ForwardOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ForwardOperatorDescriptor.java
@@ -33,6 +33,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.data.std.primitive.ByteArrayPointable;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import org.apache.hyracks.dataflow.common.data.marshalling.ByteArraySerializerDeserializer;
@@ -46,6 +47,7 @@
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 
+// TODO(ali): forward operator should probably be moved to asterix layer
 public class ForwardOperatorDescriptor extends AbstractOperatorDescriptor {
     private static final long serialVersionUID = 1L;
     private static final int FORWARD_DATA_ACTIVITY_ID = 0;
@@ -173,8 +175,10 @@
             byte[] rangeMap = frameTupleReference.getFieldData(0);
             int offset = frameTupleReference.getFieldStart(0);
             int length = frameTupleReference.getFieldLength(0);
-
-            ByteArrayInputStream rangeMapIn = new ByteArrayInputStream(rangeMap, offset, length);
+            ByteArrayPointable pointable = new ByteArrayPointable();
+            pointable.set(rangeMap, offset + 1, length - 1);
+            ByteArrayInputStream rangeMapIn = new ByteArrayInputStream(pointable.getByteArray(),
+                    pointable.getContentStartOffset(), pointable.getContentLength());
             DataInputStream dataInputStream = new DataInputStream(rangeMapIn);
             numFields = IntegerSerializerDeserializer.read(dataInputStream);
             splitValues = ByteArraySerializerDeserializer.read(dataInputStream);
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IntSerDeUtils.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IntSerDeUtils.java
index 05e694c..821707f 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IntSerDeUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IntSerDeUtils.java
@@ -37,7 +37,7 @@
      *            value to write to {@code bytes[offset]}
      */
     public static void putInt(byte[] bytes, int offset, int value) {
-
+        // TODO: there is another implementation in IntegerPointable
         bytes[offset++] = (byte) (value >> 24);
         bytes[offset++] = (byte) (value >> 16);
         bytes[offset++] = (byte) (value >> 8);
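
Note on the putInt hunk above: the visible shifts (24, 16, 8) write the most significant byte first, i.e. big-endian order. The sketch below illustrates that convention with a matching reader. The class IntBytesSketch is a hypothetical name, the fourth write (the low byte) is inferred because the hunk is cut off before it, and getInt is an assumed mirror image rather than a copy of the method in IntSerDeUtils or IntegerPointable.

```java
/** Hypothetical sketch of big-endian int encoding; not the Hyracks utility class itself. */
final class IntBytesSketch {
    private IntBytesSketch() {
    }

    /** Writes value into bytes[offset..offset+3], most significant byte first. */
    static void putInt(byte[] bytes, int offset, int value) {
        bytes[offset] = (byte) (value >> 24);
        bytes[offset + 1] = (byte) (value >> 16);
        bytes[offset + 2] = (byte) (value >> 8);
        bytes[offset + 3] = (byte) value; // low byte; inferred, not shown in the hunk
    }

    /** Reads the four bytes back; the masks undo sign extension of each byte. */
    static int getInt(byte[] bytes, int offset) {
        return ((bytes[offset] & 0xff) << 24) | ((bytes[offset + 1] & 0xff) << 16)
                | ((bytes[offset + 2] & 0xff) << 8) | (bytes[offset + 3] & 0xff);
    }
}
```
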

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/3333
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I7edbb10906cc4464210af87a5b1630ba3aecbde0
Gerrit-PatchSet: 4
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Ali Alsuliman <ali.al.solai...@gmail.com>
Gerrit-Reviewer: Ali Alsuliman <ali.al.solai...@gmail.com>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Dmitry Lychagin <dmitry.lycha...@couchbase.com>
Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mb...@apache.org>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
