Author: olga Date: Thu Aug 14 16:20:00 2008 New Revision: 686085 URL: http://svn.apache.org/viewvc?rev=686085&view=rev Log: PIG-301: order by descending
Added: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleRawComparator.java Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableBytesWritable.java incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableDoubleWritable.java incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableFloatWritable.java incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableIntWritable.java incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableLongWritable.java incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableText.java Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=686085&r1=686084&r2=686085&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Thu Aug 14 16:20:00 2008 @@ -289,8 +289,12 @@ if(mro.UDFs.size()==1){ String compFuncSpec = mro.UDFs.get(0); Class comparator = PigContext.resolveClassName(compFuncSpec); - if(ComparisonFunc.class.isAssignableFrom(comparator)) + if(ComparisonFunc.class.isAssignableFrom(comparator)) { jobConf.setOutputKeyComparatorClass(comparator); + } + } else { + jobConf.set("pig.sortOrder", + ObjectSerializer.serialize(mro.getSortOrder())); } } @@ -385,7 +389,44 @@ if (succ.isGlobalSort()) involved = true; } } - if (!involved) { + if (involved) { + switch (keyType) { + case DataType.INTEGER: + jobConf.setOutputKeyComparatorClass(PigIntRawComparator.class); + break; + + case DataType.LONG: + jobConf.setOutputKeyComparatorClass(PigLongRawComparator.class); + break; + + case DataType.FLOAT: + jobConf.setOutputKeyComparatorClass(PigFloatRawComparator.class); + break; + + case DataType.DOUBLE: + jobConf.setOutputKeyComparatorClass(PigDoubleRawComparator.class); + break; + + case DataType.CHARARRAY: + jobConf.setOutputKeyComparatorClass(PigTextRawComparator.class); + break; + + case DataType.BYTEARRAY: + jobConf.setOutputKeyComparatorClass(PigBytesRawComparator.class); + break; + + case DataType.MAP: + log.error("Using Map as key not supported."); + throw new JobCreationException("Using Map as key not supported"); + + case DataType.TUPLE: + jobConf.setOutputKeyComparatorClass(PigTupleRawComparator.class); + break; + + default: + break; + } + } else { switch (keyType) { case DataType.INTEGER: jobConf.setOutputKeyComparatorClass(PigIntWritableComparator.class); Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=686085&r1=686084&r2=686085&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Aug 14 16:20:00 2008 @@ -825,8 +825,7 @@ curMROp.UDFs.add(op.getMSortFunc().getFuncSpec().toString()); } }catch(Exception e){ - VisitorException pe = new VisitorException(e.getMessage()); - pe.initCause(e); + VisitorException pe = new VisitorException(e.getMessage(), e); throw pe; } } @@ -864,6 +863,17 @@ byte keyType = DataType.UNKNOWN; + boolean[] sortOrder; + + List<Boolean> sortOrderList = sort.getMAscCols(); + if(sortOrderList != null) { + sortOrder = new boolean[sortOrderList.size()]; + for(int i = 0; i < sortOrderList.size(); ++i) { + sortOrder[i] = sortOrderList.get(i); + } + mro.setSortOrder(sortOrder); + } + if (fields == null) { // This is project * PhysicalPlan ep = new PhysicalPlan(); @@ -1004,8 +1014,9 @@ POSort sort = new POSort(inpSort.getOperatorKey(), inpSort .getRequestedParallelism(), null, inpSort.getSortPlans(), inpSort.getMAscCols(), inpSort.getMSortFunc()); - if(sort.isUDFComparatorUsed) + if(sort.isUDFComparatorUsed) { mro.UDFs.add(sort.getMSortFunc().getFuncSpec().toString()); + } List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>(); List<Boolean> flat1 = new ArrayList<Boolean>(); Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=686085&r1=686084&r2=686085&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Thu Aug 14 16:20:00 2008 @@ -72,6 +72,10 @@ //The quantiles file name if globalSort is true String quantFile; + //The sort order of the columns; + //asc is true and desc is false + boolean[] sortOrder; + public List<String> UDFs; NodeIdGenerator nig; @@ -214,4 +218,16 @@ public void setQuantFile(String quantFile) { this.quantFile = quantFile; } + + public void setSortOrder(boolean[] sortOrder) { + if(null == sortOrder) return; + this.sortOrder = new boolean[sortOrder.length]; + for(int i = 0; i < sortOrder.length; ++i) { + this.sortOrder[i] = sortOrder[i]; + } + } + + public boolean[] getSortOrder() { + return sortOrder; + } } Added: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java?rev=686085&view=auto ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java (added) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java Thu Aug 14 16:20:00 2008 @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.mapred.JobConf; + +import org.apache.pig.impl.io.NullableBytesWritable; +import org.apache.pig.impl.util.ObjectSerializer; + +public class PigBytesRawComparator extends BytesWritable.Comparator implements Configurable { + + private final Log mLog = LogFactory.getLog(getClass()); + private boolean[] mAsc; + + public void setConf(Configuration conf) { + if (!(conf instanceof JobConf)) { + mLog.warn("Expected jobconf in setConf, got " + + conf.getClass().getName()); + return; + } + JobConf jconf = (JobConf)conf; + try { + mAsc = (boolean[])ObjectSerializer.deserialize(jconf.get( + "pig.sortOrder")); + } catch (IOException ioe) { + mLog.error("Unable to deserialize pig.sortOrder " + + ioe.getMessage()); + throw new RuntimeException(ioe); + } + if (mAsc == null) { + mAsc = new boolean[1]; + mAsc[0] = true; + } + } + + public Configuration getConf() { + return null; + } + + public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { + int rc = 0; + // If either are null, handle differently. + if (b1[s1] == NullableBytesWritable.NOTNULL && + b2[s2] == NullableBytesWritable.NOTNULL) { + rc = super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2); + } else { + // For sorting purposes two nulls are equal. + if (b1[s1] == NullableBytesWritable.NULL && + b2[s2] == NullableBytesWritable.NULL) rc = 0; + else if (b1[s1] == NullableBytesWritable.NULL) rc = -1; + else rc = 1; + } + if (!mAsc[0]) rc *= -1; + return rc; + } + + +} Added: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java?rev=686085&view=auto ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java (added) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java Thu Aug 14 16:20:00 2008 @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.JobConf; + +import org.apache.pig.backend.hadoop.DoubleWritable; +import org.apache.pig.impl.io.NullableDoubleWritable; +import org.apache.pig.impl.util.ObjectSerializer; + +public class PigDoubleRawComparator extends DoubleWritable.Comparator implements Configurable { + + private final Log mLog = LogFactory.getLog(getClass()); + private boolean[] mAsc; + + public void setConf(Configuration conf) { + if (!(conf instanceof JobConf)) { + mLog.warn("Expected jobconf in setConf, got " + + conf.getClass().getName()); + return; + } + JobConf jconf = (JobConf)conf; + try { + mAsc = (boolean[])ObjectSerializer.deserialize(jconf.get( + "pig.sortOrder")); + } catch (IOException ioe) { + mLog.error("Unable to deserialize pig.sortOrder " + + ioe.getMessage()); + throw new RuntimeException(ioe); + } + if (mAsc == null) { + mAsc = new boolean[1]; + mAsc[0] = true; + } + } + + public Configuration getConf() { + return null; + } + + public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { + int rc = 0; + // If either are null, handle differently. + if (b1[s1] == NullableDoubleWritable.NOTNULL && + b2[s2] == NullableDoubleWritable.NOTNULL) { + rc = super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2); + } else { + // For sorting purposes two nulls are equal. + if (b1[s1] == NullableDoubleWritable.NULL && + b2[s2] == NullableDoubleWritable.NULL) rc = 0; + else if (b1[s1] == NullableDoubleWritable.NULL) rc = -1; + else rc = 1; + } + if (!mAsc[0]) rc *= -1; + return rc; + } + + +} Added: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java?rev=686085&view=auto ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java (added) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java Thu Aug 14 16:20:00 2008 @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.mapred.JobConf; + +import org.apache.pig.impl.io.NullableFloatWritable; +import org.apache.pig.impl.util.ObjectSerializer; + +public class PigFloatRawComparator extends FloatWritable.Comparator implements Configurable { + + private final Log mLog = LogFactory.getLog(getClass()); + private boolean[] mAsc; + + public void setConf(Configuration conf) { + if (!(conf instanceof JobConf)) { + mLog.warn("Expected jobconf in setConf, got " + + conf.getClass().getName()); + return; + } + JobConf jconf = (JobConf)conf; + try { + mAsc = (boolean[])ObjectSerializer.deserialize(jconf.get( + "pig.sortOrder")); + } catch (IOException ioe) { + mLog.error("Unable to deserialize pig.sortOrder " + + ioe.getMessage()); + throw new RuntimeException(ioe); + } + if (mAsc == null) { + mAsc = new boolean[1]; + mAsc[0] = true; + } + } + + public Configuration getConf() { + return null; + } + + public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { + int rc = 0; + // If either are null, handle differently. + if (b1[s1] == NullableFloatWritable.NOTNULL && + b2[s2] == NullableFloatWritable.NOTNULL) { + rc = super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2); + } else { + // For sorting purposes two nulls are equal. + if (b1[s1] == NullableFloatWritable.NULL && + b2[s2] == NullableFloatWritable.NULL) rc = 0; + else if (b1[s1] == NullableFloatWritable.NULL) rc = -1; + else rc = 1; + } + if (!mAsc[0]) rc *= -1; + return rc; + } + + +} Added: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java?rev=686085&view=auto ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java (added) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java Thu Aug 14 16:20:00 2008 @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.mapred.JobConf; + +import org.apache.pig.impl.io.NullableIntWritable; +import org.apache.pig.impl.util.ObjectSerializer; + +public class PigIntRawComparator extends IntWritable.Comparator implements Configurable { + + private final Log mLog = LogFactory.getLog(getClass()); + private boolean[] mAsc; + + public void setConf(Configuration conf) { + if (!(conf instanceof JobConf)) { + mLog.warn("Expected jobconf in setConf, got " + + conf.getClass().getName()); + return; + } + JobConf jconf = (JobConf)conf; + try { + mAsc = (boolean[])ObjectSerializer.deserialize(jconf.get( + "pig.sortOrder")); + } catch (IOException ioe) { + mLog.error("Unable to deserialize pig.sortOrder " + + ioe.getMessage()); + throw new RuntimeException(ioe); + } + if (mAsc == null) { + mAsc = new boolean[1]; + mAsc[0] = true; + } + } + + public Configuration getConf() { + return null; + } + + public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { + int rc = 0; + // If either are null, handle differently. + if (b1[s1] == NullableIntWritable.NOTNULL && + b2[s2] == NullableIntWritable.NOTNULL) { + rc = super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2); + } else { + // For sorting purposes two nulls are equal. + if (b1[s1] == NullableIntWritable.NULL && + b2[s2] == NullableIntWritable.NULL) rc = 0; + else if (b1[s1] == NullableIntWritable.NULL) rc = -1; + else rc = 1; + } + if (!mAsc[0]) rc *= -1; + return rc; + } + + +} Added: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java?rev=686085&view=auto ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java (added) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java Thu Aug 14 16:20:00 2008 @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.mapred.JobConf; + +import org.apache.pig.impl.io.NullableLongWritable; +import org.apache.pig.impl.util.ObjectSerializer; + +public class PigLongRawComparator extends LongWritable.Comparator implements Configurable { + + private final Log mLog = LogFactory.getLog(getClass()); + private boolean[] mAsc; + + public void setConf(Configuration conf) { + if (!(conf instanceof JobConf)) { + mLog.warn("Expected jobconf in setConf, got " + + conf.getClass().getName()); + return; + } + JobConf jconf = (JobConf)conf; + try { + mAsc = (boolean[])ObjectSerializer.deserialize(jconf.get( + "pig.sortOrder")); + } catch (IOException ioe) { + mLog.error("Unable to deserialize pig.sortOrder " + + ioe.getMessage()); + throw new RuntimeException(ioe); + } + if (mAsc == null) { + mAsc = new boolean[1]; + mAsc[0] = true; + } + } + + public Configuration getConf() { + return null; + } + + public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { + int rc = 0; + // If either are null, handle differently. + if (b1[s1] == NullableLongWritable.NOTNULL && + b2[s2] == NullableLongWritable.NOTNULL) { + rc = super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2); + } else { + // For sorting purposes two nulls are equal. + if (b1[s1] == NullableLongWritable.NULL && + b2[s2] == NullableLongWritable.NULL) rc = 0; + else if (b1[s1] == NullableLongWritable.NULL) rc = -1; + else rc = 1; + } + if (!mAsc[0]) rc *= -1; + return rc; + } + + +} Added: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java?rev=686085&view=auto ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java (added) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java Thu Aug 14 16:20:00 2008 @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.mapred.JobConf; + +import org.apache.pig.impl.io.NullableText; +import org.apache.pig.impl.util.ObjectSerializer; + +public class PigTextRawComparator extends Text.Comparator implements Configurable { + + private final Log mLog = LogFactory.getLog(getClass()); + private boolean[] mAsc; + + public void setConf(Configuration conf) { + if (!(conf instanceof JobConf)) { + mLog.warn("Expected jobconf in setConf, got " + + conf.getClass().getName()); + return; + } + JobConf jconf = (JobConf)conf; + try { + mAsc = (boolean[])ObjectSerializer.deserialize(jconf.get( + "pig.sortOrder")); + } catch (IOException ioe) { + mLog.error("Unable to deserialize pig.sortOrder " + + ioe.getMessage()); + throw new RuntimeException(ioe); + } + if (mAsc == null) { + mAsc = new boolean[1]; + mAsc[0] = true; + } + } + + public Configuration getConf() { + return null; + } + + public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { + int rc = 0; + // If either are null, handle differently. + if (b1[s1] == NullableText.NOTNULL && + b2[s2] == NullableText.NOTNULL) { + rc = super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2); + } else { + // For sorting purposes two nulls are equal. + if (b1[s1] == NullableText.NULL && + b2[s2] == NullableText.NULL) rc = 0; + else if (b1[s1] == NullableText.NULL) rc = -1; + else rc = 1; + } + if (!mAsc[0]) rc *= -1; + return rc; + } + + +} Added: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleRawComparator.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleRawComparator.java?rev=686085&view=auto ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleRawComparator.java (added) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleRawComparator.java Thu Aug 14 16:20:00 2008 @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.WritableComparator; +import org.apache.hadoop.mapred.JobConf; + +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; +import org.apache.pig.impl.util.ObjectSerializer; + +public class PigTupleRawComparator extends WritableComparator implements Configurable { + + private final Log mLog = LogFactory.getLog(getClass()); + private boolean[] mAsc; + private boolean mWholeTuple; + private TupleFactory mFact; + + public PigTupleRawComparator() { + super(TupleFactory.getInstance().tupleClass()); + } + + public void setConf(Configuration conf) { + if (!(conf instanceof JobConf)) { + mLog.warn("Expected jobconf in setConf, got " + + conf.getClass().getName()); + return; + } + JobConf jconf = (JobConf)conf; + try { + mAsc = (boolean[])ObjectSerializer.deserialize(jconf.get( + "pig.sortOrder")); + } catch (IOException ioe) { + mLog.error("Unable to deserialize pig.sortOrder " + + ioe.getMessage()); + throw new RuntimeException(ioe); + } + if (mAsc == null) { + mAsc = new boolean[1]; + mAsc[0] = true; + } + // If there's only one entry in mAsc, it means it's for the whole + // tuple. So we can't be looking for each column. + mWholeTuple = (mAsc.length == 1); + mFact = TupleFactory.getInstance(); + } + + public Configuration getConf() { + return null; + } + + public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { + // This can't be done on the raw data. Users are allowed to + // implement their own versions of tuples, which means we have no + // idea what the underlying representation is. So step one is to + // instantiate each object as a tuple. + Tuple t1 = mFact.newTuple(); + Tuple t2 = mFact.newTuple(); + try { + t1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1, l1))); + t2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2, l2))); + } catch (IOException ioe) { + mLog.error("Unable to instantiate tuples for comparison: " + + ioe.getMessage()); + throw new RuntimeException(ioe.getMessage(), ioe); + } + + int rc; + if (t1.isNull() || t2.isNull()) { + // For sorting purposes two nulls are equal. + if (t1.isNull() && t2.isNull()) rc = 0; + else if (t1.isNull()) rc = -1; + else rc = 1; + } else { + int sz1 = t1.size(); + int sz2 = t2.size(); + if (sz2 < sz1) { + rc = 1; + } else if (sz2 > sz1) { + rc = -1; + } else { + for (int i = 0; i < sz1; i++) { + try { + int c = DataType.compare(t1.get(i), t2.get(i)); + if (c != 0) { + if (!mWholeTuple && !mAsc[i]) c *= -1; + else if (mWholeTuple && !mAsc[0]) c *= -1; + return c; + } + } catch (ExecException e) { + throw new RuntimeException("Unable to compare tuples", e); + } + } + rc = 0; + } + } + if (mWholeTuple && !mAsc[0]) rc *= -1; + return rc; + } + + +} Modified: incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableBytesWritable.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableBytesWritable.java?rev=686085&r1=686084&r2=686085&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableBytesWritable.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableBytesWritable.java Thu Aug 14 16:20:00 2008 @@ -1,5 +1,19 @@ -/** - * +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.pig.impl.io; Modified: incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableDoubleWritable.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableDoubleWritable.java?rev=686085&r1=686084&r2=686085&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableDoubleWritable.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableDoubleWritable.java Thu Aug 14 16:20:00 2008 @@ -1,5 +1,19 @@ -/** - * +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.pig.impl.io; Modified: incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableFloatWritable.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableFloatWritable.java?rev=686085&r1=686084&r2=686085&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableFloatWritable.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableFloatWritable.java Thu Aug 14 16:20:00 2008 @@ -1,5 +1,19 @@ -/** - * +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.pig.impl.io; Modified: incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableIntWritable.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableIntWritable.java?rev=686085&r1=686084&r2=686085&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableIntWritable.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableIntWritable.java Thu Aug 14 16:20:00 2008 @@ -1,5 +1,19 @@ -/** - * +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.pig.impl.io; Modified: incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableLongWritable.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableLongWritable.java?rev=686085&r1=686084&r2=686085&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableLongWritable.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableLongWritable.java Thu Aug 14 16:20:00 2008 @@ -1,5 +1,19 @@ -/** - * +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.pig.impl.io; Modified: incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableText.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableText.java?rev=686085&r1=686084&r2=686085&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableText.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableText.java Thu Aug 14 16:20:00 2008 @@ -1,5 +1,19 @@ -/** - * +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.pig.impl.io;