This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin-on-parquet-v2 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 3400338d7870c843e5ab2490c6767aa37e8092e4 Author: Zhong, Yanghong <nju_y...@apache.org> AuthorDate: Mon Jun 15 16:41:36 2020 +0800 KYLIN-4573 Add option to indicate whether to close file for every append for Hive Producer (cherry picked from commit 616e06675278a6857f3cbb353a4f9c2243eeccc1) --- .../kylin/metrics/lib/impl/hive/HiveProducer.java | 93 ++++++++++++++++++---- server/src/main/resources/kylinMetrics.xml | 1 + 2 files changed, 77 insertions(+), 17 deletions(-) diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java index a96b261..8bc7a43 100644 --- a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java +++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java @@ -48,6 +48,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -66,6 +67,15 @@ public class HiveProducer { private Path curPartitionContentPath; private int id = 0; private FSDataOutputStream fout; + /** + * Some cloud file system, like AWS S3, didn't support append action to exist file. + * When append is not supported, will produce new file in a call to write method. + */ + private final boolean supportAppend; + + private final boolean closeFileEveryAppend; + + private final Map<String, String> kylinSpecifiedConfig = new HashMap<>(); public HiveProducer(String metricType, Properties props) throws Exception { this(metricType, props, new HiveConf()); @@ -75,7 +85,13 @@ public class HiveProducer { this.metricType = metricType; hiveConf = hiveConfig; for (Map.Entry<Object, Object> e : props.entrySet()) { - hiveConf.set(e.getKey().toString(), e.getValue().toString()); + String key = e.getKey().toString(); + String value = e.getValue().toString(); + if (key.startsWith("kylin.")) { + kylinSpecifiedConfig.put(key, value); + } else { + hiveConf.set(key, value); + } } fs = FileSystem.get(hiveConf); @@ -96,6 +112,7 @@ public class HiveProducer { IMetaStoreClient metaStoreClient = HiveMetaStoreClientFactory.getHiveMetaStoreClient(hiveConf); String tableLocation = metaStoreClient.getTable(tableName.getFirst(), tableName.getSecond()) .getSd().getLocation(); + logger.debug("Find table location for {} at {}", tableName.getSecond(), tableLocation); List<FieldSchema> fields = metaStoreClient.getFields(tableName.getFirst(), tableName.getSecond()); metaStoreClient.close(); @@ -110,6 +127,12 @@ public class HiveProducer { hostName = "UNKNOWN"; } contentFilePrefix = hostName + "-" + System.currentTimeMillis() + "-part-"; + String fsUri = fs.getUri().toString(); + supportAppend = fsUri.startsWith("hdfs") ; // Only HDFS is appendable + logger.info("For {}, supportAppend was set to {}", fsUri, supportAppend); + + closeFileEveryAppend = !supportAppend + || Boolean.parseBoolean(kylinSpecifiedConfig.get("kylin.hive.producer.close-file-every-append")); } public void close() { @@ -127,7 +150,7 @@ public class HiveProducer { for (Record record : recordList) { HiveProducerRecord hiveRecord = convertTo(record); if (recordMap.get(hiveRecord.key()) == null) { - recordMap.put(hiveRecord.key(), Lists.<HiveProducerRecord> newLinkedList()); + recordMap.put(hiveRecord.key(), Lists.<HiveProducerRecord>newLinkedList()); } recordMap.get(hiveRecord.key()).add(hiveRecord); } @@ -175,17 +198,31 @@ public class HiveProducer { } hql.append(")"); logger.debug("create partition by {}.", hql); - Driver driver = new Driver(hiveConf); - CliSessionState session = new CliSessionState(hiveConf); - SessionState.start(session); - CommandProcessorResponse res = driver.run(hql.toString()); - if (res.getResponseCode() != 0) { - logger.warn("Fail to add partition. HQL: {}; Cause by: {}", - hql.toString(), - res.toString()); + Driver driver = null; + CliSessionState session = null; + try { + driver = new Driver(hiveConf); + session = new CliSessionState(hiveConf); + SessionState.start(session); + CommandProcessorResponse res = driver.run(hql.toString()); + if (res.getResponseCode() != 0) { + logger.warn("Fail to add partition. HQL: {}; Cause by: {}", + hql.toString(), + res.toString()); + } + session.close(); + driver.close(); + } catch (Exception ex) { + // Do not let hive exception stop HiveProducer from writing file, so catch and report it here + logger.error("create partition failed, please create it manually : " + hql, ex); + } finally { + if (session != null) { + session.close(); + } + if (driver != null) { + driver.close(); + } } - session.close(); - driver.close(); } // Step 3: create path for new partition if it is the first time write metrics message or new partition should be used @@ -195,7 +232,21 @@ public class HiveProducer { closeFout(); } - Path partitionContentPath = new Path(partitionPath, contentFilePrefix + String.format(Locale.ROOT, "%04d", id)); + Path partitionContentPath = new Path(partitionPath, contentFilePrefix + String.format(Locale.ROOT, "%05d", id)); + + // Do not overwrite exist files when supportAppend was set to false + int nCheck = 0; + while (!supportAppend && fs.exists(partitionContentPath)) { + id++; + nCheck++; + partitionContentPath = new Path(partitionPath, contentFilePrefix + String.format(Locale.ROOT, "%05d", id)); + logger.debug("{} exists, skip it.", partitionContentPath); + if (nCheck > 100000) { + logger.warn("Exceed max check times."); + break; + } + } + logger.info("Try to use new partition content path: {} for metric: {}", partitionContentPath, metricType); if (!fs.exists(partitionContentPath)) { int nRetry = 0; @@ -210,30 +261,38 @@ public class HiveProducer { "Fail to create HDFS file: " + partitionContentPath + " after " + nRetry + " retries"); } } - fout = fs.append(partitionContentPath); + if (supportAppend) { + fout = fs.append(partitionContentPath); + } else { + fout = fs.create(partitionContentPath); + } prePartitionPath = partitionPath.toString(); curPartitionContentPath = partitionContentPath; - id = (id + 1) % 10; + id = (id + 1) % (supportAppend ? 10 : 100000); } - // Step 4: append record to HDFS without flush + // Step 4: append record to DFS try { int count = 0; for (HiveProducerRecord elem : recordItr) { fout.writeBytes(elem.valueToString() + "\n"); count++; } - logger.info("Success to write {} metrics ({}) to file {}", count, metricType, curPartitionContentPath); + logger.debug("Success to write {} metrics ({}) to file {}", count, metricType, curPartitionContentPath); } catch (IOException e) { logger.error("Fails to write metrics(" + metricType + ") to file " + curPartitionContentPath.toString() + " due to ", e); closeFout(); } + if (closeFileEveryAppend) { + closeFout(); + } } private void closeFout() { if (fout != null) { try { + logger.debug("Flush output stream {}.", curPartitionContentPath); fout.close(); } catch (Exception e) { logger.error("Close the path: " + curPartitionContentPath + " failed", e); diff --git a/server/src/main/resources/kylinMetrics.xml b/server/src/main/resources/kylinMetrics.xml index 843fb91..85c879f 100644 --- a/server/src/main/resources/kylinMetrics.xml +++ b/server/src/main/resources/kylinMetrics.xml @@ -73,6 +73,7 @@ value="org.apache.kylin.metrics.lib.impl.hive.HiveReservoirReporter"/> <property name="second"> <props> + <prop key="kylin.hive.producer.close-file-every-append">true</prop> </props> </property> </bean>