[ https://issues.apache.org/jira/browse/PHOENIX-2900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15388805#comment-15388805 ]
chenglei edited comment on PHOENIX-2900 at 7/22/16 5:51 AM:
------------------------------------------------------------

[~jamestaylor], [~samarthjain], thank you for the review. This issue can only be reproduced in a distributed cluster, because the GlobalCache is JVM-scoped. I will change my test case. How can I run it in a distributed cluster? By extending BaseOwnClusterHBaseManagedTimeIT?

was (Author: comnetwork):
[~jamestaylor], [~samarthjain], thank you for the review. This issue can only be reproduced in a distributed cluster, because the ServerCache is JVM-scoped. I will change my test case. How can I run it in a distributed cluster? By extending BaseOwnClusterHBaseManagedTimeIT?

> Unable to find hash cache once a salted table's first region has split
> -----------------------------------------------------------------------
>
>                 Key: PHOENIX-2900
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-2900
>             Project: Phoenix
>          Issue Type: Bug
>    Affects Versions: 4.7.0
>            Reporter: chenglei
>            Assignee: chenglei
>            Priority: Critical
>             Fix For: 4.8.0
>
>         Attachments: PHOENIX-2900_v1.patch
>
>
> When I join a salted table (which has been split after creation) with another table in my business system, I get the following error, even though I clear the salted table's TableRegionCache:
> {code:borderStyle=solid}
> org.apache.phoenix.exception.PhoenixIOException: org.apache.hadoop.hbase.DoNotRetryIOException: Could not find hash cache for joinId: %�����2. The cache might have expired and have been removed.
>     at org.apache.phoenix.coprocessor.HashJoinRegionScanner.<init>(HashJoinRegionScanner.java:98)
>     at org.apache.phoenix.coprocessor.ScanRegionObserver.doPostScannerOpen(ScanRegionObserver.java:218)
>     at org.apache.phoenix.coprocessor.BaseScannerRegionObserver.postScannerOpen(BaseScannerRegionObserver.java:201)
>     at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost$52.call(RegionCoprocessorHost.java:1203)
>     at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost$RegionOperation.call(RegionCoprocessorHost.java:1517)
>     at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.execOperation(RegionCoprocessorHost.java:1592)
>     at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.execOperationWithResult(RegionCoprocessorHost.java:1556)
>     at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.postScannerOpen(RegionCoprocessorHost.java:1198)
>     at org.apache.hadoop.hbase.regionserver.HRegionServer.scan(HRegionServer.java:3178)
>     at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:29925)
>     at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2031)
>     at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:108)
>     at org.apache.hadoop.hbase.ipc.RpcExecutor.consumerLoop(RpcExecutor.java:116)
>     at org.apache.hadoop.hbase.ipc.RpcExecutor$1.run(RpcExecutor.java:96)
>     at java.lang.Thread.run(Thread.java:745)
>     at org.apache.phoenix.util.ServerUtil.parseServerException(ServerUtil.java:111)
>     at org.apache.phoenix.iterate.TableResultIterator.initScanner(TableResultIterator.java:127)
>     at org.apache.phoenix.iterate.ParallelIterators$1.call(ParallelIterators.java:108)
>     at org.apache.phoenix.iterate.ParallelIterators$1.call(ParallelIterators.java:103)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>     at org.apache.phoenix.job.JobManager$InstrumentedJobFutureTask.run(JobManager.java:183)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.hadoop.hbase.DoNotRetryIOException: org.apache.hadoop.hbase.DoNotRetryIOException: Could not find hash cache for joinId: %�����2. The cache might have expired and have been removed.
>     at org.apache.phoenix.coprocessor.HashJoinRegionScanner.<init>(HashJoinRegionScanner.java:98)
>     at org.apache.phoenix.coprocessor.ScanRegionObserver.doPostScannerOpen(ScanRegionObserver.java:218)
>     at org.apache.phoenix.coprocessor.BaseScannerRegionObserver.postScannerOpen(BaseScannerRegionObserver.java:201)
>     at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost$52.call(RegionCoprocessorHost.java:1203)
>     at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost$RegionOperation.call(RegionCoprocessorHost.java:1517)
>     at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.execOperation(RegionCoprocessorHost.java:1592)
>     at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.execOperationWithResult(RegionCoprocessorHost.java:1556)
>     at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.postScannerOpen(RegionCoprocessorHost.java:1198)
>     at org.apache.hadoop.hbase.regionserver.HRegionServer.scan(HRegionServer.java:3178)
>     at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:29925)
>     at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2031)
>     at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:108)
>     at org.apache.hadoop.hbase.ipc.RpcExecutor.consumerLoop(RpcExecutor.java:116)
>     at org.apache.hadoop.hbase.ipc.RpcExecutor$1.run(RpcExecutor.java:96)
>     at java.lang.Thread.run(Thread.java:745)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>     at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
>     at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:95)
>     at org.apache.hadoop.hbase.protobuf.ProtobufUtil.getRemoteException(ProtobufUtil.java:304)
>     at org.apache.hadoop.hbase.client.ScannerCallable.openScanner(ScannerCallable.java:316)
>     at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:164)
>     at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:59)
>     at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:114)
>     at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:90)
>     at org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:283)
>     at org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:188)
>     at org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:183)
>     at org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:110)
>     at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:739)
>     at org.apache.phoenix.iterate.TableResultIterator.initScanner(TableResultIterator.java:123)
>     ... 7 more
> {code}
> I wrote a unit test to reproduce this error:
> {code:borderStyle=solid}
> public void testSaltTableJoinError() throws Exception
> {
>     //1. create the LHS salted table SALT_TEST
>     this.jdbcTemplate.update("drop table if exists SALT_TEST ");
>     this.jdbcTemplate.update(
>         "create table SALT_TEST"+
>         "("+
>         "id UNSIGNED_INT not null primary key,"+
>         "appId VARCHAR"+
>         ")SALT_BUCKETS=2");
>
>     this.jdbcTemplate.update("upsert into SALT_TEST(id,appId) values(1,'app1')");
>     this.jdbcTemplate.update("upsert into SALT_TEST(id,appId) values(2,'app2')");
>     this.jdbcTemplate.update("upsert into SALT_TEST(id,appId) values(3,'app3')");
>     this.jdbcTemplate.update("upsert into SALT_TEST(id,appId) values(4,'app4')");
>     this.jdbcTemplate.update("upsert into SALT_TEST(id,appId) values(5,'app5')");
>     this.jdbcTemplate.update("upsert into SALT_TEST(id,appId) values(6,'app6')");
>
>     //2. split SALT_TEST at rowKey3, i.e., split the first region
>     byte[] id3=Bytes.toBytes(3);
>     byte[] rowKey3=new byte[1+4];
>     System.arraycopy(id3, 0, rowKey3, 1, 4);
>     byte salt3=SaltingUtil.getSaltingByte(rowKey3, 1, rowKey3.length-1, 2);
>     rowKey3[0]=salt3;
>     HBaseAdmin hbaseAdmin=this.getHBaseAdmin();
>     hbaseAdmin.split(Bytes.toBytes("SALT_TEST"), rowKey3);
>
>     //3. wait for the SALT_TEST split to complete
>     while(hbaseAdmin.getTableRegions(Bytes.toBytes("SALT_TEST")).size() < 3)
>     {
>         Thread.sleep(1000);
>     }
>     //4. make sure region0 and region1 are not on the same region server, so they do not share the GlobalCache
>     HRegionInfo region1Info=hbaseAdmin.getTableRegions(Bytes.toBytes("SALT_TEST")).get(1);
>     String region1EncodedName=region1Info.getEncodedName();
>     System.out.println(region1EncodedName);
>     hbaseAdmin.move(Bytes.toBytes(region1EncodedName), null);
>
>
>     //5. create the RHS table RIGHT_TEST
>     this.jdbcTemplate.update("drop table if exists RIGHT_TEST ");
>     this.jdbcTemplate.update(
>         "create table RIGHT_TEST"+
>         "("+
>         "appId VARCHAR not null primary key,"+
>         "createTime VARCHAR"+
>         ")");
>     this.jdbcTemplate.update("upsert into RIGHT_TEST(appId,createTime) values('app2','201601')");
>     this.jdbcTemplate.update("upsert into RIGHT_TEST(appId,createTime) values('app3','201602')");
>     this.jdbcTemplate.update("upsert into RIGHT_TEST(appId,createTime) values('app4','201603')");
>     this.jdbcTemplate.update("upsert into RIGHT_TEST(appId,createTime) values('app5','201604')");
>     //6. clear SALT_TEST's TableRegionCache so that the join sees SALT_TEST's newest 3 regions
>     ((PhoenixConnection)this.dataSource.getConnection())
>         .getQueryServices().clearTableRegionCache(Bytes.toBytes("SALT_TEST"));
>
>     //7. join the salted table SALT_TEST with RIGHT_TEST; this throws the exception
>     String sql="select * from SALT_TEST a inner join RIGHT_TEST b on a.appId=b.appId where a.id>=3 and a.id<=5";
>     List<Map<String,Object>> result=this.jdbcTemplate.queryForList(sql, new Object[0]);
>     assertTrue(result.size()==3);
> }
> {code}
> I debugged the source code and found an error in the addServerCache method of the org.apache.phoenix.cache.ServerCacheClient class, in the code quoted below. In line 166, we get all three regions of the SALT_TEST table, but in line 176 the keyRanges is [[[\x00], [[\x00\x00\x00\x03 - \x00\x00\x00\x05]]]], so only the second region [\x00\x00\x00\x00\x03,\x01\x00\x00\x00\x00) passes the keyRanges intersects check in line 176. As a result, the RHS is sent only to the second region. However, because SALT_TEST is a salted table, the third region [\x01\x00\x00\x00\x00,+∞) also has records matching the where condition, so the RHS should also be sent to the third region.
> Therefore, the correct keyRanges should be [[[\x00-\x01], [[\x00\x00\x00\x03 - \x00\x00\x00\x05]]]], not [[[\x00], [[\x00\x00\x00\x03 - \x00\x00\x00\x05]]]].
> {code:borderStyle=solid}
> 164 try {
> 165     final PTable cacheUsingTable = cacheUsingTableRef.getTable();
> 166     List<HRegionLocation> locations = services.getAllTableRegions(cacheUsingTable.getPhysicalName().getBytes());
> 167     int nRegions = locations.size();
> 168     // Size these based on worst case
> 169     futures = new ArrayList<Future<Boolean>>(nRegions);
> 170     Set<HRegionLocation> servers = new HashSet<HRegionLocation>(nRegions);
> 171     for (HRegionLocation entry : locations) {
> 172         // Keep track of servers we've sent to and only send once
> 173         byte[] regionStartKey = entry.getRegionInfo().getStartKey();
> 174         byte[] regionEndKey = entry.getRegionInfo().getEndKey();
> 175         if ( ! servers.contains(entry) &&
> 176             keyRanges.intersects(regionStartKey, regionEndKey,
> 177                 cacheUsingTable.getIndexType() == IndexType.LOCAL ?
> 178                     ScanUtil.getRowKeyOffset(regionStartKey, regionEndKey) : 0, true)) {
> 179             // Call RPC once per server
> 180             servers.add(entry);
> 181             if (LOG.isDebugEnabled()) {LOG.debug(addCustomAnnotations("Adding cache entry to be sent for " + entry, connection));}
> 182             final byte[] key = entry.getRegionInfo().getStartKey();
> 183             final HTableInterface htable = services.getTable(cacheUsingTableRef.getTable().getPhysicalName().getBytes());
> 184             closeables.add(htable);
> 185             futures.add(executor.submit(new JobCallable<Boolean>() {
> 186
> 187                 @Override
> 188                 public Boolean call() throws Exception {
> 189                     final Map<byte[], AddServerCacheResponse> results;
> 190                     try {
> 191                         results = htable.coprocessorService(ServerCachingService.class, key, key,
> {code}
> I think the incorrect keyRanges is caused by the following code in the constructor of org.apache.phoenix.compile.ScanRanges (around line 178). In my opinion, the if statement should be if (isSalted && !isPointLookup).
> {code:borderStyle=solid}
> if (useSkipScanFilter && isSalted && !isPointLookup) {
>     ranges.set(0, SaltingUtil.generateAllSaltingRanges(bucketNum));
> }
> {code}
> Another question is why the join SQL executes normally when the salted table's first region has not been split. I think this is caused by a subtle code error in the addServerCache method of the org.apache.phoenix.cache.ServerCacheClient class:
> {code:borderStyle=solid}
> 171     for (HRegionLocation entry : locations) {
> 172         // Keep track of servers we've sent to and only send once
> 173         byte[] regionStartKey = entry.getRegionInfo().getStartKey();
> 174         byte[] regionEndKey = entry.getRegionInfo().getEndKey();
> 175         if ( ! servers.contains(entry) &&
> 176             keyRanges.intersects(regionStartKey, regionEndKey,
> 177                 cacheUsingTable.getIndexType() == IndexType.LOCAL ?
> 178                     ScanUtil.getRowKeyOffset(regionStartKey, regionEndKey) : 0, true)) {
> 179             // Call RPC once per server
> 180             servers.add(entry);
> 181             if (LOG.isDebugEnabled()) {LOG.debug(addCustomAnnotations("Adding cache entry to be sent for " + entry, connection));}
> 182             final byte[] key = entry.getRegionInfo().getStartKey();
> 183             final HTableInterface htable = services.getTable(cacheUsingTableRef.getTable().getPhysicalName().getBytes());
> 184             closeables.add(htable);
> 185             futures.add(executor.submit(new JobCallable<Boolean>() {
> 186
> 187                 @Override
> 188                 public Boolean call() throws Exception {
> 189                     final Map<byte[], AddServerCacheResponse> results;
> 190                     try {
> 191                         results = htable.coprocessorService(ServerCachingService.class, key, key,
> {code}
> In the above code, if the first region has not been split, the first region (-∞,\x01\x00\x00\x00\x00) always passes the if test in line 176, and the startKey of the first region is byte[0] (an empty array). So in line 191, when we invoke the htable's coprocessorService method, the startKey and endKey parameters are both byte[0].
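The effect of passing byte[0] as both startKey and endKey can be tried out without a cluster. What follows is only a minimal standalone sketch, not HBase code: the class EmptyEndKeySketch, its regionContaining helper and the hard-coded region boundaries are hypothetical, and the loop merely imitates the shape of the getKeysAndRegionsInRange method quoted in this description. Passing includeEndKey=true is likewise an assumption about how coprocessorService resolves its key range. It needs Java 9 or later for Arrays.compareUnsigned.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class EmptyEndKeySketch {
    static final byte[] EMPTY = new byte[0];

    // bounds[i] is the start key of region i and bounds[i + 1] its end key;
    // EMPTY stands for the open start/end of the table (hypothetical layout).
    static int regionContaining(byte[][] bounds, byte[] key) {
        for (int i = bounds.length - 2; i > 0; i--) {
            if (Arrays.compareUnsigned(key, bounds[i]) >= 0) {
                return i;
            }
        }
        return 0;
    }

    // Imitates the do/while walk: start at the region holding startKey and keep
    // moving to the next region while the requested range is not exhausted.
    static List<Integer> regionsInRange(byte[][] bounds, byte[] startKey,
                                        byte[] endKey, boolean includeEndKey) {
        boolean endKeyIsEndOfTable = Arrays.equals(endKey, EMPTY);
        List<Integer> regions = new ArrayList<>();
        byte[] currentKey = startKey;
        do {
            int region = regionContaining(bounds, currentKey);
            regions.add(region);
            currentKey = bounds[region + 1];
        } while (!Arrays.equals(currentKey, EMPTY)
                && (endKeyIsEndOfTable
                    || Arrays.compareUnsigned(currentKey, endKey) < 0
                    || (includeEndKey && Arrays.compareUnsigned(currentKey, endKey) == 0)));
        return regions;
    }

    public static void main(String[] args) {
        byte[] salt1 = {1, 0, 0, 0, 0};
        byte[] rowKey3 = {0, 0, 0, 0, 3};

        // Unsplit table: two regions, and the first one starts at byte[0].
        byte[][] unsplit = {EMPTY, salt1, EMPTY};
        System.out.println(regionsInRange(unsplit, EMPTY, EMPTY, true));   // prints [0, 1]

        // After the split: three regions, keyed on the second region's start key.
        byte[][] split = {EMPTY, rowKey3, salt1, EMPTY};
        System.out.println(regionsInRange(split, rowKey3, rowKey3, true)); // prints [1]
    }
}
```

In this model, a call keyed on byte[0] reaches every region of the unsplit table, while a call keyed on a non-empty region start key reaches that one region only.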
> In org.apache.hadoop.hbase.client.HTable's getKeysAndRegionsInRange method, shown below, when the endKey parameter equals HConstants.EMPTY_END_ROW (which is also byte[0]), the boolean variable endKeyIsEndOfTable is set to true. This causes all regions to be returned, which is obviously not what was intended. As a result, the RHS is sent to all of the LHS's regions, and the join SQL runs normally:
> {code:borderStyle=solid}
> private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
>     final byte[] startKey, final byte[] endKey, final boolean includeEndKey,
>     final boolean reload) throws IOException {
>   final boolean endKeyIsEndOfTable = Bytes.equals(endKey,HConstants.EMPTY_END_ROW);
>   if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
>     throw new IllegalArgumentException(
>       "Invalid range: " + Bytes.toStringBinary(startKey) +
>       " > " + Bytes.toStringBinary(endKey));
>   }
>   List<byte[]> keysInRange = new ArrayList<byte[]>();
>   List<HRegionLocation> regionsInRange = new ArrayList<HRegionLocation>();
>   byte[] currentKey = startKey;
>   do {
>     HRegionLocation regionLocation = getRegionLocation(currentKey, reload);
>     keysInRange.add(currentKey);
>     regionsInRange.add(regionLocation);
>     currentKey = regionLocation.getRegionInfo().getEndKey();
>   } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
>       && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
>           || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
>   return new Pair<List<byte[]>, List<HRegionLocation>>(keysInRange,
>       regionsInRange);
> }
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)