This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch 3.0.x
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 619e05e8001f6fa0c0927e16eab650ff87787ef9
Author: XiaoxiangYu <hit_la...@126.com>
AuthorDate: Wed Apr 1 18:32:14 2020 +0800

    KYLIN-4385 Fix HiveProducer can not write to Hive Table[AWS Azure]
---
 .../kylin/metrics/lib/impl/hive/HiveProducer.java  | 77 +++++++++++++++++-----
 1 file changed, 61 insertions(+), 16 deletions(-)

diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
index 5082b6a..ae10a93 100644
--- a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
+++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
@@ -66,6 +66,11 @@ public class HiveProducer {
     private Path curPartitionContentPath;
     private int id = 0;
     private FSDataOutputStream fout;
+    /**
+     * Some cloud file systems, such as AWS S3, do not support appending to an existing file.
+     * When append is not supported, each call to the write method produces a new file.
+     */
+    private final boolean supportAppend;
 
     public HiveProducer(String metricType, Properties props) throws Exception {
         this(metricType, props, new HiveConf());
@@ -96,6 +101,7 @@ public class HiveProducer {
         HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(hiveConf);
         String tableLocation = metaStoreClient.getTable(tableName.getFirst(), tableName.getSecond())
                 .getSd().getLocation();
+        logger.debug("Found table location for {} at {}", tableName.getSecond(), tableLocation);
         List<FieldSchema> fields = metaStoreClient.getFields(tableName.getFirst(), tableName.getSecond());
         metaStoreClient.close();
@@ -110,6 +116,9 @@
             hostName = "UNKNOWN";
         }
         contentFilePrefix = hostName + "-" + System.currentTimeMillis() + "-part-";
+        String fsUri = fs.getUri().toString();
+        supportAppend = !fsUri.startsWith("s3") && !fsUri.startsWith("wasb"); // AWS EMR and Azure HDInsight
+        logger.info("For {}, supportAppend is set to {}", fsUri, supportAppend);
     }
 
     public void close() {
@@ -126,7 +135,7 @@
         for (Record record : recordList) {
             HiveProducerRecord hiveRecord = convertTo(record);
             if (recordMap.get(hiveRecord.key()) == null) {
-                recordMap.put(hiveRecord.key(), Lists.<HiveProducerRecord> newLinkedList());
+                recordMap.put(hiveRecord.key(), Lists.<HiveProducerRecord>newLinkedList());
             }
             recordMap.get(hiveRecord.key()).add(hiveRecord);
         }
@@ -174,17 +183,31 @@
         }
         hql.append(")");
         logger.debug("create partition by {}.", hql);
-        Driver driver = new Driver(hiveConf);
-        CliSessionState session = new CliSessionState(hiveConf);
-        SessionState.start(session);
-        CommandProcessorResponse res = driver.run(hql.toString());
-        if (res.getResponseCode() != 0) {
-            logger.warn("Fail to add partition. HQL: {}; Cause by: {}",
-                    hql.toString(),
-                    res.toString());
+        Driver driver = null;
+        CliSessionState session = null;
+        try {
+            driver = new Driver(hiveConf);
+            session = new CliSessionState(hiveConf);
+            SessionState.start(session);
+            CommandProcessorResponse res = driver.run(hql.toString());
+            if (res.getResponseCode() != 0) {
+                logger.warn("Fail to add partition. HQL: {}; Cause by: {}",
+                        hql.toString(),
+                        res.toString());
+            }
+            session.close();
+            driver.close();
+        } catch (Exception ex) {
+            // Do not let a Hive exception stop HiveProducer from writing the file; catch and report it here
+            logger.error("Failed to create partition, please create it manually: " + hql, ex);
+        } finally {
+            if (session != null) {
+                session.close();
+            }
+            if (driver != null) {
+                driver.close();
+            }
         }
-        session.close();
-        driver.close();
     }
 
     // Step 3: create path for new partition if it is the first time write metrics message or new partition should be used
@@ -194,7 +217,21 @@
             closeFout();
         }
 
-        Path partitionContentPath = new Path(partitionPath, contentFilePrefix + String.format(Locale.ROOT, "%04d", id));
+        Path partitionContentPath = new Path(partitionPath, contentFilePrefix + String.format(Locale.ROOT, "%05d", id));
+
+        // Do not overwrite existing files when supportAppend is false
+        int nCheck = 0;
+        while (!supportAppend && fs.exists(partitionContentPath)) {
+            id++;
+            nCheck++;
+            partitionContentPath = new Path(partitionPath, contentFilePrefix + String.format(Locale.ROOT, "%05d", id));
+            logger.debug("{} exists, skipping it.", partitionContentPath);
+            if (nCheck > 100000) {
+                logger.warn("Exceeded the maximum number of existence checks.");
+                break;
+            }
+        }
+        logger.info("Trying to use new partition content path: {} for metric: {}", partitionContentPath, metricType);
 
         if (!fs.exists(partitionContentPath)) {
             int nRetry = 0;
@@ -209,30 +246,38 @@
                         "Fail to create HDFS file: " + partitionContentPath + " after " + nRetry + " retries");
             }
         }
-        fout = fs.append(partitionContentPath);
+        if (supportAppend) {
+            fout = fs.append(partitionContentPath);
+        } else {
+            fout = fs.create(partitionContentPath);
+        }
         prePartitionPath = partitionPath.toString();
         curPartitionContentPath = partitionContentPath;
-        id = (id + 1) % 10;
+        id = (id + 1) % (supportAppend ? 10 : 100000);
     }
 
-        // Step 4: append record to HDFS without flush
+        // Step 4: append records to DFS
         try {
             int count = 0;
             for (HiveProducerRecord elem : recordItr) {
                 fout.writeBytes(elem.valueToString() + "\n");
                 count++;
             }
-            logger.info("Success to write {} metrics ({}) to file {}", count, metricType, curPartitionContentPath);
+            logger.debug("Success to write {} metrics ({}) to file {}", count, metricType, curPartitionContentPath);
         } catch (IOException e) {
             logger.error("Fails to write metrics(" + metricType + ") to file " + curPartitionContentPath.toString()
                     + " due to ", e);
             closeFout();
         }
+        if (!supportAppend) {
+            closeFout();
+        }
     }
 
     private void closeFout() {
         if (fout != null) {
             try {
+                logger.debug("Flushing output stream {}.", curPartitionContentPath);
                 fout.close();
             } catch (Exception e) {
                 logger.error("Close the path: " + curPartitionContentPath + " failed", e);
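
For readers who want to try the append-vs-create strategy outside Kylin, here is a minimal, self-contained sketch of what the patch does. It assumes only a Hadoop client on the classpath; the class name AppendOrCreateSketch, the demo path, and the main() harness are illustrative and not part of the commit, which instead derives supportAppend from fs.getUri() in HiveProducer's constructor and performs the create/append split inside write().

import java.io.IOException;
import java.util.Locale;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AppendOrCreateSketch {

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path dir = new Path(args.length > 0 ? args[0] : "/tmp/hive-producer-demo");
        FileSystem fs = dir.getFileSystem(conf);

        // Same heuristic as the commit: object stores (S3, WASB) reject append().
        String fsUri = fs.getUri().toString();
        boolean supportAppend = !fsUri.startsWith("s3") && !fsUri.startsWith("wasb");

        String prefix = "host-" + System.currentTimeMillis() + "-part-";
        int id = 0;
        Path file = new Path(dir, prefix + String.format(Locale.ROOT, "%05d", id));

        // On append-less stores, rotate the numeric suffix until an unused name
        // is found, mirroring the bounded while-loop the patch adds to write().
        while (!supportAppend && fs.exists(file)) {
            id++;
            file = new Path(dir, prefix + String.format(Locale.ROOT, "%05d", id));
        }

        FSDataOutputStream out;
        if (supportAppend && fs.exists(file)) {
            out = fs.append(file);   // HDFS path: keep growing one file
        } else {
            out = fs.create(file);   // S3/WASB path: always a fresh object
        }
        try {
            out.writeBytes("metric-line\n");
        } finally {
            out.close();
        }
        System.out.println("Wrote to " + file + " (supportAppend=" + supportAppend + ")");
    }
}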
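Two details of the patch are easy to miss in the diff: the file-name counter width grows from %04d to %05d so that the larger wrap-around modulus (id = (id + 1) % 100000 when append is unsupported) always fits in the suffix, and closeFout() is now invoked after every batch when supportAppend is false, since object stores generally make a written object visible only once the output stream is closed.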