Repository: ambari Updated Branches: refs/heads/branch-2.6 eb387261c -> 3b8b8d366
AMBARI-22061. Solr Data Manager script should provide non-destructive archive download option (mgergely) Change-Id: I64546727aec5e1ad034edc2de1d94fb8fc319623 Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/3b8b8d36 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/3b8b8d36 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/3b8b8d36 Branch: refs/heads/branch-2.6 Commit: 3b8b8d366c54f11078640a602e36f169940b5b41 Parents: eb38726 Author: Miklos Gergely <mgerg...@hortonworks.com> Authored: Wed Sep 27 11:18:08 2017 +0200 Committer: Miklos Gergely <mgerg...@hortonworks.com> Committed: Wed Sep 27 11:18:28 2017 +0200 ---------------------------------------------------------------------- .../src/main/python/solrDataManager.py | 76 +++++++++++--------- 1 file changed, 43 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/3b8b8d36/ambari-logsearch/ambari-logsearch-solr-client/src/main/python/solrDataManager.py ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-solr-client/src/main/python/solrDataManager.py b/ambari-logsearch/ambari-logsearch-solr-client/src/main/python/solrDataManager.py index e0356bb..2675bd9 100644 --- a/ambari-logsearch/ambari-logsearch-solr-client/src/main/python/solrDataManager.py +++ b/ambari-logsearch/ambari-logsearch-solr-client/src/main/python/solrDataManager.py @@ -47,7 +47,7 @@ verbose = False def parse_arguments(): parser = optparse.OptionParser("usage: %prog [options]", version="Solr Data Manager {0}".format(VERSION)) - parser.add_option("-m", "--mode", dest="mode", type="string", help="delete | save") + parser.add_option("-m", "--mode", dest="mode", type="string", help="archive | delete | save") parser.add_option("-s", "--solr-url", dest="solr_url", type="string", help="the url of the solr server including the port") parser.add_option("-c", "--collection", dest="collection", type="string", help="the name of the solr collection") parser.add_option("-f", "--filter-field", dest="filter_field", type="string", help="the name of the field to filter on") @@ -98,14 +98,14 @@ def parse_arguments(): parser.print_help() sys.exit() - mode_values = ["delete", "save"] + mode_values = ["archive", "delete", "save"] if options.mode not in mode_values: print "mode must be one of {0}".format(" | ".join(mode_values)) parser.print_help() sys.exit() if options.mode == "delete": - for r in ["name", "compression", "hdfs_keytab", "hdfs_principal", "hdfs_user", "hdfs_path", "key_file_path", "bucket", "key_prefix", "local_path"]: + for r in ["name", "hdfs_keytab", "hdfs_principal", "hdfs_user", "hdfs_path", "key_file_path", "bucket", "key_prefix", "local_path"]: if options.__dict__[r] is not None: print "argument '{0}' may not be specified in delete mode".format(r) parser.print_help() @@ -153,7 +153,7 @@ def parse_arguments(): parser.print_help() sys.exit() - if options.mode == "save": + if options.mode in ["archive", "save"]: count = (1 if is_any_hdfs_property else 0) + (1 if is_any_s3_property else 0) + \ (1 if options.__dict__["local_path"] is not None else 0) if count != 1: @@ -171,7 +171,7 @@ def parse_arguments(): print(" solr-url: " + options.solr_url) print(" collection: " + options.collection) print(" filter-field: " + options.filter_field) - if options.mode == "save": + if options.mode in ["archive", "save"]: print(" id-field: " + options.id_field) if options.__dict__["end"] is not None: print(" end: " + options.end) @@ -182,14 +182,14 @@ def parse_arguments(): print(" additional-filter: " + str(options.additional_filter)) if options.__dict__["name"] is not None: print(" name: " + str(options.name)) - if options.mode == "save": + if options.mode in ["archive", "save"]: print(" read-block-size: " + str(options.read_block_size)) print(" write-block-size: " + str(options.write_block_size)) print(" ignore-unfinished-uploading: " + str(options.ignore_unfinished_uploading)) if (options.__dict__["solr_keytab"] is not None): print(" solr-keytab: " + options.solr_keytab) print(" solr-principal: " + options.solr_principal) - if options.mode == "save": + if options.mode in ["archive", "save"]: print(" output: " + ("json" if options.json_file else "line-delimited-json")) print(" compression: " + options.compression) if (options.__dict__["hdfs_keytab"] is not None): @@ -251,7 +251,7 @@ def delete(solr_url, collection, filter_field, end, solr_keytab, solr_principal) query_solr(solr_kinit_command, delete_command, "{0} {1}".format(curl_prefix, delete_command), "Deleting") -def save(solr_url, collection, filter_field, id_field, range_end, read_block_size, write_block_size, +def save(mode, solr_url, collection, filter_field, id_field, range_end, read_block_size, write_block_size, ignore_unfinished_uploading, additional_filter, name, solr_keytab, solr_principal, json_file, compression, hdfs_keytab, hdfs_principal, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path): solr_kinit_command = None @@ -269,9 +269,11 @@ def save(solr_url, collection, filter_field, id_field, range_end, read_block_siz ensure_hdfs_path(hdfs_kinit_command, hdfs_user, hdfs_path) working_dir = get_working_dir(solr_url, collection) - handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_prefix, working_dir, ignore_unfinished_uploading) - save_data(solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field, range_end, - read_block_size, write_block_size, working_dir, additional_filter, name, json_file, compression, + if mode == "archive": + handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_prefix, working_dir, ignore_unfinished_uploading) + + save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field, + range_end, read_block_size, write_block_size, working_dir, additional_filter, name, json_file, compression, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path) def ensure_hdfs_path(hdfs_kinit_command, hdfs_user, hdfs_path): @@ -341,7 +343,7 @@ def handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_pre os.remove(command_json_path) -def save_data(solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field, +def save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field, range_end, read_block_size, write_block_size, working_dir, additional_filter, name, json_file, compression, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path): logger.info("Starting to save data") @@ -370,9 +372,9 @@ def save_data(solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, col prev_lot_end_id = results[3] if records > 0: - upload_block(solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field, - working_dir, tmp_file_path, name, prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path, - key_file_path, bucket, key_prefix, local_path, compression) + upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, + id_field, working_dir, tmp_file_path, name, prev_lot_end_value, prev_lot_end_id, hdfs_user, + hdfs_path, key_file_path, bucket, key_prefix, local_path, compression) total_records += records logger.info("A total of %d records are saved", total_records) @@ -441,8 +443,8 @@ def finish_file(tmp_file, json_file): if json_file: tmp_file.write("\n}") -def upload_block(solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field, - working_dir, tmp_file_path, name, prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path, +def upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, + id_field, working_dir, tmp_file_path, name, prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path, compression): if name: file_name = "{0}_-_{1}_-_{2}_-_{3}".format(collection, name, prev_lot_end_value, prev_lot_end_id).replace(':', '_') @@ -451,9 +453,9 @@ def upload_block(solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, upload_file_path = compress_file(working_dir, tmp_file_path, file_name, compression) - upload_command = create_command_file(True, working_dir, upload_file_path, solr_url, collection, filter_field, id_field, - prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path, key_file_path, bucket, - key_prefix, local_path) + upload_command = create_command_file(mode, True, working_dir, upload_file_path, solr_url, collection, filter_field, + id_field, prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path, + key_file_path, bucket, key_prefix, local_path) if hdfs_user: upload_file_hdfs(hdfs_kinit_command, upload_command, upload_file_path, hdfs_path, hdfs_user) elif key_file_path: @@ -464,11 +466,12 @@ def upload_block(solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, logger.warn("Unknown upload destination") sys.exit() - delete_command = create_command_file(False, working_dir, upload_file_path, solr_url, collection, filter_field, id_field, - prev_lot_end_value, prev_lot_end_id, None, None, None, None, None, None) - delete_data(solr_kinit_command, curl_prefix, delete_command, collection, filter_field, id_field, prev_lot_end_value, prev_lot_end_id) - - os.remove("{0}/command.json".format(working_dir)) + delete_command = create_command_file(mode, False, working_dir, upload_file_path, solr_url, collection, filter_field, + id_field, prev_lot_end_value, prev_lot_end_id, None, None, None, None, None, None) + if mode == "archive": + delete_data(solr_kinit_command, curl_prefix, delete_command, collection, filter_field, id_field, prev_lot_end_value, + prev_lot_end_id) + os.remove("{0}/command.json".format(working_dir)) def compress_file(working_dir, tmp_file_path, file_name, compression): data_file_name = "{0}.json".format(file_name) @@ -511,8 +514,9 @@ def compress_file(working_dir, tmp_file_path, file_name, compression): return upload_file_path -def create_command_file(upload, working_dir, upload_file_path, solr_url, collection, filter_field, id_field, prev_lot_end_value, - prev_lot_end_id, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path): +def create_command_file(mode, upload, working_dir, upload_file_path, solr_url, collection, filter_field, id_field, + prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, + local_path): commands = {} if upload: @@ -551,6 +555,9 @@ def create_command_file(upload, working_dir, upload_file_path, solr_url, collect else: logger.warn("Unknown upload destination") sys.exit() + + if mode == "save": + return upload_command delete_prev = "{0}:[*+TO+\"{1}\"]".format(filter_field, prev_lot_end_value) @@ -558,6 +565,9 @@ def create_command_file(upload, working_dir, upload_file_path, solr_url, collect delete_query = quote("{0}+OR+{1}".format(delete_prev, delete_last), safe="/+\"*") delete_command = "{0}/{1}/update?stream.body=<delete><query>{2}</query></delete>&commit=true&wt=json" \ .format(solr_url, collection, delete_query) + if mode == "save": + return delete_command + delete_command_data = {} delete_command_data["command"] = delete_command delete_command_data["collection"] = collection @@ -710,12 +720,12 @@ if __name__ == '__main__': if options.mode == "delete": delete(options.solr_url, options.collection, options.filter_field, end, options.solr_keytab, options.solr_principal) - elif options.mode == "save": - save(options.solr_url, options.collection, options.filter_field, options.id_field, end, options.read_block_size, - options.write_block_size, options.ignore_unfinished_uploading, options.additional_filter, options.name, - options.solr_keytab, options.solr_principal, options.json_file, options.compression, - options.hdfs_keytab, options.hdfs_principal, options.hdfs_user, options.hdfs_path, options.key_file_path, - options.bucket, options.key_prefix, options.local_path) + elif options.mode in ["archive", "save"]: + save(options.mode, options.solr_url, options.collection, options.filter_field, options.id_field, end, + options.read_block_size, options.write_block_size, options.ignore_unfinished_uploading, + options.additional_filter, options.name, options.solr_keytab, options.solr_principal, options.json_file, + options.compression, options.hdfs_keytab, options.hdfs_principal, options.hdfs_user, options.hdfs_path, + options.key_file_path, options.bucket, options.key_prefix, options.local_path) else: logger.warn("Unknown mode: %s", options.mode)