Repository: incubator-hawq Updated Branches: refs/heads/master be0547200 -> f5ffddf26
HAWQ-1404. PXF to leverage file-level stats of ORC file and emit records for COUNT(*).

Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/f5ffddf2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/f5ffddf2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/f5ffddf2

Branch: refs/heads/master
Commit: f5ffddf26ddc990d616c905f883c383e7f1c8542
Parents: be05472
Author: Oleksandr Diachenko <odiache...@pivotal.io>
Authored: Tue Apr 4 02:32:28 2017 -0700
Committer: Oleksandr Diachenko <odiache...@pivotal.io>
Committed: Tue Apr 4 02:32:28 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hawq/pxf/api/StatsAccessor.java  | 40 +++++++++
 .../pxf/api/utilities/EnumAggregationType.java  | 50 +++++++++++
 .../pxf/api/utilities/FragmentMetadata.java     | 86 ++++++++++++++++++
 .../hawq/pxf/api/utilities/InputData.java       | 34 +++++++
 .../hawq/pxf/api/utilities/Utilities.java       | 80 +++++++++++++++++
 .../hawq/pxf/api/utilities/UtilitiesTest.java   | 91 +++++++++++++++++++
 .../plugins/hdfs/HdfsAtomicDataAccessor.java    |  2 +-
 .../hdfs/HdfsSplittableDataAccessor.java        |  2 +-
 .../plugins/hdfs/utilities/HdfsUtilities.java   | 26 +-----
 .../hdfs/utilities/HdfsUtilitiesTest.java       | 21 +++++
 .../hawq/pxf/plugins/hive/HiveORCAccessor.java  | 77 +++++++++++++++-
 .../plugins/hive/utilities/HiveUtilities.java   | 27 +++++-
 .../pxf/plugins/hive/HiveORCAccessorTest.java   | 13 +++
 .../org/apache/hawq/pxf/service/AggBridge.java  | 95 ++++++++++++++++++++
 .../hawq/pxf/service/rest/BridgeResource.java   |  5 +-
 .../pxf/service/utilities/ProtocolData.java     | 13 +++
 src/bin/gpfusion/gpbridgeapi.c                  |  1 +
 17 files changed, 632 insertions(+), 31 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java
new file mode 100644
index 0000000..7ecdf52
--- /dev/null
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hawq.pxf.api;
+
+import org.apache.hawq.pxf.api.OneRow;
+
+/**
+ * Interface for accessors that can leverage statistics for aggregate queries.
+ */
+public interface StatsAccessor extends ReadAccessor {
+
+    /**
+     * Reads the statistics needed for the current split.
+     */
+    public void retrieveStats() throws Exception;
+
+    /**
+     * Returns the next tuple based on statistics, without actually reading data.
+     */
+    public OneRow emitAggObject();
+
+}
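
A StatsAccessor is driven in two phases: retrieveStats() is invoked once per split, and emitAggObject() is then polled until it returns null. A minimal sketch of such a driving loop follows; the class and method names (StatsDrivenCount, countTuples) are illustrative, not part of this commit:

    import org.apache.hawq.pxf.api.OneRow;
    import org.apache.hawq.pxf.api.StatsAccessor;

    public class StatsDrivenCount {
        // Counts tuples using only statistics; no data records are deserialized.
        public static long countTuples(StatsAccessor accessor) throws Exception {
            if (!accessor.openForRead()) {
                return 0;
            }
            accessor.retrieveStats();     // load file-level statistics for this split
            long tuples = 0;
            while (accessor.emitAggObject() != null) {
                tuples++;                 // each emitted row stands in for one record
            }
            accessor.closeForRead();
            return tuples;
        }
    }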

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java
new file mode 100644
index 0000000..5557c49
--- /dev/null
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hawq.pxf.api.utilities;
+
+public enum EnumAggregationType {
+
+    COUNT("count", true);
+
+    private String aggOperationCode;
+    private boolean optimizationSupported;
+
+    private EnumAggregationType(String aggOperationCode, boolean optimizationSupported) {
+        this.aggOperationCode = aggOperationCode;
+        this.optimizationSupported = optimizationSupported;
+    }
+
+    public String getAggOperationCode() {
+        return this.aggOperationCode;
+    }
+
+    public boolean isOptimizationSupported() {
+        return this.optimizationSupported;
+    }
+
+    public static EnumAggregationType getAggregationType(String aggOperationCode) {
+        for (EnumAggregationType at : values()) {
+            if (at.getAggOperationCode().equals(aggOperationCode)) {
+                return at;
+            }
+        }
+        return null;
+    }
+}
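
getAggregationType() maps the textual operation code sent with the request onto an enum constant; an unrecognized code yields null, which later disables the optimization. A small illustrative lookup (COUNT is the only operation this commit registers):

    import org.apache.hawq.pxf.api.utilities.EnumAggregationType;

    public class AggTypeLookup {
        public static void main(String[] args) {
            // "count" is registered and supports the optimization
            EnumAggregationType count = EnumAggregationType.getAggregationType("count");
            System.out.println(count.isOptimizationSupported());               // true
            // any other code, e.g. "sum", is unknown and returns null,
            // so callers must null-check before use
            System.out.println(EnumAggregationType.getAggregationType("sum")); // null
        }
    }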

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java
new file mode 100644
index 0000000..7b0a3c4
--- /dev/null
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hawq.pxf.api.utilities;
+
+/**
+ * Holds metadata of a file split, including locality information.
+ */
+public class FragmentMetadata {
+
+    private long start;
+    private long end;
+    private String[] hosts;
+
+    public FragmentMetadata(long start, long end, String[] hosts) {
+        this.start = start;
+        this.end = end;
+        this.hosts = hosts;
+    }
+
+    /**
+     * Returns start position of a fragment
+     * @return position in bytes where given data fragment starts
+     */
+    public long getStart() {
+        return start;
+    }
+
+    /**
+     * Sets start position of a fragment
+     * @param start start position
+     */
+    public void setStart(long start) {
+        this.start = start;
+    }
+
+    /**
+     * Returns end position of a fragment
+     * @return position in bytes where given data fragment ends
+     */
+    public long getEnd() {
+        return end;
+    }
+
+    /**
+     * Sets end position of a fragment
+     * @param end end position
+     */
+    public void setEnd(long end) {
+        this.end = end;
+    }
+
+    /**
+     * Returns all hosts which have given data fragment
+     * @return all hosts which have given data fragment
+     */
+    public String[] getHosts() {
+        return hosts;
+    }
+
+    /**
+     * Sets hosts for a given fragment
+     * @param hosts hosts which have given fragment
+     */
+    public void setHosts(String[] hosts) {
+        this.hosts = hosts;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java
index 9816fdc..959cda6 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java
@@ -52,6 +52,8 @@ public class InputData {
     protected String remoteLogin;
     protected String remoteSecret;
     protected int dataFragment; /* should be deprecated */
+    private EnumAggregationType aggType;
+    private int fragmentIndex;
 
     /**
      * When false the bridge has to run in synchronized mode. default value -
@@ -335,4 +337,36 @@ public class InputData {
         return dataFragment;
     }
 
+    /**
+     * Returns the aggregate type, e.g. count, min, max.
+     * @return aggregate type
+     */
+    public EnumAggregationType getAggType() {
+        return aggType;
+    }
+
+    /**
+     * Sets the aggregate type, one of the {@link EnumAggregationType} values.
+     * @param aggType aggregate type
+     */
+    public void setAggType(EnumAggregationType aggType) {
+        this.aggType = aggType;
+    }
+
+    /**
+     * Returns index of a fragment in a file
+     * @return index of a fragment
+     */
+    public int getFragmentIndex() {
+        return fragmentIndex;
+    }
+
+    /**
+     * Sets index of a fragment in a file
+     * @param fragmentIndex index of a fragment
+     */
+    public void setFragmentIndex(int fragmentIndex) {
+        this.fragmentIndex = fragmentIndex;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
index 51326bc..29d9c52 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
@@ -20,9 +20,15 @@ package org.apache.hawq.pxf.api.utilities;
  */
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.StatsAccessor;
 
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 
@@ -151,4 +157,78 @@ public class Utilities {
         }
         return input.replaceAll("[^a-zA-Z0-9_:/-]", ".");
     }
+
+    /**
+     * Parses input data and returns fragment metadata.
+     *
+     * @param inputData input data which has protocol information
+     * @return fragment metadata
+     * @throws IllegalArgumentException if fragment metadata information wasn't found in input data
+     * @throws Exception if the fragment metadata could not be deserialized
+     */
+    public static FragmentMetadata parseFragmentMetadata(InputData inputData) throws Exception {
+        byte[] serializedLocation = inputData.getFragmentMetadata();
+        if (serializedLocation == null) {
+            throw new IllegalArgumentException("Missing fragment location information");
+        }
+        try (ObjectInputStream objectStream = new ObjectInputStream(new ByteArrayInputStream(serializedLocation))) {
+            long start = objectStream.readLong();
+            long end = objectStream.readLong();
+            String[] hosts = (String[]) objectStream.readObject();
+            if (LOG.isDebugEnabled()) {
+                StringBuilder sb = new StringBuilder();
+                sb.append("parsed file split: path ");
+                sb.append(inputData.getDataSource());
+                sb.append(", start ");
+                sb.append(start);
+                sb.append(", end ");
+                sb.append(end);
+                sb.append(", hosts ");
+                sb.append(ArrayUtils.toString(hosts));
+                LOG.debug(sb.toString());
+            }
+            FragmentMetadata fragmentMetadata = new FragmentMetadata(start, end, hosts);
+            return fragmentMetadata;
+        } catch (Exception e) {
+            LOG.error("Unable to parse fragment metadata");
+            throw e;
+        }
+    }
+
+    /**
+     * Determines, based on accessor information, whether to use AggBridge.
+     *
+     * @param inputData input data which has protocol information
+     * @return true if AggBridge is applicable for the current context
+     */
+    public static boolean useAggBridge(InputData inputData) {
+        if (inputData == null) {
+            return false;
+        }
+        boolean isStatsAccessor = false;
+        try {
+            isStatsAccessor = ArrayUtils.contains(Class.forName(inputData.getAccessor()).getInterfaces(), StatsAccessor.class);
+        } catch (ClassNotFoundException e) {
+            LOG.error("Unable to load accessor class: " + e.getMessage());
+            return false;
+        }
+        return (inputData.getAggType() != null)
+                && inputData.getAggType().isOptimizationSupported()
+                && isStatsAccessor;
+    }
+
+    /**
+     * Determines whether the accessor should use statistics to optimize reading results.
+     *
+     * @param accessor accessor instance
+     * @param inputData input data which has protocol information
+     * @return true if this accessor should use statistics
+     */
+    public static boolean useStats(ReadAccessor accessor, InputData inputData) {
+        if (accessor instanceof StatsAccessor) {
+            if (inputData != null && !inputData.hasFilter()
+                    && inputData.getAggType() != null
+                    && inputData.getAggType().isOptimizationSupported()) {
+                return true;
+            }
+        }
+        return false;
+    }
 }
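
parseFragmentMetadata() expects the fragment metadata bytes to be Java serialization of a long start offset, a long end offset, and a String[] of hosts, in that order. The producing side is not part of this diff; a minimal sketch of writing that layout (the helper name FragmentMetadataWriter is hypothetical):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectOutputStream;

    public class FragmentMetadataWriter {
        // Produces the byte layout Utilities.parseFragmentMetadata() reads back.
        public static byte[] serialize(long start, long end, String[] hosts) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeLong(start);   // byte offset where the fragment starts
                out.writeLong(end);     // byte offset where the fragment ends
                out.writeObject(hosts); // hosts storing this fragment
            }
            return bytes.toByteArray();
        }
    }

The parseFragmentMetadata test below builds its input bytes in exactly this way.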

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/utilities/UtilitiesTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/utilities/UtilitiesTest.java b/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/utilities/UtilitiesTest.java
index 355ea42..6fe896a 100644
--- a/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/utilities/UtilitiesTest.java
+++ b/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/utilities/UtilitiesTest.java
@@ -22,10 +22,19 @@ package org.apache.hawq.pxf.api.utilities;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+
+import org.apache.hawq.pxf.api.Metadata;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.StatsAccessor;
 import org.apache.hawq.pxf.api.utilities.InputData;
 import org.apache.hawq.pxf.api.utilities.Utilities;
 import org.junit.Test;
@@ -37,6 +46,49 @@ import org.powermock.modules.junit4.PowerMockRunner;
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({Class.class})
 public class UtilitiesTest {
+    class StatsAccessorImpl implements StatsAccessor {
+
+        @Override
+        public boolean openForRead() throws Exception {
+            return false;
+        }
+
+        @Override
+        public OneRow readNextObject() throws Exception {
+            return null;
+        }
+
+        @Override
+        public void closeForRead() throws Exception {
+        }
+
+        @Override
+        public void retrieveStats() throws Exception {
+        }
+
+        @Override
+        public OneRow emitAggObject() {
+            return null;
+        }
+    }
+
+    class NonStatsAccessorImpl implements ReadAccessor {
+
+        @Override
+        public boolean openForRead() throws Exception {
+            return false;
+        }
+
+        @Override
+        public OneRow readNextObject() throws Exception {
+            return null;
+        }
+
+        @Override
+        public void closeForRead() throws Exception {
+        }
+    }
+
     @Test
     public void byteArrayToOctalStringNull() throws Exception {
         StringBuilder sb = null;
@@ -114,4 +166,43 @@ public class UtilitiesTest {
         result = Utilities.maskNonPrintables(input);
         assertEquals("http://www.beatles.com/info.query.whoisthebest", result);
     }
+
+    @Test
+    public void parseFragmentMetadata() throws Exception {
+        InputData metaData = mock(InputData.class);
+        ByteArrayOutputStream bas = new ByteArrayOutputStream();
+        ObjectOutputStream os = new ObjectOutputStream(bas);
+        os.writeLong(10);
+        os.writeLong(100);
+        os.writeObject(new String[] { "hostname" });
+        os.close();
+        when(metaData.getFragmentMetadata()).thenReturn(bas.toByteArray());
+        FragmentMetadata fragmentMetadata = Utilities.parseFragmentMetadata(metaData);
+
+        assertEquals(10, fragmentMetadata.getStart());
+        assertEquals(100, fragmentMetadata.getEnd());
+        assertEquals(new String[] { "hostname" }, fragmentMetadata.getHosts());
+    }
+
+    @Test
+    public void useAggBridge() {
+        InputData metaData = mock(InputData.class);
+        when(metaData.getAccessor()).thenReturn(StatsAccessorImpl.class.getName());
+        when(metaData.getAggType()).thenReturn(EnumAggregationType.COUNT);
+        assertTrue(Utilities.useAggBridge(metaData));
+
+        when(metaData.getAccessor()).thenReturn(UtilitiesTest.class.getName());
+        when(metaData.getAggType()).thenReturn(EnumAggregationType.COUNT);
+        assertFalse(Utilities.useAggBridge(metaData));
+    }
+
+    @Test
+    public void useStats() {
+        InputData metaData = mock(InputData.class);
+        ReadAccessor accessor = new StatsAccessorImpl();
+        when(metaData.getAggType()).thenReturn(EnumAggregationType.COUNT);
+        assertTrue(Utilities.useStats(accessor, metaData));
+        ReadAccessor nonStatsAccessor = new NonStatsAccessorImpl();
+        assertFalse(Utilities.useStats(nonStatsAccessor, metaData));
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
index 178b774..a95248e 100644
--- a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
@@ -65,7 +65,7 @@ public abstract class HdfsAtomicDataAccessor extends Plugin implements ReadAcces
         // 1. Load Hadoop configuration defined in $HADOOP_HOME/conf/*.xml files
         conf = new Configuration();
 
-        fileSplit = HdfsUtilities.parseFragmentMetadata(inputData);
+        fileSplit = HdfsUtilities.parseFileSplit(inputData);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
index 0174bd8..b61d76a 100644
--- a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
@@ -75,7 +75,7 @@ public abstract class HdfsSplittableDataAccessor extends Plugin implements
     @Override
     public boolean openForRead() throws Exception {
         LinkedList<InputSplit> requestSplits = new LinkedList<InputSplit>();
-        FileSplit fileSplit = HdfsUtilities.parseFragmentMetadata(inputData);
+        FileSplit fileSplit = HdfsUtilities.parseFileSplit(inputData);
         requestSplits.add(fileSplit);
 
         // Initialize record reader based on current split

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java
index c99ccd6..1aae838 100644
--- a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java
@@ -22,6 +22,7 @@ package org.apache.hawq.pxf.plugins.hdfs.utilities;
 
 import org.apache.hawq.pxf.api.io.DataType;
 import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.utilities.FragmentMetadata;
 import org.apache.hawq.pxf.api.utilities.InputData;
 import org.apache.hawq.pxf.api.utilities.Utilities;
 import org.apache.avro.Schema;
@@ -30,7 +31,6 @@ import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.mapred.FsInput;
-import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -171,29 +171,11 @@ public class HdfsUtilities {
      * @param inputData request input data
      * @return FileSplit with fragment metadata
      */
-    public static FileSplit parseFragmentMetadata(InputData inputData) {
+    public static FileSplit parseFileSplit(InputData inputData) {
         try {
-            byte[] serializedLocation = inputData.getFragmentMetadata();
-            if (serializedLocation == null) {
-                throw new IllegalArgumentException(
-                        "Missing fragment location information");
-            }
-
-            ByteArrayInputStream bytesStream = new ByteArrayInputStream(
-                    serializedLocation);
-            ObjectInputStream objectStream = new ObjectInputStream(bytesStream);
-
-            long start = objectStream.readLong();
-            long end = objectStream.readLong();
-
-            String[] hosts = (String[]) objectStream.readObject();
-
-            FileSplit fileSplit = new FileSplit(new Path(
-                    inputData.getDataSource()), start, end, hosts);
+            FragmentMetadata fragmentMetadata = Utilities.parseFragmentMetadata(inputData);
 
-            LOG.debug("parsed file split: path " + inputData.getDataSource()
-                    + ", start " + start + ", end " + end + ", hosts "
-                    + ArrayUtils.toString(hosts));
+            FileSplit fileSplit = new FileSplit(new Path(inputData.getDataSource()), fragmentMetadata.getStart(), fragmentMetadata.getEnd(), fragmentMetadata.getHosts());
 
             return fileSplit;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilitiesTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilitiesTest.java b/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilitiesTest.java
index 36ca846..9b1fc6d 100644
--- a/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilitiesTest.java
+++ b/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilitiesTest.java
@@ -21,10 +21,12 @@ package org.apache.hawq.pxf.plugins.hdfs.utilities;
 
 import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.utilities.InputData;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.*;
+import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -35,6 +37,8 @@ import org.powermock.core.classloader.annotations.SuppressStaticInitializationFo
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.powermock.reflect.Whitebox;
 
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -198,4 +202,21 @@ public class HdfsUtilitiesTest {
 
         assertEquals("", HdfsUtilities.toString(Collections.<OneField>emptyList(), "!"));
     }
+
+    @Test
+    public void testParseFileSplit() throws Exception {
+        InputData inputData = mock(InputData.class);
+        when(inputData.getDataSource()).thenReturn("/abc/path/to/data/source");
+        ByteArrayOutputStream bas = new ByteArrayOutputStream();
+        ObjectOutputStream os = new ObjectOutputStream(bas);
+        os.writeLong(10);
+        os.writeLong(100);
+        os.writeObject(new String[] { "hostname" });
+        os.close();
+        when(inputData.getFragmentMetadata()).thenReturn(bas.toByteArray());
+        FileSplit fileSplit = HdfsUtilities.parseFileSplit(inputData);
+        assertEquals(fileSplit.getStart(), 10);
+        assertEquals(fileSplit.getLength(), 100);
+        assertEquals(fileSplit.getPath().toString(), "/abc/path/to/data/source");
+    }
 }
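
The ORC accessor below answers COUNT(*) from file-level statistics: an ORC file's footer records its total row count, so no data stripes need to be scanned. A standalone sketch of that lookup, using the same OrcFile calls this commit wires into HiveUtilities (the file path is a placeholder):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.orc.OrcFile;
    import org.apache.hadoop.hive.ql.io.orc.Reader;

    public class OrcRowCount {
        public static long rowCount(String location) throws Exception {
            Path path = new Path(location);
            // Opening a reader touches only the file footer/metadata
            Reader reader = OrcFile.createReader(path.getFileSystem(new Configuration()), path);
            return reader.getNumberOfRows(); // file-level row count kept in the ORC footer
        }
    }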

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
index 07348b0..5f7c584 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
@@ -23,16 +23,24 @@ package org.apache.hawq.pxf.plugins.hive;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hawq.pxf.api.BasicFilter;
 import org.apache.hawq.pxf.api.LogicalFilter;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.StatsAccessor;
 import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.EnumAggregationType;
+import org.apache.hawq.pxf.api.utilities.FragmentMetadata;
 import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Utilities;
 import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.mapred.*;
+
+import java.io.IOException;
 import java.sql.Date;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -45,7 +53,7 @@ import static org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_
  * This class replaces the generic HiveAccessor for a case where a table is stored entirely as ORC files.
  * Use together with {@link HiveInputFormatFragmenter}/{@link HiveColumnarSerdeResolver}
  */
-public class HiveORCAccessor extends HiveAccessor {
+public class HiveORCAccessor extends HiveAccessor implements StatsAccessor {
 
     private static final Log LOG = LogFactory.getLog(HiveORCAccessor.class);
 
@@ -53,6 +61,14 @@ public class HiveORCAccessor extends HiveAccessor {
     private final String READ_ALL_COLUMNS = "hive.io.file.read.all.columns";
     private final String READ_COLUMN_NAMES_CONF_STR = "hive.io.file.readcolumn.names";
     private final String SARG_PUSHDOWN = "sarg.pushdown";
+    protected Reader orcReader;
+
+    private boolean useStats;
+    private long count;
+    private long objectsEmitted;
+    private OneRow rowToEmitCount;
+
+    private boolean statsInitialized;
 
     /**
      * Constructs a HiveORCFileAccessor.
@@ -65,12 +81,21 @@ public class HiveORCAccessor extends HiveAccessor {
         HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, PXF_HIVE_SERDES.ORC_SERDE);
         initPartitionFields(hiveUserData.getPartitionKeys());
         filterInFragmenter = hiveUserData.isFilterInFragmenter();
+        useStats = Utilities.useStats(this, inputData);
     }
 
     @Override
     public boolean openForRead() throws Exception {
-        addColumns();
-        addFilters();
+        if (useStats) {
+            orcReader = HiveUtilities.getOrcReader(inputData);
+            if (orcReader == null) {
+                return false;
+            }
+            objectsEmitted = 0;
+        } else {
+            addColumns();
+            addFilters();
+        }
         return super.openForRead();
     }
 
@@ -213,4 +238,50 @@ public class HiveORCAccessor extends HiveAccessor {
         return true;
     }
 
+    /**
+     * Fetches file-level statistics from an ORC file.
+     */
+    @Override
+    public void retrieveStats() throws Exception {
+        if (!this.useStats) {
+            throw new IllegalStateException("Accessor is not using statistics in current context.");
+        }
+        /*
+         * We use file-level stats, so when a file has multiple splits it is
+         * enough to return the count for the first split of the file;
+         * returning it for every split would duplicate the counts.
+         */
+        if (inputData.getFragmentIndex() == 0) {
+            this.count = this.orcReader.getNumberOfRows();
+            rowToEmitCount = readNextObject();
+        }
+        statsInitialized = true;
+    }
+
+    /**
+     * Emits a tuple without reading from disk; currently supports COUNT.
+     */
+    @Override
+    public OneRow emitAggObject() {
+        if (!statsInitialized) {
+            throw new IllegalStateException("retrieveStats() should be called before calling emitAggObject()");
+        }
+        OneRow row = null;
+        if (inputData.getAggType() == null)
+            throw new UnsupportedOperationException("Aggregate operation is required");
+        switch (inputData.getAggType()) {
+            case COUNT:
+                if (objectsEmitted < count) {
+                    objectsEmitted++;
+                    row = rowToEmitCount;
+                }
+                break;
+            default: {
+                throw new UnsupportedOperationException("Aggregation operation is not supported.");
+            }
+        }
+        return row;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
index 3328c9f..808c415 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
@@ -30,6 +30,8 @@ import java.util.Properties;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableType;
@@ -39,6 +41,11 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.*;
+import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
+import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
 import org.apache.hawq.pxf.api.Fragmenter;
 import org.apache.hawq.pxf.api.Metadata;
 import org.apache.hawq.pxf.api.Metadata.Field;
@@ -48,12 +55,9 @@ import org.apache.hawq.pxf.api.utilities.EnumHawqType;
 import org.apache.hawq.pxf.api.utilities.InputData;
 import org.apache.hawq.pxf.api.utilities.Utilities;
 import org.apache.hawq.pxf.api.io.DataType;
-import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
 import org.apache.hawq.pxf.plugins.hive.HiveDataFragmenter;
 import org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter;
 import org.apache.hawq.pxf.plugins.hive.HiveTablePartition;
-import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
-import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
 import org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_INPUT_FORMATS;
 import org.apache.hawq.pxf.plugins.hive.HiveUserData;
 import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES;
@@ -627,4 +631,21 @@ public class HiveUtilities {
 
         return deserializer;
     }
+
+    /**
+     * Creates an ORC file reader.
+     *
+     * @param inputData input data with given data source
+     * @return ORC file reader
+     */
+    public static Reader getOrcReader(InputData inputData) {
+        try {
+            Path path = new Path(inputData.getDataSource());
+            Reader reader = OrcFile.createReader(path.getFileSystem(new Configuration()), path);
+
+            return reader;
+
+        } catch (Exception e) {
+            throw new RuntimeException("Exception while getting ORC reader", e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
index 8b4bf13..daee331 100644
--- a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
+++ b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
@@ -20,10 +20,14 @@ package org.apache.hawq.pxf.plugins.hive;
  */
 
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
 import org.apache.hadoop.mapred.*;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadAccessor;
 import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.EnumAggregationType;
 import org.apache.hawq.pxf.api.utilities.InputData;
 import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
 import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
@@ -39,6 +43,8 @@ import org.powermock.modules.junit4.PowerMockRunner;
 
 import static org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory.SARG_PUSHDOWN;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertNotNull;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -55,6 +61,7 @@ public class HiveORCAccessorTest {
     @Mock OrcInputFormat orcInputFormat;
     @Mock InputFormat inputFormat;
     @Mock ColumnDescriptor columnDesc;
+    @Mock Reader orcReader;
 
     JobConf jobConf;
     HiveORCAccessor accessor;
@@ -65,6 +72,7 @@ public class HiveORCAccessorTest {
 
         PowerMockito.mockStatic(HiveUtilities.class);
         PowerMockito.when(HiveUtilities.parseHiveUserData(any(InputData.class), any(PXF_HIVE_SERDES[].class))).thenReturn(new HiveUserData("", "", null, HiveDataFragmenter.HIVE_NO_PART_TBL, true, "1", ""));
+        PowerMockito.when(HiveUtilities.getOrcReader(any(InputData.class))).thenReturn(orcReader);
 
         PowerMockito.mockStatic(HdfsUtilities.class);
 
@@ -121,4 +129,9 @@ public class HiveORCAccessorTest {
         assertEquals(sarg.toKryo(), jobConf.get(SARG_PUSHDOWN));
     }
 
+    @Test(expected=IllegalStateException.class)
+    public void emitAggObjectCountStatsNotInitialized() {
+        accessor.emitAggObject();
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java
new file mode 100644
index 0000000..c03a6a2
--- /dev/null
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hawq.pxf.service;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hawq.pxf.api.BadRecordException;
+import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.StatsAccessor;
+import org.apache.hawq.pxf.service.io.Writable;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.collections.map.LRUMap;
+
+/**
+ * Bridge class optimized for aggregate queries.
+ */
+public class AggBridge extends ReadBridge implements Bridge {
+    private static final Log LOG = LogFactory.getLog(AggBridge.class);
+    /* Avoid resolving rows with the same key twice */
+    private LRUMap outputCache;
+
+    public AggBridge(ProtocolData protData) throws Exception {
+        super(protData);
+    }
+
+    @Override
+    public boolean beginIteration() throws Exception {
+        /* Initialize LRU cache with its default capacity of 100 items */
+        outputCache = new LRUMap();
+        boolean openForReadStatus = super.fileAccessor.openForRead();
+        ((StatsAccessor) fileAccessor).retrieveStats();
+        return openForReadStatus;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Writable getNext() throws Exception {
+        Writable output = null;
+        LinkedList<Writable> cachedOutput = null;
+        OneRow onerow = null;
+
+        if (!outputQueue.isEmpty()) {
+            return outputQueue.pop();
+        }
+
+        try {
+            while (outputQueue.isEmpty()) {
+                onerow = ((StatsAccessor) fileAccessor).emitAggObject();
+                if (onerow == null) {
+                    break;
+                }
+                cachedOutput = (LinkedList<Writable>) outputCache.get(onerow.getKey());
+                if (cachedOutput == null) {
+                    cachedOutput = outputBuilder.makeOutput(fieldsResolver.getFields(onerow));
+                    outputCache.put(onerow.getKey(), cachedOutput);
+                }
+                outputQueue.addAll(cachedOutput);
+                if (!outputQueue.isEmpty()) {
+                    output = outputQueue.pop();
+                    break;
+                }
+            }
+        } catch (Exception ex) {
+            LOG.error("Error occurred when reading next object from aggregate bridge: " + ex.getMessage());
+            throw ex;
+        }
+
+        return output;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
index 104f353..4294e09 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
@@ -39,7 +39,8 @@ import javax.ws.rs.core.StreamingOutput;
 
 import org.apache.catalina.connector.ClientAbortException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
+import org.apache.hawq.pxf.api.utilities.Utilities;
+import org.apache.hawq.pxf.service.AggBridge;
 import org.apache.hawq.pxf.service.Bridge;
 import org.apache.hawq.pxf.service.ReadBridge;
 import org.apache.hawq.pxf.service.ReadSamplingBridge;
@@ -98,6 +99,8 @@ public class BridgeResource extends RestResource {
         float sampleRatio = protData.getStatsSampleRatio();
         if (sampleRatio > 0) {
             bridge = new ReadSamplingBridge(protData);
+        } else if (Utilities.useAggBridge(protData)) {
+            bridge = new AggBridge(protData);
         } else {
             bridge = new ReadBridge(protData);
         }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
index dc2a110..0cb6d47 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
@@ -31,6 +31,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hawq.pxf.api.OutputFormat;
 import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.EnumAggregationType;
 import org.apache.hawq.pxf.api.utilities.InputData;
 import org.apache.hawq.pxf.api.utilities.ProfilesConf;
 
@@ -115,6 +116,18 @@ public class ProtocolData extends InputData {
 
         // Store alignment for global use as a system property
         System.setProperty("greenplum.alignment", getProperty("ALIGNMENT"));
+
+        // Get aggregation operation
+        String aggTypeOperationName = getOptionalProperty("AGG-TYPE");
+
+        this.setAggType(EnumAggregationType.getAggregationType(aggTypeOperationName));
+
+        // Get fragment index
+        String fragmentIndexStr = getOptionalProperty("FRAGMENT-INDEX");
+
+        if (fragmentIndexStr != null) {
+            this.setFragmentIndex(Integer.parseInt(fragmentIndexStr));
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5ffddf2/src/bin/gpfusion/gpbridgeapi.c
----------------------------------------------------------------------
diff --git a/src/bin/gpfusion/gpbridgeapi.c b/src/bin/gpfusion/gpbridgeapi.c
index f176586..a853f06 100644
--- a/src/bin/gpfusion/gpbridgeapi.c
+++ b/src/bin/gpfusion/gpbridgeapi.c
@@ -220,6 +220,7 @@ void set_current_fragment_headers(gphadoop_context* context)
 	churl_headers_override(context->churl_headers, "X-GP-DATA-DIR", frag_data->source_name);
 	churl_headers_override(context->churl_headers, "X-GP-DATA-FRAGMENT", frag_data->index);
 	churl_headers_override(context->churl_headers, "X-GP-FRAGMENT-METADATA", frag_data->fragment_md);
+	churl_headers_override(context->churl_headers, "X-GP-FRAGMENT-INDEX", frag_data->index);
 
 	if (frag_data->user_data)
 	{
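
With the C-side change above, each per-fragment request now also carries the fragment's index, and ProtocolData picks up both the index and the aggregate type on the service side (PXF strips the X-GP- prefix before properties such as AGG-TYPE and FRAGMENT-INDEX are read). An abridged, illustrative header set for the first split of an ORC file might look like the following; the exact AGG-TYPE header name is an assumption, since it is set by HAWQ planner code outside this diff:

    X-GP-DATA-DIR: /hive/warehouse/orc_table/000000_0
    X-GP-FRAGMENT-METADATA: <serialized start/end/hosts, base64-encoded>
    X-GP-FRAGMENT-INDEX: 0
    X-GP-AGG-TYPE: count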