[ https://issues.apache.org/jira/browse/HBASE-25608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Work on HBASE-25608 started by Shinya Yoshida. ---------------------------------------------- > Support HFileOutputFormat locality sensitive even destination cluster is > different from source cluster > ------------------------------------------------------------------------------------------------------ > > Key: HBASE-25608 > URL: https://issues.apache.org/jira/browse/HBASE-25608 > Project: HBase > Issue Type: Improvement > Affects Versions: 3.0.0-alpha-1, 1.7.0, 2.4.1, 1.8.0 > Reporter: Shinya Yoshida > Assignee: Shinya Yoshida > Priority: Major > > Sometimes we want to run an MR job whose source cluster and destination > cluster are different, as in the following example, for data migration, batch jobs and so on. > > {code:java} > Configuration conf = > HBaseConfiguration.createClusterConf(HBaseConfiguration.create(), > sourceClusterKey); > final Job job = Job.getInstance(conf, jobName); > // ... > FileOutputFormat.setOutputPath(job, new Path(outputPath)); > Scan scan = createScanner(); > TableMapReduceUtil.initTableMapperJob( > sourceTableName, scan, > Mapper.class, > ImmutableBytesWritable.class, Put.class, job); > try (Connection con = > ConnectionFactory.createConnection(destinationClusterKey); > Table table = con.getTable(destinationTableName); > RegionLocator regionLocator = > con.getRegionLocator(destinationTableName)) { > HFileOutputFormat2.configureIncrementalLoad(job, table, > regionLocator); > } > return job.waitForCompletion(true) ? 0 : 1; > {code} > In this case, HFileOutputFormat2 doesn't create locality-sensitive HFiles. > We got the following exception: > {code:java} > 2021-02-24 19:55:48,298 WARN [main] > org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2: there's something wrong > when locating rowkey: xxxxxxxxxxxx > org.apache.hadoop.hbase.TableNotFoundException: Table 'table' was not found, > got: XXXX. 
> at > org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegionInMeta(ConnectionManager.java:1302) > at > org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1181) > at > org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1165) > at > org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1122) > at > org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.getRegionLocation(ConnectionManager.java:957) > at > org.apache.hadoop.hbase.client.HRegionLocator.getRegionLocation(HRegionLocator.java:74) > at > org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2$1.write(HFileOutputFormat2.java:216) > at > org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2$1.write(HFileOutputFormat2.java:167) > at > org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:558) > at > org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89) > at > org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105) > at > org.apache.hadoop.hbase.mapreduce.PutSortReducer.reduce(PutSortReducer.java:78) > at > org.apache.hadoop.hbase.mapreduce.PutSortReducer.reduce(PutSortReducer.java:43) > at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171) > at > org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627) > at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389) > at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657) > at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158) > {code} > This happens because it creates the connection using the task 
configuration, which is configured > for the source cluster. > Thus, it tries to connect to the source cluster and get locations for the > table that should exist in the destination cluster. > {code:java} > InetSocketAddress[] favoredNodes = null; > if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, > DEFAULT_LOCALITY_SENSITIVE)) { > HRegionLocation loc = null; > String tableName = Bytes.toString(tableNameBytes); > if (tableName != null) { > try (Connection connection = > ConnectionFactory.createConnection(conf); > RegionLocator locator = > > connection.getRegionLocator(TableName.valueOf(tableName))) { > loc = locator.getRegionLocation(rowKey); > } catch (Throwable e) { > LOG.warn("Something wrong locating rowkey {} in {}", > Bytes.toString(rowKey), > tableName, e); > loc = null; > } > } > if (null == loc) { > LOG.trace("Failed get of location, use default writer {}", > Bytes.toString(rowKey)); > } else { > LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey)); > InetSocketAddress initialIsa = > new InetSocketAddress(loc.getHostname(), loc.getPort()); > if (initialIsa.isUnresolved()) { > LOG.trace("Failed resolve address {}, use default writer", > loc.getHostnamePort()); > } else { > LOG.debug("Use favored nodes writer: {}", > initialIsa.getHostString()); > favoredNodes = new InetSocketAddress[] { initialIsa }; > } > } > } > wl = getNewWriter(tableNameBytes, family, conf, favoredNodes); > {code} > HFileOutputFormat2 should be aware of the destination cluster when > the source and destination clusters are different, so that locality-sensitive HFiles are > generated correctly. -- This message was sent by Atlassian Jira (v8.3.4#803005)