Repository: incubator-hawq
Updated Branches:
  refs/heads/master be0547200 -> f5ffddf26


HAWQ-1404. PXF to leverage file-level stats of ORC file and emit records for COUNT(*).


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/f5ffddf2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/f5ffddf2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/f5ffddf2

Branch: refs/heads/master
Commit: f5ffddf26ddc990d616c905f883c383e7f1c8542
Parents: be05472
Author: Oleksandr Diachenko <odiache...@pivotal.io>
Authored: Tue Apr 4 02:32:28 2017 -0700
Committer: Oleksandr Diachenko <odiache...@pivotal.io>
Committed: Tue Apr 4 02:32:28 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hawq/pxf/api/StatsAccessor.java  | 40 +++++++++
 .../pxf/api/utilities/EnumAggregationType.java  | 50 +++++++++++
 .../pxf/api/utilities/FragmentMetadata.java     | 86 ++++++++++++++++++
 .../hawq/pxf/api/utilities/InputData.java       | 34 +++++++
 .../hawq/pxf/api/utilities/Utilities.java       | 80 +++++++++++++++++
 .../hawq/pxf/api/utilities/UtilitiesTest.java   | 91 +++++++++++++++++++
 .../plugins/hdfs/HdfsAtomicDataAccessor.java    |  2 +-
 .../hdfs/HdfsSplittableDataAccessor.java        |  2 +-
 .../plugins/hdfs/utilities/HdfsUtilities.java   | 26 +-----
 .../hdfs/utilities/HdfsUtilitiesTest.java       | 21 +++++
 .../hawq/pxf/plugins/hive/HiveORCAccessor.java  | 77 +++++++++++++++-
 .../plugins/hive/utilities/HiveUtilities.java   | 27 +++++-
 .../pxf/plugins/hive/HiveORCAccessorTest.java   | 13 +++
 .../org/apache/hawq/pxf/service/AggBridge.java  | 95 ++++++++++++++++++++
 .../hawq/pxf/service/rest/BridgeResource.java   |  5 +-
 .../pxf/service/utilities/ProtocolData.java     | 13 +++
 src/bin/gpfusion/gpbridgeapi.c                  |  1 +
 17 files changed, 632 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java
new file mode 100644
index 0000000..7ecdf52
--- /dev/null
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hawq.pxf.api;
+
+import org.apache.hawq.pxf.api.OneRow;
+
+/**
+ * Interface for accessors that can leverage statistics information for
+ * aggregate queries.
+ */
+public interface StatsAccessor extends ReadAccessor {
+
+    /**
+     * Reads the statistics needed for the current split.
+     */
+    public void retrieveStats() throws Exception;
+
+    /**
+     * Returns the next tuple based on statistics information, without
+     * actually reading any data.
+     */
+    public OneRow emitAggObject();
+
+}
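
For orientation, here is a minimal sketch of an accessor implementing this
interface. The class is hypothetical and not part of this commit; the real
implementation is HiveORCAccessor further down.

    import org.apache.hawq.pxf.api.OneRow;
    import org.apache.hawq.pxf.api.StatsAccessor;

    // Hypothetical accessor: answers COUNT(*) from a pre-computed row count
    // instead of scanning any data.
    public class ConstantCountAccessor implements StatsAccessor {

        private long rowCount;
        private long emitted;
        private OneRow row;

        @Override
        public boolean openForRead() throws Exception {
            return true;
        }

        @Override
        public void retrieveStats() throws Exception {
            rowCount = 42; // a real accessor reads this from file-level metadata
            row = new OneRow(null, null); // one shared tuple, emitted rowCount times
        }

        @Override
        public OneRow emitAggObject() {
            return (emitted++ < rowCount) ? row : null; // null signals end of data
        }

        @Override
        public OneRow readNextObject() throws Exception {
            return null; // unused on the stats path
        }

        @Override
        public void closeForRead() throws Exception {
        }
    }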

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java
new file mode 100644
index 0000000..5557c49
--- /dev/null
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hawq.pxf.api.utilities;
+
+public enum EnumAggregationType {
+
+    COUNT("count", true);
+
+    private String aggOperationCode;
+    private boolean optimizationSupported;
+
+    private EnumAggregationType(String aggOperationCode, boolean optimizationSupported) {
+        this.aggOperationCode = aggOperationCode;
+        this.optimizationSupported = optimizationSupported;
+    }
+
+    public String getAggOperationCode() {
+        return this.aggOperationCode;
+    }
+
+    public boolean isOptimizationSupported() {
+        return this.optimizationSupported;
+    }
+
+    public static EnumAggregationType getAggregationType(String aggOperationCode) {
+        for (EnumAggregationType at : values()) {
+            if (at.getAggOperationCode().equals(aggOperationCode)) {
+                return at;
+            }
+        }
+        return null;
+    }
+}
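
A quick sketch of the lookup semantics ("count" is the only operation code
defined so far):

    EnumAggregationType agg = EnumAggregationType.getAggregationType("count");
    // agg == EnumAggregationType.COUNT, and agg.isOptimizationSupported() is true
    EnumAggregationType unknown = EnumAggregationType.getAggregationType("sum");
    // unknown == null: unrecognized operation codes disable the optimization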

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java
new file mode 100644
index 0000000..7b0a3c4
--- /dev/null
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hawq.pxf.api.utilities;
+
+/**
+ * Holds metadata of a file split, including locality information.
+ */
+public class FragmentMetadata {
+
+    private long start;
+    private long end;
+    private String[] hosts;
+
+    public FragmentMetadata(long start, long end, String[] hosts) {
+        this.start = start;
+        this.end = end;
+        this.hosts = hosts;
+    }
+
+    /**
+     * Returns start position of a fragment
+     * @return position in bytes where given data fragment starts
+     */
+    public long getStart() {
+        return start;
+    }
+
+    /**
+     * Sets start position of a fragment
+     * @param start start position
+     */
+    public void setStart(long start) {
+        this.start = start;
+    }
+
+    /**
+     * Returns end position of a fragment
+     * @return position in bytes where given data fragment ends
+     */
+    public long getEnd() {
+        return end;
+    }
+
+    /**
+     * Sets end position of a fragment
+     * @param end end position
+     */
+    public void setEnd(long end) {
+        this.end = end;
+    }
+
+    /**
+     * Returns all hosts which have given data fragment
+     * @return all hosts which have given data fragment
+     */
+    public String[] getHosts() {
+        return hosts;
+    }
+
+    /**
+     * Sets hosts for a given fragment
+     * @param hosts hosts which have given fragment
+     */
+    public void setHosts(String[] hosts) {
+        this.hosts = hosts;
+    }
+
+}
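
The serialized form this class is parsed from (see Utilities.parseFragmentMetadata
below) is two longs followed by a Java-serialized String[]. A sketch of producing
a compatible byte array, mirroring the round-trip used in UtilitiesTest further
down; the helper class name is hypothetical:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectOutputStream;

    public class FragmentMetadataWireFormat {
        // Produces bytes compatible with Utilities.parseFragmentMetadata().
        public static byte[] serialize(long start, long end, String[] hosts) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeLong(start);   // fragment start offset, in bytes
                out.writeLong(end);     // fragment end offset, in bytes
                out.writeObject(hosts); // hosts holding a replica of this fragment
            }
            return bytes.toByteArray();
        }
    }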

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java
index 9816fdc..959cda6 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java
@@ -52,6 +52,8 @@ public class InputData {
     protected String remoteLogin;
     protected String remoteSecret;
     protected int dataFragment; /* should be deprecated */
+    private EnumAggregationType aggType;
+    private int fragmentIndex;
 
     /**
      * When false the bridge has to run in synchronized mode. default value -
@@ -335,4 +337,36 @@ public class InputData {
         return dataFragment;
     }
 
+    /**
+     * Returns the aggregation type, e.g. count, min, max, etc.
+     * @return aggregate type
+     */
+    public EnumAggregationType getAggType() {
+        return aggType;
+    }
+
+    /**
+     * Sets the aggregation type, one of the {@link EnumAggregationType} values.
+     * @param aggType aggregate type
+     */
+    public void setAggType(EnumAggregationType aggType) {
+        this.aggType = aggType;
+    }
+
+    /**
+     * Returns index of a fragment in a file
+     * @return index of a fragment
+     */
+    public int getFragmentIndex() {
+        return fragmentIndex;
+    }
+
+    /**
+     * Sets index of a fragment in a file
+     * @param fragmentIndex index of a fragment
+     */
+    public void setFragmentIndex(int fragmentIndex) {
+        this.fragmentIndex = fragmentIndex;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
index 51326bc..29d9c52 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
@@ -20,9 +20,15 @@ package org.apache.hawq.pxf.api.utilities;
  */
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.StatsAccessor;
 
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 
@@ -151,4 +157,78 @@ public class Utilities {
         }
         return input.replaceAll("[^a-zA-Z0-9_:/-]", ".");
     }
+
+    /**
+     * Parses input data and returns fragment metadata.
+     * 
+     * @param inputData input data which has protocol information
+     * @return fragment metadata
+     * @throws IllegalArgumentException if fragment metadata information wasn't found in input data
+     * @throws Exception if the fragment metadata could not be deserialized
+     */
+    public static FragmentMetadata parseFragmentMetadata(InputData inputData) throws Exception {
+        byte[] serializedLocation = inputData.getFragmentMetadata();
+        if (serializedLocation == null) {
+            throw new IllegalArgumentException("Missing fragment location information");
+        }
+        try (ObjectInputStream objectStream = new ObjectInputStream(new ByteArrayInputStream(serializedLocation))) {
+            long start = objectStream.readLong();
+            long end = objectStream.readLong();
+            String[] hosts = (String[]) objectStream.readObject();
+            if (LOG.isDebugEnabled()) {
+                StringBuilder sb = new StringBuilder();
+                sb.append("parsed file split: path ");
+                sb.append(inputData.getDataSource());
+                sb.append(", start ");
+                sb.append(start);
+                sb.append(", end ");
+                sb.append(end);
+                sb.append(", hosts ");
+                sb.append(ArrayUtils.toString(hosts));
+                LOG.debug(sb.toString());
+            }
+            FragmentMetadata fragmentMetadata = new FragmentMetadata(start, end, hosts);
+            return fragmentMetadata;
+        } catch (Exception e) {
+            LOG.error("Unable to parse fragment metadata");
+            throw e;
+        }
+    }
+
+    /**
+     * Determines from the accessor information whether to use AggBridge.
+     *
+     * @param inputData input data which has protocol information
+     * @return true if AggBridge is applicable for the current context
+     */
+    public static boolean useAggBridge(InputData inputData) {
+        if (inputData == null) {
+            return false;
+        }
+        boolean isStatsAccessor = false;
+        try {
+            isStatsAccessor = ArrayUtils.contains(Class.forName(inputData.getAccessor()).getInterfaces(), StatsAccessor.class);
+        } catch (ClassNotFoundException e) {
+            LOG.error("Unable to load accessor class: " + e.getMessage());
+            return false;
+        }
+        return (inputData.getAggType() != null)
+                && inputData.getAggType().isOptimizationSupported()
+                && isStatsAccessor;
+    }
+
+    /**
+     * Determines whether the accessor should use statistics to optimize reading results.
+     * 
+     * @param accessor accessor instance
+     * @param inputData input data which has protocol information
+     * @return true if this accessor should use statistic information
+     */
+    public static boolean useStats(ReadAccessor accessor, InputData inputData) {
+        if (accessor instanceof StatsAccessor) {
+            if (inputData != null && !inputData.hasFilter()
+                    && inputData.getAggType() != null
+                    && inputData.getAggType().isOptimizationSupported()) {
+                return true;
+            }
+        }
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/utilities/UtilitiesTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/utilities/UtilitiesTest.java b/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/utilities/UtilitiesTest.java
index 355ea42..6fe896a 100644
--- a/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/utilities/UtilitiesTest.java
+++ b/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/utilities/UtilitiesTest.java
@@ -22,10 +22,19 @@ package org.apache.hawq.pxf.api.utilities;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+
+import org.apache.hawq.pxf.api.Metadata;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.StatsAccessor;
 import org.apache.hawq.pxf.api.utilities.InputData;
 import org.apache.hawq.pxf.api.utilities.Utilities;
 import org.junit.Test;
@@ -37,6 +46,49 @@ import org.powermock.modules.junit4.PowerMockRunner;
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({Class.class})
 public class UtilitiesTest {
+    class StatsAccessorImpl implements StatsAccessor {
+
+        @Override
+        public boolean openForRead() throws Exception {
+            return false;
+        }
+
+        @Override
+        public OneRow readNextObject() throws Exception {
+            return null;
+        }
+
+        @Override
+        public void closeForRead() throws Exception {
+        }
+
+        @Override
+        public void retrieveStats() throws Exception {
+        }
+
+        @Override
+        public OneRow emitAggObject() {
+            return null;
+        }
+    }
+
+    class NonStatsAccessorImpl implements ReadAccessor {
+
+        @Override
+        public boolean openForRead() throws Exception {
+            return false;
+        }
+
+        @Override
+        public OneRow readNextObject() throws Exception {
+            return null;
+        }
+
+        @Override
+        public void closeForRead() throws Exception {
+        }
+    }
+
     @Test
     public void byteArrayToOctalStringNull() throws Exception {
         StringBuilder sb = null;
@@ -114,4 +166,43 @@ public class UtilitiesTest {
         result = Utilities.maskNonPrintables(input);
         assertEquals("http://www.beatles.com/info.query.whoisthebest";, result);
     }
+
+    @Test
+    public void parseFragmentMetadata() throws Exception {
+        InputData metaData = mock(InputData.class);
+        ByteArrayOutputStream bas = new ByteArrayOutputStream();
+        ObjectOutputStream os = new ObjectOutputStream(bas);
+        os.writeLong(10);
+        os.writeLong(100);
+        os.writeObject(new String[] { "hostname" });
+        os.close();
+        when(metaData.getFragmentMetadata()).thenReturn(bas.toByteArray());
+        FragmentMetadata fragmentMetadata = Utilities.parseFragmentMetadata(metaData);
+
+        assertEquals(10, fragmentMetadata.getStart());
+        assertEquals(100, fragmentMetadata.getEnd());
+        assertEquals(new String[] { "hostname" }, fragmentMetadata.getHosts());
+    }
+
+    @Test
+    public void useAggBridge() {
+        InputData metaData = mock(InputData.class);
+        when(metaData.getAccessor()).thenReturn(StatsAccessorImpl.class.getName());
+        when(metaData.getAggType()).thenReturn(EnumAggregationType.COUNT);
+        assertTrue(Utilities.useAggBridge(metaData));
+
+        when(metaData.getAccessor()).thenReturn(UtilitiesTest.class.getName());
+        when(metaData.getAggType()).thenReturn(EnumAggregationType.COUNT);
+        assertFalse(Utilities.useAggBridge(metaData));
+    }
+
+    @Test
+    public void useStats() {
+        InputData metaData = mock(InputData.class);
+        ReadAccessor accessor = new StatsAccessorImpl();
+        when(metaData.getAggType()).thenReturn(EnumAggregationType.COUNT);
+        assertTrue(Utilities.useStats(accessor, metaData));
+        ReadAccessor nonStatsAccessor = new NonStatsAccessorImpl();
+        assertFalse(Utilities.useStats(nonStatsAccessor, metaData));
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
index 178b774..a95248e 100644
--- a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
@@ -65,7 +65,7 @@ public abstract class HdfsAtomicDataAccessor extends Plugin implements ReadAccessor
         // 1. Load Hadoop configuration defined in $HADOOP_HOME/conf/*.xml files
         conf = new Configuration();
 
-        fileSplit = HdfsUtilities.parseFragmentMetadata(inputData);
+        fileSplit = HdfsUtilities.parseFileSplit(inputData);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
index 0174bd8..b61d76a 100644
--- a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
@@ -75,7 +75,7 @@ public abstract class HdfsSplittableDataAccessor extends Plugin implements
     @Override
     public boolean openForRead() throws Exception {
         LinkedList<InputSplit> requestSplits = new LinkedList<InputSplit>();
-        FileSplit fileSplit = HdfsUtilities.parseFragmentMetadata(inputData);
+        FileSplit fileSplit = HdfsUtilities.parseFileSplit(inputData);
         requestSplits.add(fileSplit);
 
         // Initialize record reader based on current split

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java
index c99ccd6..1aae838 100644
--- a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java
@@ -22,6 +22,7 @@ package org.apache.hawq.pxf.plugins.hdfs.utilities;
 
 import org.apache.hawq.pxf.api.io.DataType;
 import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.utilities.FragmentMetadata;
 import org.apache.hawq.pxf.api.utilities.InputData;
 import org.apache.hawq.pxf.api.utilities.Utilities;
 import org.apache.avro.Schema;
@@ -30,7 +31,6 @@ import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.mapred.FsInput;
-import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -171,29 +171,11 @@ public class HdfsUtilities {
      * @param inputData request input data
      * @return FileSplit with fragment metadata
      */
-    public static FileSplit parseFragmentMetadata(InputData inputData) {
+    public static FileSplit parseFileSplit(InputData inputData) {
         try {
-            byte[] serializedLocation = inputData.getFragmentMetadata();
-            if (serializedLocation == null) {
-                throw new IllegalArgumentException(
-                        "Missing fragment location information");
-            }
-
-            ByteArrayInputStream bytesStream = new ByteArrayInputStream(
-                    serializedLocation);
-            ObjectInputStream objectStream = new ObjectInputStream(bytesStream);
-
-            long start = objectStream.readLong();
-            long end = objectStream.readLong();
-
-            String[] hosts = (String[]) objectStream.readObject();
-
-            FileSplit fileSplit = new FileSplit(new Path(
-                    inputData.getDataSource()), start, end, hosts);
+            FragmentMetadata fragmentMetadata = Utilities.parseFragmentMetadata(inputData);
 
-            LOG.debug("parsed file split: path " + inputData.getDataSource()
-                    + ", start " + start + ", end " + end + ", hosts "
-                    + ArrayUtils.toString(hosts));
+            FileSplit fileSplit = new FileSplit(new Path(inputData.getDataSource()), fragmentMetadata.getStart(), fragmentMetadata.getEnd(), fragmentMetadata.getHosts());
 
             return fileSplit;
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilitiesTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilitiesTest.java b/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilitiesTest.java
index 36ca846..9b1fc6d 100644
--- a/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilitiesTest.java
+++ b/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilitiesTest.java
@@ -21,10 +21,12 @@ package org.apache.hawq.pxf.plugins.hdfs.utilities;
 
 
 import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.utilities.InputData;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.*;
+import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -35,6 +37,8 @@ import org.powermock.core.classloader.annotations.SuppressStaticInitializationFor;
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.powermock.reflect.Whitebox;
 
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -198,4 +202,21 @@ public class HdfsUtilitiesTest {
 
         assertEquals("", 
HdfsUtilities.toString(Collections.<OneField>emptyList(), "!"));
     }
+
+    @Test
+    public void testParseFileSplit() throws Exception {
+        InputData inputData = mock(InputData.class);
+        when(inputData.getDataSource()).thenReturn("/abc/path/to/data/source");
+        ByteArrayOutputStream bas = new ByteArrayOutputStream();
+        ObjectOutputStream os = new ObjectOutputStream(bas);
+        os.writeLong(10);
+        os.writeLong(100);
+        os.writeObject(new String[] { "hostname" });
+        os.close();
+        when(inputData.getFragmentMetadata()).thenReturn(bas.toByteArray());
+        FileSplit fileSplit = HdfsUtilities.parseFileSplit(inputData);
+        assertEquals(fileSplit.getStart(), 10);
+        assertEquals(fileSplit.getLength(), 100);
+        assertEquals(fileSplit.getPath().toString(), "/abc/path/to/data/source");
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
index 07348b0..5f7c584 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
@@ -23,16 +23,24 @@ package org.apache.hawq.pxf.plugins.hive;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hawq.pxf.api.BasicFilter;
 import org.apache.hawq.pxf.api.LogicalFilter;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.StatsAccessor;
 import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.EnumAggregationType;
+import org.apache.hawq.pxf.api.utilities.FragmentMetadata;
 import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Utilities;
 import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.mapred.*;
 
+import java.io.IOException;
 import java.sql.Date;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -45,7 +53,7 @@ import static org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES;
  * This class replaces the generic HiveAccessor for a case where a table is stored entirely as ORC files.
  * Use together with {@link HiveInputFormatFragmenter}/{@link HiveColumnarSerdeResolver}
  */
-public class HiveORCAccessor extends HiveAccessor {
+public class HiveORCAccessor extends HiveAccessor implements StatsAccessor {
 
     private static final Log LOG = LogFactory.getLog(HiveORCAccessor.class);
 
@@ -53,6 +61,14 @@ public class HiveORCAccessor extends HiveAccessor {
     private final String READ_ALL_COLUMNS = "hive.io.file.read.all.columns";
     private final String READ_COLUMN_NAMES_CONF_STR = "hive.io.file.readcolumn.names";
     private final String SARG_PUSHDOWN = "sarg.pushdown";
+    protected Reader orcReader;
+
+    private boolean useStats;
+    private long count;
+    private long objectsEmitted;
+    private OneRow rowToEmitCount;
+
+    private boolean statsInitialized;
 
     /**
      * Constructs a HiveORCFileAccessor.
@@ -65,12 +81,21 @@ public class HiveORCAccessor extends HiveAccessor {
         HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, PXF_HIVE_SERDES.ORC_SERDE);
         initPartitionFields(hiveUserData.getPartitionKeys());
         filterInFragmenter = hiveUserData.isFilterInFragmenter();
+        useStats = Utilities.useStats(this, inputData);
     }
 
     @Override
     public boolean openForRead() throws Exception {
-        addColumns();
-        addFilters();
+        if (useStats) {
+            orcReader = HiveUtilities.getOrcReader(inputData);
+            if (orcReader == null) {
+                return false;
+            }
+            objectsEmitted = 0;
+        } else {
+            addColumns();
+            addFilters();
+        }
         return super.openForRead();
     }
 
@@ -213,4 +238,50 @@ public class HiveORCAccessor extends HiveAccessor {
         return true;
     }
 
+    /**
+     * Fetches file-level statistics from an ORC file.
+     */
+    @Override
+    public void retrieveStats() throws Exception {
+        if (!this.useStats) {
+            throw new IllegalStateException("Accessor is not using statistics 
in current context.");
+        }
+        /*
+         * We are using file-level stats, so when a file has multiple splits
+         * it's enough to return the count for the first split in the file;
+         * this avoids duplicating the count across splits.
+         */
+        if (inputData.getFragmentIndex() == 0) {
+            this.count = this.orcReader.getNumberOfRows();
+            rowToEmitCount = readNextObject();
+        }
+        statsInitialized = true;
+
+    }
+
+    /**
+     * Emits a tuple without reading from disk; currently supports COUNT.
+     */
+    @Override
+    public OneRow emitAggObject() {
+        if (!statsInitialized) {
+            throw new IllegalStateException("retrieveStats() should be called before calling emitAggObject()");
+        }
+        OneRow row = null;
+        if (inputData.getAggType() == null)
+            throw new UnsupportedOperationException("Aggregate opration is 
required");
+        switch (inputData.getAggType()) {
+            case COUNT:
+                if (objectsEmitted < count) {
+                    objectsEmitted++;
+                    row = rowToEmitCount;
+                }
+                break;
+            default: {
+                throw new UnsupportedOperationException("Aggregation operation 
is not supported.");
+            }
+        }
+        return row;
+    }
+
 }
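
Taken together, the control flow that AggBridge (added below) drives against
this accessor per ORC split is roughly the following simplified sketch (error
handling omitted):

    StatsAccessor accessor = new HiveORCAccessor(inputData); // inputData: this fragment's request data
    if (accessor.openForRead()) {    // stats path: opens the ORC reader, skips column/filter setup
        accessor.retrieveStats();    // fragment index 0 captures the file-level row count
        OneRow row;
        while ((row = accessor.emitAggObject()) != null) {
            // resolve and serialize the same cached tuple once per counted row
        }
        accessor.closeForRead();
    }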

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
index 3328c9f..808c415 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
@@ -30,6 +30,8 @@ import java.util.Properties;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableType;
@@ -39,6 +41,11 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.*;
+import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
+import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
 import org.apache.hawq.pxf.api.Fragmenter;
 import org.apache.hawq.pxf.api.Metadata;
 import org.apache.hawq.pxf.api.Metadata.Field;
@@ -48,12 +55,9 @@ import org.apache.hawq.pxf.api.utilities.EnumHawqType;
 import org.apache.hawq.pxf.api.utilities.InputData;
 import org.apache.hawq.pxf.api.utilities.Utilities;
 import org.apache.hawq.pxf.api.io.DataType;
-import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
 import org.apache.hawq.pxf.plugins.hive.HiveDataFragmenter;
 import org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter;
 import org.apache.hawq.pxf.plugins.hive.HiveTablePartition;
-import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
-import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
 import org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_INPUT_FORMATS;
 import org.apache.hawq.pxf.plugins.hive.HiveUserData;
 import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES;
@@ -627,4 +631,21 @@ public class HiveUtilities {
 
         return deserializer;
     }
+
+    /**
+     * Creates ORC file reader.
+     * @param inputData input data with given data source
+     * @return ORC file reader
+     */
+    public static Reader getOrcReader(InputData inputData) {
+        try {
+            Path path = new Path(inputData.getDataSource());
+            Reader reader = OrcFile.createReader(path.getFileSystem(new Configuration()), path);
+
+            return reader;
+
+        } catch (Exception e) {
+            throw new RuntimeException("Exception while getting orc reader", 
e);
+        }
+    }
 }
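
Only the row count is consumed from this reader so far. Reader.getNumberOfRows()
is served from the ORC file footer, so no row data is scanned (sketch):

    Reader orcReader = HiveUtilities.getOrcReader(inputData); // inputData names the ORC file
    long rowCount = orcReader.getNumberOfRows();              // file-level stat from the footer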

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
index 8b4bf13..daee331 100644
--- a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
+++ b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
@@ -20,10 +20,14 @@ package org.apache.hawq.pxf.plugins.hive;
  */
 
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
 import org.apache.hadoop.mapred.*;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadAccessor;
 import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.EnumAggregationType;
 import org.apache.hawq.pxf.api.utilities.InputData;
 import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
 import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
@@ -39,6 +43,8 @@ import org.powermock.modules.junit4.PowerMockRunner;
 
 import static org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory.SARG_PUSHDOWN;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertNotNull;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -55,6 +61,7 @@ public class HiveORCAccessorTest {
     @Mock OrcInputFormat orcInputFormat;
     @Mock InputFormat inputFormat;
     @Mock ColumnDescriptor columnDesc;
+    @Mock Reader orcReader;
     JobConf jobConf;
     HiveORCAccessor accessor;
 
@@ -65,6 +72,7 @@ public class HiveORCAccessorTest {
 
         PowerMockito.mockStatic(HiveUtilities.class);
         
PowerMockito.when(HiveUtilities.parseHiveUserData(any(InputData.class), 
any(PXF_HIVE_SERDES[].class))).thenReturn(new HiveUserData("", "", null, 
HiveDataFragmenter.HIVE_NO_PART_TBL, true, "1", ""));
+        PowerMockito.when(HiveUtilities.getOrcReader(any(InputData.class))).thenReturn(orcReader);
 
         PowerMockito.mockStatic(HdfsUtilities.class);
 
@@ -121,4 +129,9 @@ public class HiveORCAccessorTest {
         assertEquals(sarg.toKryo(), jobConf.get(SARG_PUSHDOWN));
     }
 
+    @Test(expected=IllegalStateException.class)
+    public void emitAggObjectCountStatsNotInitialized() {
+        accessor.emitAggObject();
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java
new file mode 100644
index 0000000..c03a6a2
--- /dev/null
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hawq.pxf.service;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hawq.pxf.api.BadRecordException;
+import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.StatsAccessor;
+import org.apache.hawq.pxf.service.io.Writable;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.collections.map.LRUMap;
+
+/**
+ * Bridge class optimized for aggregate queries.
+ *
+ */
+public class AggBridge extends ReadBridge implements Bridge {
+    private static final Log LOG = LogFactory.getLog(AggBridge.class);
+    /* Avoid resolving rows with the same key twice */
+    private LRUMap outputCache;
+
+    public AggBridge(ProtocolData protData) throws Exception {
+        super(protData);
+    }
+
+    @Override
+    public boolean beginIteration() throws Exception {
+        /* Initialize LRU cache with 100 items */
+        outputCache = new LRUMap();
+        boolean openForReadStatus = super.fileAccessor.openForRead();
+        ((StatsAccessor) fileAccessor).retrieveStats();
+        return openForReadStatus;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Writable getNext() throws Exception {
+        Writable output = null;
+        LinkedList<Writable> cachedOutput = null;
+        OneRow onerow = null;
+
+        if (!outputQueue.isEmpty()) {
+            return outputQueue.pop();
+        }
+
+        try {
+            while (outputQueue.isEmpty()) {
+                onerow = ((StatsAccessor) fileAccessor).emitAggObject();
+                if (onerow == null) {
+                    break;
+                }
+                cachedOutput = (LinkedList<Writable>) outputCache.get(onerow.getKey());
+                if (cachedOutput == null) {
+                    cachedOutput = outputBuilder.makeOutput(fieldsResolver.getFields(onerow));
+                    outputCache.put(onerow.getKey(), cachedOutput);
+                }
+                outputQueue.addAll(cachedOutput);
+                if (!outputQueue.isEmpty()) {
+                    output = outputQueue.pop();
+                    break;
+                }
+            }
+        } catch (Exception ex) {
+            LOG.error("Error occurred when reading next object from aggregate 
bridge:" + ex.getMessage());
+            throw ex;
+        }
+
+        return output;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
index 104f353..4294e09 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
@@ -39,7 +39,8 @@ import javax.ws.rs.core.StreamingOutput;
 import org.apache.catalina.connector.ClientAbortException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
+import org.apache.hawq.pxf.api.utilities.Utilities;
+import org.apache.hawq.pxf.service.AggBridge;
 import org.apache.hawq.pxf.service.Bridge;
 import org.apache.hawq.pxf.service.ReadBridge;
 import org.apache.hawq.pxf.service.ReadSamplingBridge;
@@ -98,6 +99,8 @@ public class BridgeResource extends RestResource {
         float sampleRatio = protData.getStatsSampleRatio();
         if (sampleRatio > 0) {
             bridge = new ReadSamplingBridge(protData);
+        } else if (Utilities.useAggBridge(protData)) {
+            bridge = new AggBridge(protData);
         } else {
             bridge = new ReadBridge(protData);
         }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
index dc2a110..0cb6d47 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
@@ -31,6 +31,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hawq.pxf.api.OutputFormat;
 import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.EnumAggregationType;
 import org.apache.hawq.pxf.api.utilities.InputData;
 import org.apache.hawq.pxf.api.utilities.ProfilesConf;
 
@@ -115,6 +116,18 @@ public class ProtocolData extends InputData {
 
         // Store alignment for global use as a system property
         System.setProperty("greenplum.alignment", getProperty("ALIGNMENT"));
+
+        // Get aggregation operation
+        String aggTypeOperationName = getOptionalProperty("AGG-TYPE");
+
+        this.setAggType(EnumAggregationType.getAggregationType(aggTypeOperationName));
+
+        // Get fragment index
+        String fragmentIndexStr = getOptionalProperty("FRAGMENT-INDEX");
+
+        if (fragmentIndexStr != null) {
+            this.setFragmentIndex(Integer.parseInt(fragmentIndexStr));
+        }
     }
 
     /**
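
Following the X-GP- header prefix convention visible in the gpbridgeapi.c change
below, these properties presumably arrive as the X-GP-AGG-TYPE and
X-GP-FRAGMENT-INDEX request headers; the latter is confirmed by the C change,
the former is an assumption here. The parsing is null-safe (sketch, in the
context of ProtocolData):

    // An absent or unrecognized AGG-TYPE yields a null aggregation type, so
    // Utilities.useAggBridge(...) returns false and the plain ReadBridge is used.
    EnumAggregationType aggType = EnumAggregationType.getAggregationType(getOptionalProperty("AGG-TYPE"));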

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/src/bin/gpfusion/gpbridgeapi.c
----------------------------------------------------------------------
diff --git a/src/bin/gpfusion/gpbridgeapi.c b/src/bin/gpfusion/gpbridgeapi.c
index f176586..a853f06 100644
--- a/src/bin/gpfusion/gpbridgeapi.c
+++ b/src/bin/gpfusion/gpbridgeapi.c
@@ -220,6 +220,7 @@ void set_current_fragment_headers(gphadoop_context* context)
        churl_headers_override(context->churl_headers, "X-GP-DATA-DIR", frag_data->source_name);
        churl_headers_override(context->churl_headers, "X-GP-DATA-FRAGMENT", frag_data->index);
        churl_headers_override(context->churl_headers, "X-GP-FRAGMENT-METADATA", frag_data->fragment_md);
+       churl_headers_override(context->churl_headers, "X-GP-FRAGMENT-INDEX", frag_data->index);
 
        if (frag_data->user_data)
        {

