This is an automated email from the ASF dual-hosted git repository. chinmayskulkarni pushed a commit to branch 4.x-HBase-1.2 in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.2 by this push: new 5304b56 PHOENIX-5131 Make spilling to disk for order/group by configurable 5304b56 is described below commit 5304b568cc91be5de1d04c7dd550fddfff77a4ab Author: Abhishek Singh Chouhan <abhishekchouhan...@gmail.com> AuthorDate: Thu Mar 14 11:49:05 2019 -0700 PHOENIX-5131 Make spilling to disk for order/group by configurable Signed-off-by: Chinmay Kulkarni <chinmayskulka...@apache.org> --- .../java/org/apache/phoenix/end2end/OrderByIT.java | 45 +++++++ ...OrderByWithServerClientSpoolingDisabledIT.java} | 17 ++- .../end2end/OrderByWithServerMemoryLimitIT.java | 81 ++++++++++++ .../phoenix/end2end/OrderByWithSpillingIT.java | 3 +- .../phoenix/end2end/SpooledTmpFileDeleteIT.java | 2 +- .../end2end/join/SortMergeJoinNoSpoolingIT.java | 83 ++++++++++++ .../phoenix/coprocessor/MetaDataProtocol.java | 7 ++ .../phoenix/coprocessor/ScanRegionObserver.java | 4 +- .../org/apache/phoenix/execute/AggregatePlan.java | 28 ++++- .../phoenix/execute/ClientAggregatePlan.java | 30 ++++- .../org/apache/phoenix/execute/ClientScanPlan.java | 16 ++- .../java/org/apache/phoenix/execute/ScanPlan.java | 10 +- .../apache/phoenix/execute/SortMergeJoinPlan.java | 139 +++++---------------- .../phoenix/hbase/index/util/VersionUtil.java | 12 ++ .../org/apache/phoenix/iterate/BufferedQueue.java | 20 +-- .../phoenix/iterate/BufferedSortedQueue.java | 36 ++---- .../apache/phoenix/iterate/BufferedTupleQueue.java | 134 ++++++++++++++++++++ .../iterate/NonAggregateRegionScannerFactory.java | 45 +++++-- .../iterate/OrderedAggregatingResultIterator.java | 5 +- .../phoenix/iterate/OrderedResultIterator.java | 71 +++++++++-- .../org/apache/phoenix/iterate/PhoenixQueues.java | 96 ++++++++++++++ .../apache/phoenix/iterate/SizeAwareQueue.java} | 19 +-- .../org/apache/phoenix/iterate/SizeBoundQueue.java | 96 ++++++++++++++ .../phoenix/iterate/SpoolingResultIterator.java | 5 +- .../org/apache/phoenix/query/QueryServices.java | 11 +- 
.../apache/phoenix/query/QueryServicesOptions.java | 19 ++- .../phoenix/iterate/OrderedResultIteratorTest.java | 55 +++++++- .../phoenix/query/QueryServicesTestImpl.java | 3 +- .../org/apache/phoenix/util/MetaDataUtilTest.java | 10 +- 29 files changed, 881 insertions(+), 221 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByIT.java index 792d08f..172ed89 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByIT.java @@ -20,7 +20,10 @@ package org.apache.phoenix.end2end; import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.hamcrest.CoreMatchers.containsString; import java.sql.Connection; import java.sql.Date; @@ -30,6 +33,8 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.Properties; +import org.apache.phoenix.exception.PhoenixIOException; +import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.PropertiesUtil; import org.junit.Test; @@ -461,4 +466,44 @@ public class OrderByIT extends BaseOrderByIT { conn.close(); } } + + @Test + public void testOrderByWithClientMemoryLimit() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.put(QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(1)); + props.put(QueryServices.CLIENT_ORDERBY_SPOOLING_ENABLED_ATTRIB, + Boolean.toString(Boolean.FALSE)); + + try(Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.setAutoCommit(false); + String tableName = generateUniqueName(); + String ddl = + "CREATE TABLE " + tableName + " (a_string varchar not null, col1 integer" + + " 
CONSTRAINT pk PRIMARY KEY (a_string))\n"; + createTestTable(getUrl(), ddl); + + String dml = "UPSERT INTO " + tableName + " VALUES(?, ?)"; + PreparedStatement stmt = conn.prepareStatement(dml); + stmt.setString(1, "a"); + stmt.setInt(2, 40); + stmt.execute(); + stmt.setString(1, "b"); + stmt.setInt(2, 20); + stmt.execute(); + stmt.setString(1, "c"); + stmt.setInt(2, 30); + stmt.execute(); + conn.commit(); + + String query = "select count(*), col1 from " + tableName + " group by col1 order by 2"; + ResultSet rs = conn.createStatement().executeQuery(query); + try { + rs.next(); + fail("Expected PhoenixIOException due to IllegalStateException"); + } catch (PhoenixIOException e) { + assertThat(e.getMessage(), containsString("java.lang.IllegalStateException: " + + "Queue full. Consider increasing memory threshold or spooling to disk")); + } + } + } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithSpillingIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithServerClientSpoolingDisabledIT.java similarity index 66% copy from phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithSpillingIT.java copy to phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithServerClientSpoolingDisabledIT.java index c5eeaff..caf515f 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithSpillingIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithServerClientSpoolingDisabledIT.java @@ -19,18 +19,29 @@ package org.apache.phoenix.end2end; import java.util.Map; +import org.apache.phoenix.iterate.SizeBoundQueue; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.ReadOnlyProps; import org.junit.BeforeClass; import com.google.common.collect.Maps; -public class OrderByWithSpillingIT extends OrderByIT { +/** + * Same as the order by test but with spooling disabled both on the server and client. 
This will use + * {@link SizeBoundQueue} for all its operations + */ +public class OrderByWithServerClientSpoolingDisabledIT extends OrderByIT { + @BeforeClass public static void doSetup() throws Exception { Map<String, String> props = Maps.newHashMapWithExpectedSize(1); - // do lot's of spooling! - props.put(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(1)); + // make sure disabling server side spooling has no affect on correctness(existing orderby + // IT) + props.put(QueryServices.SERVER_ORDERBY_SPOOLING_ENABLED_ATTRIB, + Boolean.toString(Boolean.FALSE)); + props.put(QueryServices.CLIENT_ORDERBY_SPOOLING_ENABLED_ATTRIB, + Boolean.toString(Boolean.FALSE)); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } + } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithServerMemoryLimitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithServerMemoryLimitIT.java new file mode 100644 index 0000000..2c66614 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithServerMemoryLimitIT.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. 
+ */ +package org.apache.phoenix.end2end; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.hamcrest.CoreMatchers.containsString; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.util.Map; +import java.util.Properties; + +import org.apache.phoenix.exception.PhoenixIOException; +import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.Maps; + +public class OrderByWithServerMemoryLimitIT extends BaseTest { + + @BeforeClass + public static void doSetup() throws Exception { + Map<String, String> props = Maps.newHashMapWithExpectedSize(1); + props.put(QueryServices.SERVER_SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(1)); + props.put(QueryServices.SERVER_ORDERBY_SPOOLING_ENABLED_ATTRIB, + Boolean.toString(Boolean.FALSE)); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Test + public void testOrderByWithServerMemoryLimit() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.setAutoCommit(false); + String tableName = generateUniqueName(); + String ddl = + "CREATE TABLE " + tableName + " (a_string varchar not null, col1 integer" + + " CONSTRAINT pk PRIMARY KEY (a_string))\n"; + createTestTable(getUrl(), ddl); + + String dml = "UPSERT INTO " + tableName + " VALUES(?, ?)"; + PreparedStatement stmt = conn.prepareStatement(dml); + stmt.setString(1, "a"); + stmt.setInt(2, 40); + stmt.execute(); + stmt.setString(1, "b"); + stmt.setInt(2, 20); + stmt.execute(); + stmt.setString(1, "c"); + stmt.setInt(2, 30); + 
stmt.execute(); + conn.commit(); + + String query = "select * from " + tableName + " order by 2"; + ResultSet rs = conn.createStatement().executeQuery(query); + try { + rs.next(); + fail("Expected PhoenixIOException due to IllegalStateException"); + } catch (PhoenixIOException e) { + assertThat(e.getMessage(), containsString("java.lang.IllegalStateException: " + + "Queue full. Consider increasing memory threshold or spooling to disk")); + } + } + } +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithSpillingIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithSpillingIT.java index c5eeaff..80a5123 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithSpillingIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithSpillingIT.java @@ -30,7 +30,8 @@ public class OrderByWithSpillingIT extends OrderByIT { public static void doSetup() throws Exception { Map<String, String> props = Maps.newHashMapWithExpectedSize(1); // do lot's of spooling! 
- props.put(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(1)); + props.put(QueryServices.SERVER_SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(1)); + props.put(QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(1)); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledTmpFileDeleteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledTmpFileDeleteIT.java index 9dc82bf..e63c3f6 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledTmpFileDeleteIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledTmpFileDeleteIT.java @@ -44,7 +44,7 @@ public class SpooledTmpFileDeleteIT extends ParallelStatsDisabledIT { private Connection getConnection() throws Exception { Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); props.setProperty(QueryServices.SPOOL_DIRECTORY, spoolDir.getPath()); - props.setProperty(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(1)); + props.setProperty(QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(1)); props.setProperty(QueryServices.RENEW_LEASE_ENABLED, Boolean.toString(false)); // Ensures round robin off so that spooling is used. // TODO: review with Samarth - should a Noop iterator be used if pacing is not possible? diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/SortMergeJoinNoSpoolingIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/SortMergeJoinNoSpoolingIT.java new file mode 100644 index 0000000..40b1fe6 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/SortMergeJoinNoSpoolingIT.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. 
The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.phoenix.end2end.join; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.hamcrest.CoreMatchers.containsString; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.util.Collection; +import java.util.Map; +import java.util.Properties; + +import org.apache.phoenix.exception.PhoenixIOException; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runners.Parameterized.Parameters; + +import com.google.common.collect.Maps; + +public class SortMergeJoinNoSpoolingIT extends SortMergeJoinNoIndexIT { + + public SortMergeJoinNoSpoolingIT(String[] indexDDL, String[] plans) { + super(indexDDL, plans); + } + + @Parameters(name = "SortMergeJoinNoSpoolingIT_{index}") // name is used by failsafe as file name + // in reports + public static Collection<Object> data() { + return SortMergeJoinNoIndexIT.data(); + } + + @BeforeClass + public static void doSetup() throws Exception { + Map<String, String> props = Maps.newHashMapWithExpectedSize(1); + props.put(QueryServices.CLIENT_JOIN_SPOOLING_ENABLED_ATTRIB, + Boolean.toString(Boolean.FALSE)); + 
props.put(QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB, + Integer.toString(10 * 1000 * 1000)); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Test + public void testJoinWithMemoryLimit() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.put(QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(1)); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + String tableName1 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME); + String tableName2 = getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME); + String query = + "SELECT /*+ USE_SORT_MERGE_JOIN*/ item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + + tableName1 + " item JOIN " + tableName2 + + " supp ON item.\"supplier_id\" = supp.\"supplier_id\" ORDER BY \"item_id\""; + + PreparedStatement statement = conn.prepareStatement(query); + ResultSet rs = statement.executeQuery(); + try { + rs.next(); + fail("Expected PhoenixIOException due to IllegalStateException"); + } catch (PhoenixIOException e) { + assertThat(e.getMessage(), containsString( + "Queue full. 
Consider increasing memory threshold or spooling to disk")); + } + + } + } + +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java index 4160b8b..5b4db91 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java @@ -113,6 +113,13 @@ public abstract class MetaDataProtocol extends MetaDataService { // Version at which we allow SYSTEM.CATALOG to split public static final int MIN_SPLITTABLE_SYSTEM_CATALOG = VersionUtil.encodeVersion("4", "15", "0"); + // Version at and after which we will no longer expect client to serialize thresholdBytes for + // spooling into the scan + public static final int MIN_5_x_DISABLE_SERVER_SPOOL_THRESHOLD = + VersionUtil.encodeVersion("5", "1", "0"); + public static final int MIN_4_x_DISABLE_SERVER_SPOOL_THRESHOLD = + VersionUtil.encodeVersion("4", "15", "0"); + // ALWAYS update this map whenever rolling out a new release (major, minor or patch release). // Key is the SYSTEM.CATALOG timestamp for the version and value is the version string. 
private static final NavigableMap<Long, String> TIMESTAMP_VERSION_MAP = new TreeMap<>(); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java index c2dfc4c..d8c7634 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java @@ -46,11 +46,11 @@ import org.apache.phoenix.util.ScanUtil; */ public class ScanRegionObserver extends BaseScannerRegionObserver { - public static void serializeIntoScan(Scan scan, int thresholdBytes, int limit, List<OrderByExpression> orderByExpressions, int estimatedRowSize) { + public static void serializeIntoScan(Scan scan, int limit, + List<OrderByExpression> orderByExpressions, int estimatedRowSize) { ByteArrayOutputStream stream = new ByteArrayOutputStream(); // TODO: size? try { DataOutputStream output = new DataOutputStream(stream); - WritableUtils.writeVInt(output, thresholdBytes); WritableUtils.writeVInt(output, limit); WritableUtils.writeVInt(output, estimatedRowSize); WritableUtils.writeVInt(output, orderByExpressions.size()); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java index 0c8e8dc..d7c3048 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java @@ -193,8 +193,16 @@ public class AggregatePlan extends BaseQueryPlan { isAscending=false; } OrderByExpression orderByExpression = new OrderByExpression(expression, isNullsLast, isAscending); - int threshold = services.getProps().getInt(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES); - return new OrderedResultIterator(scanner, 
Collections.<OrderByExpression>singletonList(orderByExpression), threshold); + long threshold = + services.getProps().getLong(QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB, + QueryServicesOptions.DEFAULT_CLIENT_SPOOL_THRESHOLD_BYTES); + boolean spoolingEnabled = + services.getProps().getBoolean( + QueryServices.CLIENT_ORDERBY_SPOOLING_ENABLED_ATTRIB, + QueryServicesOptions.DEFAULT_CLIENT_ORDERBY_SPOOLING_ENABLED); + return new OrderedResultIterator(scanner, + Collections.<OrderByExpression> singletonList(orderByExpression), + spoolingEnabled, threshold); } } @@ -306,10 +314,18 @@ public class AggregatePlan extends BaseQueryPlan { resultScanner = new LimitingResultIterator(resultScanner, limit); } } else { - int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt( - QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES); - resultScanner = new OrderedAggregatingResultIterator(aggResultIterator, orderBy.getOrderByExpressions(), - thresholdBytes, limit, offset); + long thresholdBytes = + context.getConnection().getQueryServices().getProps().getLong( + QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB, + QueryServicesOptions.DEFAULT_CLIENT_SPOOL_THRESHOLD_BYTES); + boolean spoolingEnabled = + context.getConnection().getQueryServices().getProps().getBoolean( + QueryServices.CLIENT_ORDERBY_SPOOLING_ENABLED_ATTRIB, + QueryServicesOptions.DEFAULT_CLIENT_ORDERBY_SPOOLING_ENABLED); + resultScanner = + new OrderedAggregatingResultIterator(aggResultIterator, + orderBy.getOrderByExpressions(), spoolingEnabled, thresholdBytes, limit, + offset); } if (context.getSequenceManager().getSequenceCount() > 0) { resultScanner = new SequenceResultIterator(resultScanner, context.getSequenceManager()); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java index 676e8ad..2e0c374 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java @@ -146,8 +146,14 @@ public class ClientAggregatePlan extends ClientProcessingPlan { if (groupBy.isOrderPreserving()) { aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, keyExpressions); } else { - int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt - (QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES); + long thresholdBytes = + context.getConnection().getQueryServices().getProps().getLong( + QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB, + QueryServicesOptions.DEFAULT_CLIENT_SPOOL_THRESHOLD_BYTES); + boolean spoolingEnabled = + context.getConnection().getQueryServices().getProps().getBoolean( + QueryServices.CLIENT_ORDERBY_SPOOLING_ENABLED_ATTRIB, + QueryServicesOptions.DEFAULT_CLIENT_ORDERBY_SPOOLING_ENABLED); List<OrderByExpression> keyExpressionOrderBy = Lists.newArrayListWithExpectedSize(keyExpressions.size()); for (Expression keyExpression : keyExpressions) { keyExpressionOrderBy.add(new OrderByExpression(keyExpression, false, true)); @@ -157,7 +163,10 @@ public class ClientAggregatePlan extends ClientProcessingPlan { // Pass in orderBy to apply any sort that has been optimized away aggResultIterator = new ClientHashAggregatingResultIterator(context, iterator, serverAggregators, keyExpressions, orderBy); } else { - iterator = new OrderedResultIterator(iterator, keyExpressionOrderBy, thresholdBytes, null, null, projector.getEstimatedRowByteSize()); + iterator = + new OrderedResultIterator(iterator, keyExpressionOrderBy, + spoolingEnabled, thresholdBytes, null, null, + projector.getEstimatedRowByteSize()); aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, keyExpressions); } } @@ 
-181,9 +190,18 @@ public class ClientAggregatePlan extends ClientProcessingPlan { resultScanner = new LimitingResultIterator(resultScanner, limit); } } else { - int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt( - QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES); - resultScanner = new OrderedAggregatingResultIterator(aggResultIterator, orderBy.getOrderByExpressions(), thresholdBytes, limit, offset); + long thresholdBytes = + context.getConnection().getQueryServices().getProps().getLong( + QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB, + QueryServicesOptions.DEFAULT_CLIENT_SPOOL_THRESHOLD_BYTES); + boolean spoolingEnabled = + context.getConnection().getQueryServices().getProps().getBoolean( + QueryServices.CLIENT_ORDERBY_SPOOLING_ENABLED_ATTRIB, + QueryServicesOptions.DEFAULT_CLIENT_ORDERBY_SPOOLING_ENABLED); + resultScanner = + new OrderedAggregatingResultIterator(aggResultIterator, + orderBy.getOrderByExpressions(), spoolingEnabled, thresholdBytes, limit, + offset); } if (context.getSequenceManager().getSequenceCount() > 0) { resultScanner = new SequenceResultIterator(resultScanner, context.getSequenceManager()); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java index 3427f5f..4a54a41 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java @@ -86,10 +86,18 @@ public class ClientScanPlan extends ClientProcessingPlan { } if (!orderBy.getOrderByExpressions().isEmpty()) { // TopN - int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt( - QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES); - iterator = new OrderedResultIterator(iterator, orderBy.getOrderByExpressions(), thresholdBytes, limit, - 
offset, projector.getEstimatedRowByteSize()); + long thresholdBytes = + context.getConnection().getQueryServices().getProps().getLong( + QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB, + QueryServicesOptions.DEFAULT_CLIENT_SPOOL_THRESHOLD_BYTES); + boolean spoolingEnabled = + context.getConnection().getQueryServices().getProps().getBoolean( + QueryServices.CLIENT_ORDERBY_SPOOLING_ENABLED_ATTRIB, + QueryServicesOptions.DEFAULT_CLIENT_ORDERBY_SPOOLING_ENABLED); + iterator = + new OrderedResultIterator(iterator, orderBy.getOrderByExpressions(), + spoolingEnabled, thresholdBytes, limit, offset, + projector.getEstimatedRowByteSize()); } else { if (offset != null) { iterator = new OffsetResultIterator(iterator, offset); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java index cdb2da5..0ad0d1b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java @@ -36,6 +36,7 @@ import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.coprocessor.MetaDataProtocol; import org.apache.phoenix.coprocessor.ScanRegionObserver; import org.apache.phoenix.execute.visitor.ByteCountVisitor; import org.apache.phoenix.execute.visitor.QueryPlanVisitor; @@ -110,11 +111,10 @@ public class ScanPlan extends BaseQueryPlan { this.allowPageFilter = allowPageFilter; boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty(); if (isOrdered) { // TopN - int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt( - QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES); - ScanRegionObserver.serializeIntoScan(context.getScan(), thresholdBytes, - limit == null ? 
-1 : QueryUtil.getOffsetLimit(limit, offset), orderBy.getOrderByExpressions(), - projector.getEstimatedRowByteSize()); + ScanRegionObserver.serializeIntoScan(context.getScan(), + limit == null ? -1 : QueryUtil.getOffsetLimit(limit, offset), + orderBy.getOrderByExpressions(), projector.getEstimatedRowByteSize()); + ScanUtil.setClientVersion(context.getScan(), MetaDataProtocol.PHOENIX_VERSION); } Integer perScanLimit = !allowPageFilter || isOrdered ? null : limit; perScanLimit = QueryUtil.getOffsetLimit(perScanLimit, offset); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java index 0e7807e..e995827 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java @@ -45,6 +45,7 @@ import org.apache.phoenix.compile.OrderByCompiler.OrderBy; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.exception.PhoenixIOException; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple; @@ -54,7 +55,9 @@ import org.apache.phoenix.expression.Expression; import org.apache.phoenix.iterate.DefaultParallelScanGrouper; import org.apache.phoenix.iterate.BufferedQueue; import org.apache.phoenix.iterate.ParallelScanGrouper; +import org.apache.phoenix.iterate.PhoenixQueues; import org.apache.phoenix.iterate.ResultIterator; +import org.apache.phoenix.iterate.SizeAwareQueue; import org.apache.phoenix.jdbc.PhoenixParameterMetaData; import org.apache.phoenix.jdbc.PhoenixStatement.Operation; import org.apache.phoenix.optimize.Cost; @@ -94,7 +97,8 @@ public class SortMergeJoinPlan implements QueryPlan { private final int 
rhsFieldPosition; private final boolean isSingleValueOnly; private final Set<TableRef> tableRefs; - private final int thresholdBytes; + private final long thresholdBytes; + private final boolean spoolingEnabled; private Long estimatedBytes; private Long estimatedRows; private Long estimateInfoTs; @@ -120,8 +124,14 @@ public class SortMergeJoinPlan implements QueryPlan { this.tableRefs = Sets.newHashSetWithExpectedSize(lhsPlan.getSourceRefs().size() + rhsPlan.getSourceRefs().size()); this.tableRefs.addAll(lhsPlan.getSourceRefs()); this.tableRefs.addAll(rhsPlan.getSourceRefs()); - this.thresholdBytes = context.getConnection().getQueryServices().getProps().getInt( - QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES); + this.thresholdBytes = + context.getConnection().getQueryServices().getProps().getLong( + QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB, + QueryServicesOptions.DEFAULT_CLIENT_SPOOL_THRESHOLD_BYTES); + this.spoolingEnabled = + context.getConnection().getQueryServices().getProps().getBoolean( + QueryServices.CLIENT_JOIN_SPOOLING_ENABLED_ATTRIB, + QueryServicesOptions.DEFAULT_CLIENT_JOIN_SPOOLING_ENABLED); } @Override @@ -294,7 +304,7 @@ public class SortMergeJoinPlan implements QueryPlan { private ValueBitSet lhsBitSet; private ValueBitSet rhsBitSet; private byte[] emptyProjectedValue; - private BufferedTupleQueue queue; + private SizeAwareQueue<Tuple> queue; private Iterator<Tuple> queueIterator; public BasicJoinIterator(ResultIterator lhsIterator, ResultIterator rhsIterator) { @@ -316,14 +326,23 @@ public class SortMergeJoinPlan implements QueryPlan { int len = lhsBitSet.getEstimatedLength(); this.emptyProjectedValue = new byte[len]; lhsBitSet.toBytes(emptyProjectedValue, 0); - this.queue = new BufferedTupleQueue(thresholdBytes); + this.queue = PhoenixQueues.newTupleQueue(spoolingEnabled, thresholdBytes); this.queueIterator = null; } @Override public void close() throws SQLException { SQLException e = 
closeIterators(lhsIterator, rhsIterator); - queue.close(); + try { + queue.close(); + } catch (IOException t) { + if (e != null) { + e.setNextException( + new SQLException("Also encountered exception while closing queue", t)); + } else { + e = new SQLException("Error while closing queue",t); + } + } if (e != null) { throw e; } @@ -355,7 +374,11 @@ public class SortMergeJoinPlan implements QueryPlan { if (lhsKey.equals(rhsKey)) { next = join(lhsTuple, rhsTuple); if (nextLhsTuple != null && lhsKey.equals(nextLhsKey)) { - queue.offer(rhsTuple); + try { + queue.add(rhsTuple); + } catch (IllegalStateException e) { + throw new PhoenixIOException(e); + } if (nextRhsTuple == null || !rhsKey.equals(nextRhsKey)) { queueIterator = queue.iterator(); advance(true); @@ -610,108 +633,6 @@ public class SortMergeJoinPlan implements QueryPlan { } } - private static class BufferedTupleQueue extends BufferedQueue<Tuple> { - - public BufferedTupleQueue(int thresholdBytes) { - super(thresholdBytes); - } - - @Override - protected BufferedSegmentQueue<Tuple> createSegmentQueue( - int index, int thresholdBytes) { - return new BufferedTupleSegmentQueue(index, thresholdBytes, false); - } - - @Override - protected Comparator<BufferedSegmentQueue<Tuple>> getSegmentQueueComparator() { - return new Comparator<BufferedSegmentQueue<Tuple>>() { - @Override - public int compare(BufferedSegmentQueue<Tuple> q1, - BufferedSegmentQueue<Tuple> q2) { - return q1.index() - q2.index(); - } - }; - } - - @Override - public Iterator<Tuple> iterator() { - return new Iterator<Tuple>() { - private Iterator<BufferedSegmentQueue<Tuple>> queueIter; - private Iterator<Tuple> currentIter; - { - this.queueIter = getSegmentQueues().iterator(); - this.currentIter = queueIter.hasNext() ? 
queueIter.next().iterator() : null; - } - - @Override - public boolean hasNext() { - return currentIter != null && currentIter.hasNext(); - } - - @Override - public Tuple next() { - if (!hasNext()) - return null; - - Tuple ret = currentIter.next(); - if (!currentIter.hasNext()) { - this.currentIter = queueIter.hasNext() ? queueIter.next().iterator() : null; - } - - return ret; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - }; - } - - private static class BufferedTupleSegmentQueue extends BufferedSegmentQueue<Tuple> { - private LinkedList<Tuple> results; - - public BufferedTupleSegmentQueue(int index, - int thresholdBytes, boolean hasMaxQueueSize) { - super(index, thresholdBytes, hasMaxQueueSize); - this.results = Lists.newLinkedList(); - } - - @Override - protected Queue<Tuple> getInMemoryQueue() { - return results; - } - - @Override - protected int sizeOf(Tuple e) { - KeyValue kv = KeyValueUtil.ensureKeyValue(e.getValue(0)); - return Bytes.SIZEOF_INT * 2 + kv.getLength(); - } - - @Override - protected void writeToStream(DataOutputStream out, Tuple e) throws IOException { - KeyValue kv = KeyValueUtil.ensureKeyValue(e.getValue(0)); - out.writeInt(kv.getLength() + Bytes.SIZEOF_INT); - out.writeInt(kv.getLength()); - out.write(kv.getBuffer(), kv.getOffset(), kv.getLength()); - } - - @Override - protected Tuple readFromStream(DataInputStream in) throws IOException { - int length = in.readInt(); - if (length < 0) - return null; - - byte[] b = new byte[length]; - in.readFully(b); - Result result = ResultUtil.toResult(new ImmutableBytesWritable(b)); - return new ResultTuple(result); - } - - } - } - @Override public boolean useRoundRobinIterator() { return false; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/VersionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/VersionUtil.java index 42d07f5..fd02ab5 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/VersionUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/VersionUtil.java @@ -73,4 +73,16 @@ public class VersionUtil { version |= (major << Byte.SIZE * 2); return version; } + + public static int decodeMajorVersion(int encodedVersion) { + return (encodedVersion >> Byte.SIZE * 2); + } + + public static int decodeMinorVersion(int encodedVersion) { + return (encodedVersion >> Byte.SIZE) & 0xFF; + } + + public static int decodePatchVersion(int encodedVersion) { + return encodedVersion & 0xFF; + } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedQueue.java index 6f6c523..1a646e6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedQueue.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedQueue.java @@ -36,14 +36,14 @@ import java.util.UUID; import com.google.common.collect.Lists; import com.google.common.collect.MinMaxPriorityQueue; -public abstract class BufferedQueue<T> extends AbstractQueue<T> { - private final int thresholdBytes; +public abstract class BufferedQueue<T> extends AbstractQueue<T> implements SizeAwareQueue<T> { + private final long thresholdBytes; private List<BufferedSegmentQueue<T>> queues; private int currentIndex; private BufferedSegmentQueue<T> currentQueue; private MinMaxPriorityQueue<BufferedSegmentQueue<T>> mergedQueue; - public BufferedQueue(int thresholdBytes) { + public BufferedQueue(long thresholdBytes) { this.thresholdBytes = thresholdBytes; this.queues = Lists.<BufferedSegmentQueue<T>> newArrayList(); this.currentIndex = -1; @@ -51,7 +51,7 @@ public abstract class BufferedQueue<T> extends AbstractQueue<T> { this.mergedQueue = null; } - abstract protected BufferedSegmentQueue<T> createSegmentQueue(int index, int thresholdBytes); + abstract protected BufferedSegmentQueue<T> createSegmentQueue(int index, 
long thresholdBytes); abstract protected Comparator<BufferedSegmentQueue<T>> getSegmentQueueComparator(); @@ -122,10 +122,12 @@ public abstract class BufferedQueue<T> extends AbstractQueue<T> { return size; } + @Override public long getByteSize() { return currentQueue == null ? 0 : currentQueue.getInMemByteSize(); } + @Override public void close() { for (BufferedSegmentQueue<T> queue : queues) { queue.close(); @@ -150,10 +152,10 @@ public abstract class BufferedQueue<T> extends AbstractQueue<T> { protected static final int EOF = -1; private final int index; - private final int thresholdBytes; + private final long thresholdBytes; private final boolean hasMaxQueueSize; private long totalResultSize = 0; - private int maxResultSize = 0; + private long maxResultSize = 0; private File file; private boolean isClosed = false; private boolean flushBuffer = false; @@ -163,7 +165,7 @@ public abstract class BufferedQueue<T> extends AbstractQueue<T> { // iterators to close on close() private List<SegmentQueueFileIterator> iterators; - public BufferedSegmentQueue(int index, int thresholdBytes, boolean hasMaxQueueSize) { + public BufferedSegmentQueue(int index, long thresholdBytes, boolean hasMaxQueueSize) { this.index = index; this.thresholdBytes = thresholdBytes; this.hasMaxQueueSize = hasMaxQueueSize; @@ -171,7 +173,7 @@ public abstract class BufferedQueue<T> extends AbstractQueue<T> { } abstract protected Queue<T> getInMemoryQueue(); - abstract protected int sizeOf(T e); + abstract protected long sizeOf(T e); abstract protected void writeToStream(DataOutputStream out, T e) throws IOException; abstract protected T readFromStream(DataInputStream in) throws IOException; @@ -296,7 +298,7 @@ public abstract class BufferedQueue<T> extends AbstractQueue<T> { private void flush(T entry) throws IOException { Queue<T> inMemQueue = getInMemoryQueue(); - int resultSize = sizeOf(entry); + long resultSize = sizeOf(entry); maxResultSize = Math.max(maxResultSize, resultSize); totalResultSize 
= hasMaxQueueSize ? maxResultSize * inMemQueue.size() : (totalResultSize + resultSize); if (totalResultSize >= thresholdBytes) { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedSortedQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedSortedQueue.java index c1ef667..f4bbc4e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedSortedQueue.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedSortedQueue.java @@ -25,6 +25,7 @@ import java.util.Comparator; import java.util.List; import java.util.Queue; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; @@ -40,7 +41,7 @@ public class BufferedSortedQueue extends BufferedQueue<ResultEntry> { private final int limit; public BufferedSortedQueue(Comparator<ResultEntry> comparator, - Integer limit, int thresholdBytes) throws IOException { + Integer limit, long thresholdBytes) throws IOException { super(thresholdBytes); this.comparator = comparator; this.limit = limit == null ? -1 : limit; @@ -48,7 +49,7 @@ public class BufferedSortedQueue extends BufferedQueue<ResultEntry> { @Override protected BufferedSegmentQueue<ResultEntry> createSegmentQueue( - int index, int thresholdBytes) { + int index, long thresholdBytes) { return new BufferedResultEntryPriorityQueue(index, thresholdBytes, limit, comparator); } @@ -66,7 +67,7 @@ public class BufferedSortedQueue extends BufferedQueue<ResultEntry> { private MinMaxPriorityQueue<ResultEntry> results = null; public BufferedResultEntryPriorityQueue(int index, - int thresholdBytes, int limit, Comparator<ResultEntry> comparator) { + long thresholdBytes, int limit, Comparator<ResultEntry> comparator) { super(index, thresholdBytes, limit >= 0); this.results = limit < 0 ? 
MinMaxPriorityQueue.<ResultEntry> orderedBy(comparator).create() @@ -79,8 +80,8 @@ public class BufferedSortedQueue extends BufferedQueue<ResultEntry> { } @Override - protected int sizeOf(ResultEntry e) { - return sizeof(e.sortKeys) + sizeof(toKeyValues(e)); + protected long sizeOf(ResultEntry e) { + return ResultEntry.sizeOf(e); } @SuppressWarnings("deprecation") @@ -141,33 +142,10 @@ public class BufferedSortedQueue extends BufferedQueue<ResultEntry> { int size = result.size(); List<KeyValue> kvs = new ArrayList<KeyValue>(size); for (int i = 0; i < size; i++) { - kvs.add(org.apache.hadoop.hbase.KeyValueUtil.ensureKeyValue(result.getValue(i))); + kvs.add(KeyValueUtil.ensureKeyValue(result.getValue(i))); } return kvs; } - private int sizeof(List<KeyValue> kvs) { - int size = Bytes.SIZEOF_INT; // totalLen - - for (KeyValue kv : kvs) { - size += kv.getLength(); - size += Bytes.SIZEOF_INT; // kv.getLength - } - - return size; - } - - private int sizeof(ImmutableBytesWritable[] sortKeys) { - int size = Bytes.SIZEOF_INT; - if (sortKeys != null) { - for (ImmutableBytesWritable sortKey : sortKeys) { - if (sortKey != null) { - size += sortKey.getLength(); - } - size += Bytes.SIZEOF_INT; - } - } - return size; - } } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedTupleQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedTupleQueue.java new file mode 100644 index 0000000..8af3607 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedTupleQueue.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.iterate; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Comparator; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Queue; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.schema.tuple.ResultTuple; +import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.ResultUtil; + +import com.google.common.collect.Lists; + +public class BufferedTupleQueue extends BufferedQueue<Tuple> { + + public BufferedTupleQueue(long thresholdBytes) { + super(thresholdBytes); + } + + @Override + protected BufferedSegmentQueue<Tuple> createSegmentQueue(int index, long thresholdBytes) { + return new BufferedTupleSegmentQueue(index, thresholdBytes, false); + } + + @Override + protected Comparator<BufferedSegmentQueue<Tuple>> getSegmentQueueComparator() { + return new Comparator<BufferedSegmentQueue<Tuple>>() { + @Override + public int compare(BufferedSegmentQueue<Tuple> q1, BufferedSegmentQueue<Tuple> q2) { + return q1.index() - q2.index(); + } + }; + } + + @Override + public Iterator<Tuple> iterator() { + return new Iterator<Tuple>() { + private Iterator<BufferedSegmentQueue<Tuple>> queueIter; + private Iterator<Tuple> currentIter; + { + this.queueIter = getSegmentQueues().iterator(); + this.currentIter = 
queueIter.hasNext() ? queueIter.next().iterator() : null; + } + + @Override + public boolean hasNext() { + return currentIter != null && currentIter.hasNext(); + } + + @Override + public Tuple next() { + if (!hasNext()) return null; + + Tuple ret = currentIter.next(); + if (!currentIter.hasNext()) { + this.currentIter = queueIter.hasNext() ? queueIter.next().iterator() : null; + } + + return ret; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + }; + } + + private static class BufferedTupleSegmentQueue extends BufferedSegmentQueue<Tuple> { + private LinkedList<Tuple> results; + + public BufferedTupleSegmentQueue(int index, long thresholdBytes, boolean hasMaxQueueSize) { + super(index, thresholdBytes, hasMaxQueueSize); + this.results = Lists.newLinkedList(); + } + + @Override + protected Queue<Tuple> getInMemoryQueue() { + return results; + } + + @Override + protected long sizeOf(Tuple e) { + KeyValue kv = KeyValueUtil.ensureKeyValue(e.getValue(0)); + return Bytes.SIZEOF_INT * 2 + kv.getLength(); + } + + @Override + protected void writeToStream(DataOutputStream out, Tuple e) throws IOException { + KeyValue kv = KeyValueUtil.ensureKeyValue(e.getValue(0)); + out.writeInt(kv.getLength() + Bytes.SIZEOF_INT); + out.writeInt(kv.getLength()); + out.write(kv.getBuffer(), kv.getOffset(), kv.getLength()); + } + + @Override + protected Tuple readFromStream(DataInputStream in) throws IOException { + int length = in.readInt(); + if (length < 0) return null; + + byte[] b = new byte[length]; + in.readFully(b); + Result result = ResultUtil.toResult(new ImmutableBytesWritable(b)); + return new ResultTuple(result); + } + + } +} \ No newline at end of file diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java index 1504a7c..26114dd 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java @@ -42,6 +42,7 @@ import org.apache.phoenix.cache.TenantCache; import org.apache.phoenix.coprocessor.BaseRegionScanner; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.coprocessor.HashJoinRegionScanner; +import org.apache.phoenix.coprocessor.MetaDataProtocol; import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.KeyValueColumnExpression; @@ -50,11 +51,14 @@ import org.apache.phoenix.expression.SingleCellColumnExpression; import org.apache.phoenix.expression.function.ArrayIndexFunction; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.hbase.index.util.VersionUtil; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.join.HashJoinInfo; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; import org.apache.phoenix.memory.MemoryManager; import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.KeyValueSchema; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.ValueBitSet; @@ -68,6 +72,7 @@ import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -161,7 +166,15 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory { new RegionScannerResultIterator(innerScanner, getMinMaxQualifiersFromScan(scan), encodingScheme), scanOffset), 
scan.getAttribute(QueryConstants.LAST_SCAN) != null); } - final OrderedResultIterator iterator = deserializeFromScan(scan, innerScanner); + boolean spoolingEnabled = + env.getConfiguration().getBoolean( + QueryServices.SERVER_ORDERBY_SPOOLING_ENABLED_ATTRIB, + QueryServicesOptions.DEFAULT_SERVER_ORDERBY_SPOOLING_ENABLED); + long thresholdBytes = + env.getConfiguration().getLong(QueryServices.SERVER_SPOOL_THRESHOLD_BYTES_ATTRIB, + QueryServicesOptions.DEFAULT_SERVER_SPOOL_THRESHOLD_BYTES); + final OrderedResultIterator iterator = + deserializeFromScan(scan, innerScanner, spoolingEnabled, thresholdBytes); if (iterator == null) { return innerScanner; } @@ -169,15 +182,31 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory { return getTopNScanner(env, innerScanner, iterator, tenantId); } - private static OrderedResultIterator deserializeFromScan(Scan scan, RegionScanner s) { + @VisibleForTesting + static OrderedResultIterator deserializeFromScan(Scan scan, RegionScanner s, + boolean spoolingEnabled, long thresholdBytes) { byte[] topN = scan.getAttribute(BaseScannerRegionObserver.TOPN); if (topN == null) { return null; - } - ByteArrayInputStream stream = new ByteArrayInputStream(topN); // TODO: size? - try { + } + int clientVersion = ScanUtil.getClientVersion(scan); + // Client including and after 4.15 and 5.1 are not going to serialize thresholdBytes + // so we need to decode this only for older clients to not break wire compat + boolean shouldDecodeSpoolThreshold = + (scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION) == null) + || (VersionUtil.decodeMajorVersion(clientVersion) > 5) + || (VersionUtil.decodeMajorVersion(clientVersion) == 5 + && clientVersion < MetaDataProtocol.MIN_5_x_DISABLE_SERVER_SPOOL_THRESHOLD) + || (VersionUtil.decodeMajorVersion(clientVersion) == 4 + && clientVersion < MetaDataProtocol.MIN_4_x_DISABLE_SERVER_SPOOL_THRESHOLD); + ByteArrayInputStream stream = new ByteArrayInputStream(topN); // TODO: size? 
+ try { DataInputStream input = new DataInputStream(stream); - int thresholdBytes = WritableUtils.readVInt(input); + if (shouldDecodeSpoolThreshold) { + // Read off the scan but ignore, we won't honor client sent thresholdbytes, but the + // one set on server + WritableUtils.readVInt(input); + } int limit = WritableUtils.readVInt(input); int estimatedRowSize = WritableUtils.readVInt(input); int size = WritableUtils.readVInt(input); @@ -189,8 +218,8 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory { } PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan); ResultIterator inner = new RegionScannerResultIterator(s, EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan), encodingScheme); - return new OrderedResultIterator(inner, orderByExpressions, thresholdBytes, limit >= 0 ? limit : null, null, - estimatedRowSize); + return new OrderedResultIterator(inner, orderByExpressions, spoolingEnabled, + thresholdBytes, limit >= 0 ? 
limit : null, null, estimatedRowSize); } catch (IOException e) { throw new RuntimeException(e); } finally { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java index 51a7dd8..ef4b607 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java @@ -36,9 +36,10 @@ import org.apache.phoenix.schema.tuple.Tuple; public class OrderedAggregatingResultIterator extends OrderedResultIterator implements AggregatingResultIterator { public OrderedAggregatingResultIterator(AggregatingResultIterator delegate, - List<OrderByExpression> orderByExpressions, int thresholdBytes, Integer limit, Integer offset) + List<OrderByExpression> orderByExpressions, boolean spoolingEnabled, long thresholdBytes, + Integer limit, Integer offset) throws SQLException { - super(delegate, orderByExpressions, thresholdBytes, limit, offset); + super(delegate, orderByExpressions, spoolingEnabled, thresholdBytes, limit, offset); } @Override diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java index 22712ff..212410c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java @@ -22,14 +22,19 @@ import static com.google.common.base.Preconditions.checkPositionIndex; import java.io.IOException; import java.sql.SQLException; +import java.util.ArrayList; import java.util.Comparator; import java.util.List; +import java.util.Queue; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import 
org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.execute.DescVarLengthFastByteComparisons; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.OrderByExpression; +import org.apache.phoenix.iterate.OrderedResultIterator.ResultEntry; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.ServerUtil; @@ -68,6 +73,44 @@ public class OrderedResultIterator implements PeekingResultIterator { Tuple getResult() { return result; } + + static long sizeOf(ResultEntry e) { + return sizeof(e.sortKeys) + sizeof(toKeyValues(e)); + } + + private static long sizeof(List<KeyValue> kvs) { + long size = Bytes.SIZEOF_INT; // totalLen + + for (KeyValue kv : kvs) { + size += kv.getLength(); + size += Bytes.SIZEOF_INT; // kv.getLength + } + + return size; + } + + private static long sizeof(ImmutableBytesWritable[] sortKeys) { + long size = Bytes.SIZEOF_INT; + if (sortKeys != null) { + for (ImmutableBytesWritable sortKey : sortKeys) { + if (sortKey != null) { + size += sortKey.getLength(); + } + size += Bytes.SIZEOF_INT; + } + } + return size; + } + + private static List<KeyValue> toKeyValues(ResultEntry entry) { + Tuple result = entry.getResult(); + int size = result.size(); + List<KeyValue> kvs = new ArrayList<KeyValue>(size); + for (int i = 0; i < size; i++) { + kvs.add(org.apache.hadoop.hbase.KeyValueUtil.ensureKeyValue(result.getValue(i))); + } + return kvs; + } } /** A function that returns Nth key for a given {@link ResultEntry}. 
*/ @@ -91,7 +134,8 @@ public class OrderedResultIterator implements PeekingResultIterator { } }; - private final int thresholdBytes; + private final boolean spoolingEnabled; + private final long thresholdBytes; private final Integer limit; private final Integer offset; private final ResultIterator delegate; @@ -106,20 +150,22 @@ public class OrderedResultIterator implements PeekingResultIterator { } public OrderedResultIterator(ResultIterator delegate, List<OrderByExpression> orderByExpressions, - int thresholdBytes, Integer limit, Integer offset) { - this(delegate, orderByExpressions, thresholdBytes, limit, offset, 0); + boolean spoolingEnabled, long thresholdBytes, Integer limit, Integer offset) { + this(delegate, orderByExpressions, spoolingEnabled, thresholdBytes, limit, offset, 0); } public OrderedResultIterator(ResultIterator delegate, List<OrderByExpression> orderByExpressions, - int thresholdBytes) throws SQLException { - this(delegate, orderByExpressions, thresholdBytes, null, null); + boolean spoolingEnabled, long thresholdBytes) throws SQLException { + this(delegate, orderByExpressions, spoolingEnabled, thresholdBytes, null, null); } - public OrderedResultIterator(ResultIterator delegate, List<OrderByExpression> orderByExpressions, - int thresholdBytes, Integer limit, Integer offset,int estimatedRowSize) { + public OrderedResultIterator(ResultIterator delegate, + List<OrderByExpression> orderByExpressions, boolean spoolingEnabled, + long thresholdBytes, Integer limit, Integer offset, int estimatedRowSize) { checkArgument(!orderByExpressions.isEmpty()); this.delegate = delegate; this.orderByExpressions = orderByExpressions; + this.spoolingEnabled = spoolingEnabled; this.thresholdBytes = thresholdBytes; this.offset = offset == null ? 
0 : offset; if (limit != null) { @@ -208,8 +254,9 @@ public class OrderedResultIterator implements PeekingResultIterator { List<Expression> expressions = Lists.newArrayList(Collections2.transform(orderByExpressions, TO_EXPRESSION)); final Comparator<ResultEntry> comparator = buildComparator(orderByExpressions); try{ - final BufferedSortedQueue queueEntries = new BufferedSortedQueue(comparator, limit, - thresholdBytes); + final SizeAwareQueue<ResultEntry> queueEntries = + PhoenixQueues.newResultEntrySortedQueue(comparator, limit, spoolingEnabled, + thresholdBytes); resultIterator = new PeekingResultIterator() { int count = 0; @@ -249,7 +296,11 @@ public class OrderedResultIterator implements PeekingResultIterator { @Override public void close() throws SQLException { - queueEntries.close(); + try { + queueEntries.close(); + } catch (IOException e) { + throw new SQLException(e); + } } }; for (Tuple result = delegate.next(); result != null; result = delegate.next()) { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/PhoenixQueues.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/PhoenixQueues.java new file mode 100644 index 0000000..2101904 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/PhoenixQueues.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.iterate; + +import java.io.IOException; +import java.util.Comparator; +import java.util.LinkedList; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.iterate.OrderedResultIterator.ResultEntry; +import org.apache.phoenix.schema.tuple.Tuple; + +import com.google.common.collect.Lists; +import com.google.common.collect.MinMaxPriorityQueue; + +/** + * Factory methods for the {@link SizeAwareQueue} implementations that buffer rows for ORDER BY + * ({@link OrderedResultIterator}) and sort-merge join processing. Each kind of queue comes in two + * flavours: a buffered queue that spools to disk once {@code thresholdBytes} is exceeded, and a + * size-bound queue that keeps every element in memory and refuses new elements once its byte + * budget is used up. + */ +public class PhoenixQueues { + + private PhoenixQueues() { + } + + /** + * Creates a {@link BufferedSortedQueue} ordered by {@code comparator} that spools to disk once + * {@code thresholdBytes} is exceeded. + */ + public static SizeAwareQueue<ResultEntry> newBufferedResultEntrySortedQueue( + Comparator<ResultEntry> comparator, Integer limit, long thresholdBytes) + throws IOException { + return new BufferedSortedQueue(comparator, limit, thresholdBytes); + } + + /** + * Creates a {@link BufferedTupleQueue} that preserves insertion order and spools to disk once + * {@code thresholdBytes} is exceeded. + */ + public static SizeAwareQueue<Tuple> newBufferedTupleQueue(long thresholdBytes) { + return new BufferedTupleQueue(thresholdBytes); + } + + /** + * Creates an in-memory sorted queue whose entries may occupy at most {@code maxSizeBytes}. + * A non-negative {@code limit} additionally caps the heap at {@code limit} entries (a + * MinMaxPriorityQueue with a maximum size evicts its greatest entry when full); {@code null} + * or a negative value leaves the entry count unbounded. + */ + public static SizeAwareQueue<ResultEntry> newSizeBoundResultEntrySortedQueue( + Comparator<ResultEntry> comparator, Integer limit, long maxSizeBytes) { + int maxEntries = limit == null ? -1 : limit; + MinMaxPriorityQueue<ResultEntry> queue = + maxEntries < 0 ? MinMaxPriorityQueue.<ResultEntry> orderedBy(comparator).create() + : MinMaxPriorityQueue.<ResultEntry> orderedBy(comparator) + .maximumSize(maxEntries).create(); + return new SizeBoundQueue<ResultEntry>(maxSizeBytes, queue) { + @Override + public long sizeOf(ResultEntry e) { + return ResultEntry.sizeOf(e); + } + + }; + } + 
+ /** + * Creates an in-memory, insertion-ordered queue of tuples whose contents may occupy at most + * {@code maxSizeBytes}. As with {@link BufferedTupleQueue}, a tuple's size is estimated from + * its first cell only. + */ + public static SizeAwareQueue<Tuple> newSizeBoundTupleQueue(long maxSizeBytes) { + LinkedList<Tuple> results = Lists.newLinkedList(); + return new SizeBoundQueue<Tuple>(maxSizeBytes, results) { + + @Override + public long sizeOf(Tuple e) { + KeyValue kv = KeyValueUtil.ensureKeyValue(e.getValue(0)); + return Bytes.SIZEOF_INT * 2 + kv.getLength(); + } + + }; + } + + /** + * Creates the sorted queue used for ORDER BY: a disk-spooling {@link BufferedSortedQueue} when + * {@code spoolingEnabled}, otherwise an in-memory queue bounded by {@code thresholdBytes}. + */ + public static SizeAwareQueue<ResultEntry> newResultEntrySortedQueue( + Comparator<ResultEntry> comparator, Integer limit, boolean spoolingEnabled, + long thresholdBytes) throws IOException { + if (spoolingEnabled) { + return newBufferedResultEntrySortedQueue(comparator, limit, thresholdBytes); + } else { + return newSizeBoundResultEntrySortedQueue(comparator, limit, thresholdBytes); + } + } + + /** + * Creates the tuple queue used by sort-merge joins: a disk-spooling {@link BufferedTupleQueue} + * when {@code spoolingEnabled}, otherwise an in-memory queue bounded by {@code thresholdBytes}. + */ + public static SizeAwareQueue<Tuple> newTupleQueue(boolean spoolingEnabled, + long thresholdBytes) { + if (spoolingEnabled) { + return newBufferedTupleQueue(thresholdBytes); + } else { + return newSizeBoundTupleQueue(thresholdBytes); + } + } + +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithSpillingIT.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SizeAwareQueue.java similarity index 56% copy from phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithSpillingIT.java copy to phoenix-core/src/main/java/org/apache/phoenix/iterate/SizeAwareQueue.java index c5eeaff..73b3554 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithSpillingIT.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SizeAwareQueue.java @@ -15,22 +15,13 @@ * See the License for the specific language
governing permissions and * limitations under the License. */ -package org.apache.phoenix.end2end; +package org.apache.phoenix.iterate; -import java.util.Map; +import java.io.Closeable; +import java.util.Queue; -import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.util.ReadOnlyProps; -import org.junit.BeforeClass; +/** + * A {@link Queue} that can report how many bytes its elements occupy in memory and that must be + * closed when no longer needed, so implementations can release whatever they hold (for example + * the spool files of the disk-backed buffered queues). + */ +public interface SizeAwareQueue<T> extends Queue<T>, Closeable { -import com.google.common.collect.Maps; + /** + * Returns the estimated number of bytes currently held in memory by this queue. + */ + public long getByteSize(); -public class OrderByWithSpillingIT extends OrderByIT { - @BeforeClass - public static void doSetup() throws Exception { - Map<String, String> props = Maps.newHashMapWithExpectedSize(1); - // do lot's of spooling! - props.put(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(1)); - setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); - } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SizeBoundQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SizeBoundQueue.java new file mode 100644 index 0000000..eb1e6be --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SizeBoundQueue.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.phoenix.iterate; + +import java.io.IOException; +import java.util.AbstractQueue; +import java.util.Iterator; +import java.util.Queue; + +/** + * An in-memory {@link SizeAwareQueue} that caps the estimated byte footprint of its elements at + * {@code maxSizeBytes}. Elements live in a caller-supplied delegate queue and subclasses define + * how an element's size is estimated. Nothing is spooled to disk: once the budget is used up + * {@link #offer} returns {@code false} and {@link #add} throws {@link IllegalStateException}. + * <p> + * The delegate may be a bounded collection that evicts an element when a new one is inserted + * (e.g. a {@code MinMaxPriorityQueue} built with a maximum size for top-N ordering). Such + * insertions are counted conservatively, and the byte count is re-derived from the retained + * elements before a new element is refused, so evicted elements never exhaust the budget. + */ +public abstract class SizeBoundQueue<T> extends AbstractQueue<T> implements SizeAwareQueue<T> { + + private final long maxSizeBytes; + private final Queue<T> delegate; + /** Estimated bytes held; an upper bound rather than exact while {@code mayHaveEvicted}. */ + private long currentSize; + /** Set when the delegate did not grow on insertion, i.e. it evicted an element. */ + private boolean mayHaveEvicted; + + public SizeBoundQueue(long maxSizeBytes, Queue<T> delegate) { + assert maxSizeBytes > 0; + this.maxSizeBytes = maxSizeBytes; + this.delegate = delegate; + } + + /** Returns the estimated in-memory size of {@code e} in bytes; must be stable per element. */ + abstract public long sizeOf(T e); + + @Override + public boolean offer(T e) { + long elementSize = sizeOf(e); + if (currentSize + elementSize >= maxSizeBytes && mayHaveEvicted) { + // Evictions were counted conservatively; measure what is really retained before refusing. + currentSize = retainedSize(); + mayHaveEvicted = false; + } + if (currentSize + elementSize >= maxSizeBytes) { + return false; + } + int sizeBefore = delegate.size(); + try { + // add() rather than offer(): a bounded MinMaxPriorityQueue reports false from offer() when + // it immediately evicts the new element, which would surface as a bogus "Queue full". + if (!delegate.add(e)) { + return false; + } + } catch (IllegalStateException capacityRestricted) { + return false; + } + currentSize += elementSize; + if (delegate.size() <= sizeBefore) { + // Something was evicted (possibly e itself), so currentSize now overstates the footprint. + mayHaveEvicted = true; + } + return true; + } + + @Override + public boolean add(T e) { + try { + return super.add(e); + } catch (IllegalStateException ex) { + throw new IllegalStateException( + "Queue full. Consider increasing memory threshold or spooling to disk", ex); + } + } + + @Override + public T poll() { + T e = delegate.poll(); + if (e != null) { + currentSize -= sizeOf(e); + } + if (delegate.isEmpty()) { + resetAccounting(); + } + return e; + } + + @Override + public T peek() { + return delegate.peek(); + } + + @Override + public void close() throws IOException { + delegate.clear(); + resetAccounting(); + } + + /** Returns the estimated bytes held; may overstate the true footprint after evictions. */ + @Override + public long getByteSize() { + return currentSize; + } + + @Override + public Iterator<T> iterator() { + final Iterator<T> it = delegate.iterator(); + // Removals through the iterator (and the inherited remove(Object), removeAll and retainAll, + // which are built on it) must release the removed element's bytes. + return new Iterator<T>() { + private T last; + + @Override + public boolean hasNext() { + return it.hasNext(); + } + + @Override + public T next() { + last = it.next(); + return last; + } + + @Override + public void remove() { + it.remove(); + currentSize -= sizeOf(last); + last = null; + } + }; + } + + @Override + public int size() { + return delegate.size(); + } + + /** Sums {@link #sizeOf} over the elements the delegate currently holds. */ + private long retainedSize() { + long total = 0; + for (T element : delegate) { + total += sizeOf(element); + } + return total; + } + + /** Forgets all accounted bytes; only valid when the delegate is empty. */ + private void resetAccounting() { + currentSize = 0; + mayHaveEvicted = false; + } + +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java index 3136ca8..fa90b1a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java @@ -95,7 +95,8 @@ public class SpoolingResultIterator implements
PeekingResultIterator { private SpoolingResultIterator(SpoolingMetricsHolder spoolMetrics, MemoryMetricsHolder memoryMetrics, ResultIterator scanner, QueryServices services) throws SQLException { this (spoolMetrics, memoryMetrics, scanner, services.getMemoryManager(), - services.getProps().getInt(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES), + services.getProps().getLong(QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB, + QueryServicesOptions.DEFAULT_CLIENT_SPOOL_THRESHOLD_BYTES), services.getProps().getLong(QueryServices.MAX_SPOOL_TO_DISK_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SPOOL_TO_DISK_BYTES), services.getProps().get(QueryServices.SPOOL_DIRECTORY, QueryServicesOptions.DEFAULT_SPOOL_DIRECTORY)); } @@ -109,7 +110,7 @@ public class SpoolingResultIterator implements PeekingResultIterator { * the memory manager) is exceeded. * @throws SQLException */ - SpoolingResultIterator(SpoolingMetricsHolder sMetrics, MemoryMetricsHolder mMetrics, ResultIterator scanner, MemoryManager mm, final int thresholdBytes, final long maxSpoolToDisk, final String spoolDirectory) throws SQLException { + SpoolingResultIterator(SpoolingMetricsHolder sMetrics, MemoryMetricsHolder mMetrics, ResultIterator scanner, MemoryManager mm, final long thresholdBytes, final long maxSpoolToDisk, final String spoolDirectory) throws SQLException { this.spoolMetrics = sMetrics; this.memoryMetrics = mMetrics; boolean success = false; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 1c16ccc..70b7603 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -44,7 +44,16 @@ public interface QueryServices extends SQLCloseable { public static final String THREAD_POOL_SIZE_ATTRIB = "phoenix.query.threadPoolSize"; public static final 
String QUEUE_SIZE_ATTRIB = "phoenix.query.queueSize"; public static final String THREAD_TIMEOUT_MS_ATTRIB = "phoenix.query.timeoutMs"; - public static final String SPOOL_THRESHOLD_BYTES_ATTRIB = "phoenix.query.spoolThresholdBytes"; + public static final String SERVER_SPOOL_THRESHOLD_BYTES_ATTRIB = + "phoenix.query.server.spoolThresholdBytes"; + public static final String CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB = + "phoenix.query.client.spoolThresholdBytes"; + public static final String CLIENT_ORDERBY_SPOOLING_ENABLED_ATTRIB = + "phoenix.query.client.orderBy.spooling.enabled"; + public static final String CLIENT_JOIN_SPOOLING_ENABLED_ATTRIB = + "phoenix.query.client.join.spooling.enabled"; + public static final String SERVER_ORDERBY_SPOOLING_ENABLED_ATTRIB = + "phoenix.query.server.orderBy.spooling.enabled"; public static final String HBASE_CLIENT_KEYTAB = "hbase.myclient.keytab"; public static final String HBASE_CLIENT_PRINCIPAL = "hbase.myclient.principal"; public static final String SPOOL_DIRECTORY = "phoenix.spool.directory"; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index cc8bed3..fc99d5b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -23,6 +23,7 @@ import static org.apache.phoenix.query.QueryServices.ALLOW_VIEWS_ADD_NEW_CF_BASE import static org.apache.phoenix.query.QueryServices.AUTO_UPGRADE_ENABLED; import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_PRODUCER_ATTRIB_NAME; import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_ROUND_ROBIN_ATTRIB; +import static org.apache.phoenix.query.QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB; import static org.apache.phoenix.query.QueryServices.COLLECT_REQUEST_LEVEL_METRICS; import static 
org.apache.phoenix.query.QueryServices.COMMIT_STATS_ASYNC; import static org.apache.phoenix.query.QueryServices.COST_BASED_OPTIMIZER_ENABLED; @@ -78,10 +79,10 @@ import static org.apache.phoenix.query.QueryServices.RUN_RENEW_LEASE_FREQUENCY_I import static org.apache.phoenix.query.QueryServices.RUN_UPDATE_STATS_ASYNC; import static org.apache.phoenix.query.QueryServices.SCAN_CACHE_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.SCAN_RESULT_CHUNK_SIZE; +import static org.apache.phoenix.query.QueryServices.SERVER_SPOOL_THRESHOLD_BYTES_ATTRIB; import static org.apache.phoenix.query.QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB; import static org.apache.phoenix.query.QueryServices.SPOOL_DIRECTORY; -import static org.apache.phoenix.query.QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB; import static org.apache.phoenix.query.QueryServices.STATS_COLLECTION_ENABLED; import static org.apache.phoenix.query.QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB; import static org.apache.phoenix.query.QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB; @@ -134,6 +135,11 @@ public class QueryServicesOptions { public static final int DEFAULT_QUEUE_SIZE = 5000; public static final int DEFAULT_THREAD_TIMEOUT_MS = 600000; // 10min public static final int DEFAULT_SPOOL_THRESHOLD_BYTES = 1024 * 1024 * 20; // 20m + public static final int DEFAULT_SERVER_SPOOL_THRESHOLD_BYTES = 1024 * 1024 * 20; // 20m + public static final int DEFAULT_CLIENT_SPOOL_THRESHOLD_BYTES = 1024 * 1024 * 20; // 20m + public static final boolean DEFAULT_CLIENT_ORDERBY_SPOOLING_ENABLED = true; + public static final boolean DEFAULT_CLIENT_JOIN_SPOOLING_ENABLED = true; + public static final boolean DEFAULT_SERVER_ORDERBY_SPOOLING_ENABLED = true; public static final String DEFAULT_SPOOL_DIRECTORY = System.getProperty("java.io.tmpdir"); public static final int DEFAULT_MAX_MEMORY_PERC = 15; // 15% of heap public static final int 
DEFAULT_MAX_TENANT_MEMORY_PERC = 100; @@ -406,7 +412,8 @@ public class QueryServicesOptions { .setIfUnset(THREAD_POOL_SIZE_ATTRIB, DEFAULT_THREAD_POOL_SIZE) .setIfUnset(QUEUE_SIZE_ATTRIB, DEFAULT_QUEUE_SIZE) .setIfUnset(THREAD_TIMEOUT_MS_ATTRIB, DEFAULT_THREAD_TIMEOUT_MS) - .setIfUnset(SPOOL_THRESHOLD_BYTES_ATTRIB, DEFAULT_SPOOL_THRESHOLD_BYTES) + .setIfUnset(CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB, DEFAULT_CLIENT_SPOOL_THRESHOLD_BYTES) + .setIfUnset(SERVER_SPOOL_THRESHOLD_BYTES_ATTRIB, DEFAULT_SERVER_SPOOL_THRESHOLD_BYTES) .setIfUnset(SPOOL_DIRECTORY, DEFAULT_SPOOL_DIRECTORY) .setIfUnset(MAX_MEMORY_PERC_ATTRIB, DEFAULT_MAX_MEMORY_PERC) .setIfUnset(MAX_TENANT_MEMORY_PERC_ATTRIB, DEFAULT_MAX_TENANT_MEMORY_PERC) @@ -518,8 +525,12 @@ public class QueryServicesOptions { return set(THREAD_TIMEOUT_MS_ATTRIB, threadTimeoutMs); } - public QueryServicesOptions setSpoolThresholdBytes(int spoolThresholdBytes) { - return set(SPOOL_THRESHOLD_BYTES_ATTRIB, spoolThresholdBytes); + public QueryServicesOptions setClientSpoolThresholdBytes(long spoolThresholdBytes) { + return set(CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB, spoolThresholdBytes); + } + + public QueryServicesOptions setServerSpoolThresholdBytes(long spoolThresholdBytes) { + return set(SERVER_SPOOL_THRESHOLD_BYTES_ATTRIB, spoolThresholdBytes); } public QueryServicesOptions setSpoolDirectory(String spoolDirectory) { diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/OrderedResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/OrderedResultIteratorTest.java index 50ed8e9..88bef47 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/OrderedResultIteratorTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/OrderedResultIteratorTest.java @@ -16,12 +16,23 @@ */ package org.apache.phoenix.iterate; +import static org.junit.Assert.fail; + import java.sql.SQLException; +import java.util.Arrays; import java.util.Collections; import java.util.List; +import 
org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.phoenix.coprocessor.ScanRegionObserver; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.LiteralExpression; import org.apache.phoenix.expression.OrderByExpression; +import org.apache.phoenix.hbase.index.util.VersionUtil; +import org.apache.phoenix.util.ScanUtil; import org.junit.Test; +import org.mockito.Mockito; /** * Test class for {@link OrderedResultIterator}. @@ -33,9 +44,47 @@ public class OrderedResultIteratorTest { ResultIterator delegate = ResultIterator.EMPTY_ITERATOR; List<OrderByExpression> orderByExpressions = Collections.singletonList(null); int thresholdBytes = Integer.MAX_VALUE; - OrderedResultIterator iterator = new OrderedResultIterator(delegate, orderByExpressions, thresholdBytes); - // Should not throw an exception + boolean spoolingEnabled = true; + OrderedResultIterator iterator = + new OrderedResultIterator(delegate, orderByExpressions, spoolingEnabled, + thresholdBytes); + // Should not throw an exception iterator.close(); - } + } + + @Test + public void testSpoolingBackwardCompatibility() { + RegionScanner s = Mockito.mock(RegionScanner.class); + Scan scan = new Scan(); + Expression exp = LiteralExpression.newConstant(Boolean.TRUE); + OrderByExpression ex = new OrderByExpression(exp, false, false); + ScanRegionObserver.serializeIntoScan(scan, 0, Arrays.asList(ex), 100); + // Check 5.1.0 & Check > 5.1.0 + ScanUtil.setClientVersion(scan, VersionUtil.encodeVersion("5.1.0")); + NonAggregateRegionScannerFactory.deserializeFromScan(scan, s, false, 100); + + ScanUtil.setClientVersion(scan, VersionUtil.encodeVersion("5.2.0")); + NonAggregateRegionScannerFactory.deserializeFromScan(scan, s, false, 100); + // Check 4.15.0 Check > 4.15.0 + ScanUtil.setClientVersion(scan, VersionUtil.encodeVersion("4.15.0")); + NonAggregateRegionScannerFactory.deserializeFromScan(scan, s, false, 100); + 
ScanUtil.setClientVersion(scan, VersionUtil.encodeVersion("4.15.1")); + NonAggregateRegionScannerFactory.deserializeFromScan(scan, s, false, 100); + + // Check < 5.1 + ScanUtil.setClientVersion(scan, VersionUtil.encodeVersion("5.0.0")); + try { + NonAggregateRegionScannerFactory.deserializeFromScan(scan, s, false, 100); + fail("Deserialize should fail for 5.0.0 since we didn't serialize thresholdBytes"); + } catch (IllegalArgumentException e) { + } + // Check < 4.15 + ScanUtil.setClientVersion(scan, VersionUtil.encodeVersion("4.14.0")); + try { + NonAggregateRegionScannerFactory.deserializeFromScan(scan, s, false, 100); + fail("Deserialize should fail for 4.14.0 since we didn't serialize thresholdBytes"); + } catch (IllegalArgumentException e) { + } + } } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java index 59e7fd3..df9d843 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java @@ -99,7 +99,8 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl { .setThreadPoolSize(DEFAULT_THREAD_POOL_SIZE) .setMaxMemoryPerc(DEFAULT_MAX_MEMORY_PERC) .setThreadTimeoutMs(DEFAULT_THREAD_TIMEOUT_MS) - .setSpoolThresholdBytes(DEFAULT_SPOOL_THRESHOLD_BYTES) + .setClientSpoolThresholdBytes(DEFAULT_SPOOL_THRESHOLD_BYTES) + .setServerSpoolThresholdBytes(DEFAULT_SPOOL_THRESHOLD_BYTES) .setSpoolDirectory(DEFAULT_SPOOL_DIRECTORY) .setMaxTenantMemoryPerc(DEFAULT_MAX_TENANT_MEMORY_PERC) .setMaxServerCacheSize(DEFAULT_MAX_HASH_CACHE_SIZE) diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/MetaDataUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/MetaDataUtilTest.java index d934a04..59c7739 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/MetaDataUtilTest.java +++ 
b/phoenix-core/src/test/java/org/apache/phoenix/util/MetaDataUtilTest.java @@ -59,7 +59,15 @@ public class MetaDataUtilTest { assertTrue(VersionUtil.encodeVersion("0.94.1-mapR")>VersionUtil.encodeVersion("0.94")); assertTrue(VersionUtil.encodeVersion("1", "1", "3")>VersionUtil.encodeVersion("1", "1", "1")); } - + + @Test + public void testDecode() { + int encodedVersion = VersionUtil.encodeVersion("4.15.5"); + assertEquals(VersionUtil.decodeMajorVersion(encodedVersion), 4); + assertEquals(VersionUtil.decodeMinorVersion(encodedVersion), 15); + assertEquals(VersionUtil.decodePatchVersion(encodedVersion), 5); + } + @Test public void testCompatibility() { assertTrue(MetaDataUtil.areClientAndServerCompatible(VersionUtil.encodeVersion(1,2,1), 1, 2));