This is an automated email from the ASF dual-hosted git repository. nic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 8d2a53fcd7711a843b3f34eae5953a8bfff3d3be Author: Zhong, Yanghong <nju_y...@apache.org> AuthorDate: Tue Mar 10 15:29:11 2020 +0800 KYLIN-4414 bulkload needs to follow locality --- .../kylin/storage/hbase/steps/CubeHFileJob.java | 15 ++- .../storage/hbase/steps/HFileOutputFormat3.java | 126 ++++++++++++++++++++- 2 files changed, 131 insertions(+), 10 deletions(-) diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java index c0fae42..e403c20 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java @@ -18,8 +18,11 @@ package org.apache.kylin.storage.hbase.steps; +import static org.apache.hadoop.hbase.HBaseConfiguration.merge; + import java.io.IOException; import java.util.Collection; +import java.util.Locale; import org.apache.commons.cli.Options; import org.apache.hadoop.conf.Configuration; @@ -47,8 +50,6 @@ import org.apache.kylin.storage.hbase.HBaseConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.hadoop.hbase.HBaseConfiguration.merge; - /** * @author George Song (ysong1) */ @@ -77,7 +78,7 @@ public class CubeHFileJob extends AbstractHadoopJob { CubeInstance cube = cubeMgr.getCube(cubeName); - // use current hbase configuration + // construct configuration for the MR job cluster Configuration configuration = new Configuration(HBaseConnection.getCurrentHBaseConfiguration()); String[] allServices = getAllServices(configuration); merge(configuration, getConf()); @@ -95,10 +96,14 @@ public class CubeHFileJob extends AbstractHadoopJob { // add metadata to distributed cache attachCubeMetadata(cube, job.getConfiguration()); - HTable htable = new HTable(configuration, getOptionValue(OPTION_HTABLE_NAME)); + // construct configuration for the HBase cluster + Configuration hbaseConf = HBaseConnection.getCurrentHBaseConfiguration(); + HTable htable = new HTable(hbaseConf, getOptionValue(OPTION_HTABLE_NAME).toUpperCase(Locale.ROOT)); // Automatic config ! HFileOutputFormat3.configureIncrementalLoad(job, htable); + HFileOutputFormat3.configureHConnection(job, hbaseConf, getJobTempDir()); + reconfigurePartitions(configuration, partitionFilePath); job.setInputFormatClass(SequenceFileInputFormat.class); @@ -109,7 +114,7 @@ public class CubeHFileJob extends AbstractHadoopJob { job.setSortComparatorClass(RowKeyWritable.RowKeyComparator.class); // set block replication to 3 for hfiles - configuration.set(DFSConfigKeys.DFS_REPLICATION_KEY, "3"); + job.getConfiguration().set(DFSConfigKeys.DFS_REPLICATION_KEY, "3"); this.deletePath(job.getConfiguration(), output); diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java index 12c30ea..e14d012 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java @@ -17,8 +17,11 @@ */ package org.apache.kylin.storage.hbase.steps; +import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; +import java.net.InetSocketAddress; import java.net.URLDecoder; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; @@ -39,15 +42,20 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; @@ -82,6 +90,7 @@ import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner; import org.apache.kylin.common.util.RandomUtil; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; /** * Copied from HBase's org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2, with fix attempt on KYLIN-2788 @@ -114,6 +123,15 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable, // override the auto-detection of datablock encoding. public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY = "hbase.mapreduce.hfileoutputformat.datablock.encoding"; + /** + * Keep locality while generating HFiles for bulkload. See HBASE-12596 + */ + public static final String LOCALITY_SENSITIVE_CONF_KEY = "hbase.bulkload.locality.sensitive.enabled"; + private static final boolean DEFAULT_LOCALITY_SENSITIVE = true; + private static final String OUTPUT_TABLE_NAME_CONF_KEY = "hbase.mapreduce.hfileoutputformat.table.name"; + + private static final String BULKLOAD_HCONNECTION_CONF_KEY = "hbase.bulkload.hconnection.configuration"; + @Override public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException { @@ -150,6 +168,8 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable, overriddenEncoding = null; } + final Configuration hConnectionConf = getConfigureHConnection(conf); + return new RecordWriter<ImmutableBytesWritable, V>() { // Map of families to writers and how much has been output on the writer. private final Map<byte[], WriterLength> writers = new TreeMap<byte[], WriterLength>(Bytes.BYTES_COMPARATOR); @@ -178,7 +198,47 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable, rollWriters(); } if (wl == null || wl.writer == null) { - wl = getNewWriter(family, conf); + if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) { + HRegionLocation loc = null; + String tableName = conf.get(OUTPUT_TABLE_NAME_CONF_KEY); + if (tableName != null) { + try (Connection connection = ConnectionFactory.createConnection(hConnectionConf); + RegionLocator locator = connection.getRegionLocator(TableName.valueOf(tableName))) { + loc = locator.getRegionLocation(rowKey); + } catch (Throwable e) { + LOG.warn("there's something wrong when locating rowkey: " + Bytes.toString(rowKey), e); + loc = null; + } + } + + if (null == loc) { + if (LOG.isTraceEnabled()) { + LOG.trace("failed to get region location, so use default writer: " + + Bytes.toString(rowKey)); + } + wl = getNewWriter(family, conf, null); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]"); + } + InetSocketAddress initialIsa = + new InetSocketAddress(loc.getHostname(), loc.getPort()); + if (initialIsa.isUnresolved()) { + if (LOG.isTraceEnabled()) { + LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":" + + loc.getPort() + ", so use default writer"); + } + wl = getNewWriter(family, conf, null); + } else { + if(LOG.isDebugEnabled()) { + LOG.debug("use favored nodes writer: " + initialIsa.getHostString()); + } + wl = getNewWriter(family, conf, new InetSocketAddress[] { initialIsa }); + } + } + } else { + wl = getNewWriter(family, conf, null); + } } kv.updateLatestStamp(this.now); wl.writer.append(kv); @@ -199,7 +259,8 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable, } @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED", justification = "Not important") - private WriterLength getNewWriter(byte[] family, Configuration conf) throws IOException { + private WriterLength getNewWriter(byte[] family, Configuration conf, InetSocketAddress[] favoredNodes) + throws IOException { WriterLength wl = new WriterLength(); Path familydir = new Path(outputdir, Bytes.toString(family)); Algorithm compression = compressionMap.get(family); @@ -219,9 +280,15 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable, contextBuilder.withDataBlockEncoding(encoding); HFileContext hFileContext = contextBuilder.build(); - wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs).withOutputDir(familydir) - .withBloomType(bloomType).withComparator(KeyValue.COMPARATOR).withFileContext(hFileContext) - .build(); + if (null == favoredNodes) { + wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs) + .withOutputDir(familydir).withBloomType(bloomType).withComparator(KeyValue.COMPARATOR) + .withFileContext(hFileContext).build(); + } else { + wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs)) + .withOutputDir(familydir).withBloomType(bloomType).withComparator(KeyValue.COMPARATOR) + .withFileContext(hFileContext).withFavoredNodes(favoredNodes).build(); + } this.writers.put(family, wl); return wl; @@ -307,6 +374,49 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable, } } + public static File configureHConnection(Job job, Configuration hConnectionConf, File tempDir) throws IOException { + File tempFile = new File(tempDir, "HConfiguration-" + System.currentTimeMillis() + ".xml"); + tempFile.deleteOnExit(); + + FileOutputStream os = new FileOutputStream(tempFile); + hConnectionConf.writeXml(os); + os.close(); + + String tmpFiles = job.getConfiguration().get("tmpfiles", null); + if (tmpFiles == null) { + tmpFiles = fixWindowsPath("file://" + tempFile.getAbsolutePath()); + } else { + tmpFiles += "," + fixWindowsPath("file://" + tempFile.getAbsolutePath()); + } + job.getConfiguration().set("tmpfiles", tmpFiles); + LOG.info("A temporary file " + tempFile.getAbsolutePath() + + " is created for storing hconnection related configuration!!!"); + + job.getConfiguration().set(BULKLOAD_HCONNECTION_CONF_KEY, tempFile.getName()); + return tempFile; + } + + public static Configuration getConfigureHConnection(Configuration jobConf) { + if (Strings.isNullOrEmpty(jobConf.get(BULKLOAD_HCONNECTION_CONF_KEY))) { + return jobConf; + } + File tempFile = new File(jobConf.get(BULKLOAD_HCONNECTION_CONF_KEY)); + Configuration hConnectionConf = new Configuration(false); + hConnectionConf.addResource(new Path(tempFile.toURI())); + return hConnectionConf; + } + + public static String fixWindowsPath(String path) { + // fix windows path + if (path.startsWith("file://") && !path.startsWith("file:///") && path.contains(":\\")) { + path = path.replace("file://", "file:///"); + } + if (path.startsWith("file:///")) { + path = path.replace('\\', '/'); + } + return path; + } + /** * Configure a MapReduce Job to perform an incremental load into the given * table. This @@ -388,6 +498,12 @@ public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable, conf.setStrings("io.serializations", conf.get("io.serializations"), MutationSerialization.class.getName(), ResultSerialization.class.getName(), KeyValueSerialization.class.getName()); + if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) { + // record this table name for creating writer by favored nodes + LOG.info("bulkload locality sensitive enabled"); + conf.set(OUTPUT_TABLE_NAME_CONF_KEY, regionLocator.getName().getNameAsString()); + } + // Use table's region boundaries for TOP split points. LOG.info("Looking up current regions for table " + tableDescriptor.getTableName()); List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocator);