This is an automated email from the ASF dual-hosted git repository. szita pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 610a663 HIVE-23198: Add matching logic between CacheTags and proactive eviction requests (Adam Szita, reviewed by Peter Vary) 610a663 is described below commit 610a663e1cf36114339fb36388fcbefbda2a3b97 Author: Adam Szita <40628386+sz...@users.noreply.github.com> AuthorDate: Thu Jul 23 15:00:58 2020 +0200 HIVE-23198: Add matching logic between CacheTags and proactive eviction requests (Adam Szita, reviewed by Peter Vary) --- .../hive/llap/cache/TestCacheContentsTracker.java | 7 +- .../hive/llap/cache/TestProactiveEviction.java | 96 ++++++++++++++++++++++ .../apache/hadoop/hive/llap/ProactiveEviction.java | 52 +++++++++++- .../org/apache/hadoop/hive/common/io/CacheTag.java | 14 ++-- 4 files changed, 157 insertions(+), 12 deletions(-) diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java index 175cbac..15d3f8f 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.llap.cache; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import org.apache.hadoop.hive.common.io.CacheTag; @@ -124,7 +125,7 @@ public class TestCacheContentsTracker { @Test public void testEncodingDecoding() throws Exception { - Map<String, String> partDescs = new HashMap<>(); + LinkedHashMap<String, String> partDescs = new LinkedHashMap<>(); partDescs.put("pytha=goras", "a2+b2=c2"); CacheTag tag = CacheTag.build("math.rules", partDescs); CacheTag.SinglePartitionCacheTag stag = ((CacheTag.SinglePartitionCacheTag)tag); @@ -166,9 +167,9 @@ public class TestCacheContentsTracker { return llapCacheableBufferMock; } - private static CacheTag cacheTagBuilder(String dbAndTable, String... partitions) { + public static CacheTag cacheTagBuilder(String dbAndTable, String... partitions) { if (partitions != null && partitions.length > 0) { - Map<String, String> partDescs = new HashMap<>(); + LinkedHashMap<String, String> partDescs = new LinkedHashMap<>(); for (String partition : partitions) { String[] partDesc = partition.split("="); partDescs.put(partDesc[0], partDesc[1]); diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestProactiveEviction.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestProactiveEviction.java new file mode 100644 index 0000000..689e27f --- /dev/null +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestProactiveEviction.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.cache; + +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; + +import org.apache.hadoop.hive.common.io.CacheTag; +import org.apache.hadoop.hive.llap.ProactiveEviction; +import org.apache.hadoop.hive.llap.ProactiveEviction.Request; +import org.apache.hadoop.hive.llap.ProactiveEviction.Request.Builder; + +import org.junit.Test; + +import static org.apache.hadoop.hive.llap.cache.TestCacheContentsTracker.cacheTagBuilder; +import static org.junit.Assert.assertEquals; + +/** + * Test cases for proactive LLAP cache eviction. + */ +public class TestProactiveEviction { + + private static final CacheTag[] TEST_TAGS = new CacheTag[] { + cacheTagBuilder("fx.rates", "from=USD", "to=HUF"), + cacheTagBuilder("fx.rates", "from=USD", "to=EUR"), + cacheTagBuilder("fx.rates", "from=USD", "to=EUR"), + cacheTagBuilder("fx.rates", "from=USD", "to=EUR"), + cacheTagBuilder("fx.rates", "from=EUR", "to=HUF"), + cacheTagBuilder("fx.futures", "ccy=EUR"), + cacheTagBuilder("fx.futures", "ccy=JPY"), + cacheTagBuilder("fx.futures", "ccy=JPY"), + cacheTagBuilder("fx.futures", "ccy=USD"), + cacheTagBuilder("fx.centralbanks"), + cacheTagBuilder("fx.centralbanks"), + cacheTagBuilder("fx.centralbanks"), + cacheTagBuilder("equity.prices", "ex=NYSE"), + cacheTagBuilder("equity.prices", "ex=NYSE"), + cacheTagBuilder("equity.prices", "ex=NASDAQ"), + cacheTagBuilder("fixedincome.bonds"), + cacheTagBuilder("fixedincome.bonds"), + cacheTagBuilder("fixedincome.yieldcurves") + }; + + @Test + public void testCachetagAndRequestMatching() throws Exception { + assertMatchOnTags(Builder.create().addDb("fx"), "111111111111000000"); + assertMatchOnTags(Builder.create().addTable("fx", "futures"), "000001111000000000"); + assertMatchOnTags(Builder.create().addPartitionOfATable("fx", "futures", buildParts("ccy", "JPY")), + "000000110000000000"); + assertMatchOnTags(Builder.create().addPartitionOfATable("equity", "prices", buildParts("ex", "NYSE")) + .addPartitionOfATable("equity", "prices", buildParts("ex", "NYSE")),"000000000000110000"); + assertMatchOnTags(Builder.create().addTable("fx", "rates").addTable("fx", "futures"), + "111111111000000000"); + assertMatchOnTags(Builder.create().addPartitionOfATable("fx", "rates", buildParts("from", "PLN")), + "000000000000000000"); + assertMatchOnTags(Builder.create().addTable("fixedincome", "bonds"), "000000000000000110"); + assertMatchOnTags(Builder.create().addPartitionOfATable("fx", "rates", buildParts("from", "EUR", "to", "HUF")), + "000010000000000000"); + } + + private static LinkedHashMap buildParts(String... vals) { + LinkedHashMap<String, String> ret = new LinkedHashMap<>(); + for (int i = 0; i < vals.length; i+=2) { + ret.put(vals[i], vals[i+1]); + } + return ret; + } + + private static void assertMatchOnTags(Builder requestBuilder, String expected) { + assert expected.length() == TEST_TAGS.length; + // Marshal + unmarshal + Request request = Builder.create().fromProtoRequest(requestBuilder.build().toProtoRequests().get(0)).build(); + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < TEST_TAGS.length; ++i) { + sb.append(request.isTagMatch(TEST_TAGS[i]) ? '1' : '0'); + } + assertEquals(expected, sb.toString()); + } + +} diff --git a/ql/src/java/org/apache/hadoop/hive/llap/ProactiveEviction.java b/ql/src/java/org/apache/hadoop/hive/llap/ProactiveEviction.java index ba6d33e..522d6da 100644 --- a/ql/src/java/org/apache/hadoop/hive/llap/ProactiveEviction.java +++ b/ql/src/java/org/apache/hadoop/hive/llap/ProactiveEviction.java @@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit; import javax.net.SocketFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.TableName; import org.apache.hadoop.hive.common.io.CacheTag; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; @@ -229,12 +230,59 @@ public final class ProactiveEviction { } /** - * Match a CacheTag to this eviction request. + * Match a CacheTag to this eviction request. Must only be used on LLAP side only, where the received request may + * only contain one information for one DB. + * * @param cacheTag * @return true if cacheTag matches and the related buffer is eligible for proactive eviction, false otherwise. */ public boolean isTagMatch(CacheTag cacheTag) { - // TODO: HIVE-23198 + String db = getSingleDbName(); + if (db == null) { + // Number of DBs in the request was not exactly 1. + throw new UnsupportedOperationException("Predicate only implemented for 1 DB case."); + } + TableName tagTableName = TableName.fromString(cacheTag.getTableName(), null, null); + + // Check against DB. + if (!db.equals(tagTableName.getDb())) { + return false; + } + + Map<String, Set<LinkedHashMap<String, String>>> tables = entities.get(db); + + // If true, must be a drop DB event and this cacheTag matches. + if (tables.isEmpty()) { + return true; + } + + Map<String, String> tagPartDescMap = null; + if (cacheTag instanceof CacheTag.PartitionCacheTag) { + tagPartDescMap = ((CacheTag.PartitionCacheTag) cacheTag).getPartitionDescMap(); + } + + // Check against table name. + for (String tableAndDbName : tables.keySet()) { + if (tableAndDbName.equals(tagTableName.getNotEmptyDbTable())) { + + Set<LinkedHashMap<String, String>> partDescs = tables.get(tableAndDbName); + + // If true, must be a drop table event, and this cacheTag matches. + if (partDescs == null) { + return true; + } + + // Check against partition keys and values and alas for drop partition event. + if (!(cacheTag instanceof CacheTag.PartitionCacheTag)) { + throw new IllegalArgumentException("CacheTag has no partition information, while trying" + + " to evict due to (and based on) a drop partition DDL statement.."); + } + + if (partDescs.contains(tagPartDescMap)) { + return true; + } + } + } return false; } diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java b/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java index ba6e534..51fbd64 100644 --- a/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java +++ b/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java @@ -20,7 +20,7 @@ package org.apache.hadoop.hive.common.io; import java.io.UnsupportedEncodingException; import java.net.URLDecoder; import java.net.URLEncoder; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import org.apache.commons.lang3.StringUtils; @@ -80,7 +80,7 @@ public abstract class CacheTag implements Comparable<CacheTag> { return new TableCacheTag(tableName); } - public static final CacheTag build(String tableName, Map<String, String> partDescMap) { + public static final CacheTag build(String tableName, LinkedHashMap<String, String> partDescMap) { if (StringUtils.isEmpty(tableName) || partDescMap == null || partDescMap.isEmpty()) { throw new IllegalArgumentException(); } @@ -179,7 +179,7 @@ public abstract class CacheTag implements Comparable<CacheTag> { * Returns a map of partition keys and values built from the information of this CacheTag. * @return the map */ - public abstract Map<String, String> getPartitionDescMap(); + public abstract LinkedHashMap<String, String> getPartitionDescMap(); } @@ -204,8 +204,8 @@ public abstract class CacheTag implements Comparable<CacheTag> { } @Override - public Map<String, String> getPartitionDescMap() { - Map<String, String> result = new HashMap<>(); + public LinkedHashMap<String, String> getPartitionDescMap() { + LinkedHashMap<String, String> result = new LinkedHashMap<>(); String[] partition = CacheTag.decodePartDesc(partitionDesc); result.put(partition[0], partition[1]); return result; @@ -305,8 +305,8 @@ public abstract class CacheTag implements Comparable<CacheTag> { } @Override - public Map<String, String> getPartitionDescMap() { - Map<String, String> result = new HashMap<>(); + public LinkedHashMap<String, String> getPartitionDescMap() { + LinkedHashMap<String, String> result = new LinkedHashMap<>(); for (String partDesc : partitionDesc) { String[] partition = CacheTag.decodePartDesc(partDesc); result.put(partition[0], partition[1]);