Repository: ambari
Updated Branches:
  refs/heads/branch-2.6 eb387261c -> 3b8b8d366


AMBARI-22061. Solr Data Manager script should provide non-destructive archive download option (mgergely)

Change-Id: I64546727aec5e1ad034edc2de1d94fb8fc319623


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/3b8b8d36
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/3b8b8d36
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/3b8b8d36

Branch: refs/heads/branch-2.6
Commit: 3b8b8d366c54f11078640a602e36f169940b5b41
Parents: eb38726
Author: Miklos Gergely <mgerg...@hortonworks.com>
Authored: Wed Sep 27 11:18:08 2017 +0200
Committer: Miklos Gergely <mgerg...@hortonworks.com>
Committed: Wed Sep 27 11:18:28 2017 +0200

----------------------------------------------------------------------
 .../src/main/python/solrDataManager.py          | 76 +++++++++++---------
 1 file changed, 43 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/3b8b8d36/ambari-logsearch/ambari-logsearch-solr-client/src/main/python/solrDataManager.py
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-solr-client/src/main/python/solrDataManager.py b/ambari-logsearch/ambari-logsearch-solr-client/src/main/python/solrDataManager.py
index e0356bb..2675bd9 100644
--- a/ambari-logsearch/ambari-logsearch-solr-client/src/main/python/solrDataManager.py
+++ b/ambari-logsearch/ambari-logsearch-solr-client/src/main/python/solrDataManager.py
@@ -47,7 +47,7 @@ verbose = False
 def parse_arguments():
   parser = optparse.OptionParser("usage: %prog [options]", version="Solr Data 
Manager {0}".format(VERSION))
 
-  parser.add_option("-m", "--mode", dest="mode", type="string", help="delete | 
save")
+  parser.add_option("-m", "--mode", dest="mode", type="string", help="archive 
| delete | save")
   parser.add_option("-s", "--solr-url", dest="solr_url", type="string", 
help="the url of the solr server including the port")
   parser.add_option("-c", "--collection", dest="collection", type="string", 
help="the name of the solr collection")
   parser.add_option("-f", "--filter-field", dest="filter_field", 
type="string", help="the name of the field to filter on")
@@ -98,14 +98,14 @@ def parse_arguments():
       parser.print_help()
       sys.exit()
   
-  mode_values = ["delete", "save"]
+  mode_values = ["archive", "delete", "save"]
   if options.mode not in mode_values:
     print "mode must be one of {0}".format(" | ".join(mode_values))
     parser.print_help()
     sys.exit()
 
   if options.mode == "delete":
-    for r in ["name", "compression", "hdfs_keytab", "hdfs_principal", 
"hdfs_user", "hdfs_path", "key_file_path", "bucket", "key_prefix", 
"local_path"]:
+    for r in ["name", "hdfs_keytab", "hdfs_principal", "hdfs_user", 
"hdfs_path", "key_file_path", "bucket", "key_prefix", "local_path"]:
       if options.__dict__[r] is not None:
         print "argument '{0}' may not be specified in delete mode".format(r)
         parser.print_help()
@@ -153,7 +153,7 @@ def parse_arguments():
     parser.print_help()
     sys.exit()
 
-  if options.mode == "save":
+  if options.mode in ["archive", "save"]:
     count = (1 if is_any_hdfs_property else 0) + (1 if is_any_s3_property else 
0) + \
             (1 if options.__dict__["local_path"] is not None else 0)
     if count != 1:
@@ -171,7 +171,7 @@ def parse_arguments():
   print("  solr-url: " + options.solr_url)
   print("  collection: " + options.collection)
   print("  filter-field: " + options.filter_field)
-  if options.mode == "save":
+  if options.mode in ["archive", "save"]:
     print("  id-field: " + options.id_field)
   if options.__dict__["end"] is not None:
     print("  end: " + options.end)
@@ -182,14 +182,14 @@ def parse_arguments():
     print("  additional-filter: " + str(options.additional_filter))
   if options.__dict__["name"] is not None:
     print("  name: " + str(options.name))
-  if options.mode == "save":
+  if options.mode in ["archive", "save"]:
     print("  read-block-size: " + str(options.read_block_size))
     print("  write-block-size: " + str(options.write_block_size))
     print("  ignore-unfinished-uploading: " + 
str(options.ignore_unfinished_uploading))
   if (options.__dict__["solr_keytab"] is not None):
     print("  solr-keytab: " + options.solr_keytab)
     print("  solr-principal: " + options.solr_principal)
-  if options.mode == "save":
+  if options.mode in ["archive", "save"]:
     print("  output: " + ("json" if options.json_file else 
"line-delimited-json"))
     print("  compression: " + options.compression)
   if (options.__dict__["hdfs_keytab"] is not None):
@@ -251,7 +251,7 @@ def delete(solr_url, collection, filter_field, end, 
solr_keytab, solr_principal)
   
   query_solr(solr_kinit_command, delete_command, "{0} {1}".format(curl_prefix, 
delete_command), "Deleting")
 
-def save(solr_url, collection, filter_field, id_field, range_end, 
read_block_size, write_block_size,
+def save(mode, solr_url, collection, filter_field, id_field, range_end, 
read_block_size, write_block_size,
          ignore_unfinished_uploading, additional_filter, name, solr_keytab, 
solr_principal, json_file,
          compression, hdfs_keytab, hdfs_principal, hdfs_user, hdfs_path, 
key_file_path, bucket, key_prefix, local_path):
   solr_kinit_command = None
@@ -269,9 +269,11 @@ def save(solr_url, collection, filter_field, id_field, 
range_end, read_block_siz
     ensure_hdfs_path(hdfs_kinit_command, hdfs_user, hdfs_path)
 
   working_dir = get_working_dir(solr_url, collection)
-  handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, 
curl_prefix, working_dir, ignore_unfinished_uploading)
-  save_data(solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, 
collection, filter_field, id_field, range_end,
-            read_block_size, write_block_size, working_dir, additional_filter, 
name, json_file, compression,
+  if mode == "archive":
+    handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, 
curl_prefix, working_dir, ignore_unfinished_uploading)
+  
+  save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, 
solr_url, collection, filter_field, id_field,
+            range_end, read_block_size, write_block_size, working_dir, 
additional_filter, name, json_file, compression,
             hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, 
local_path)
 
 def ensure_hdfs_path(hdfs_kinit_command, hdfs_user, hdfs_path):
@@ -341,7 +343,7 @@ def handle_unfinished_uploading(solr_kinit_command, 
hdfs_kinit_command, curl_pre
     
     os.remove(command_json_path)
 
-def save_data(solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, 
collection, filter_field, id_field,
+def save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, 
solr_url, collection, filter_field, id_field,
               range_end, read_block_size, write_block_size, working_dir, 
additional_filter, name, json_file,
               compression, hdfs_user, hdfs_path, key_file_path, bucket, 
key_prefix, local_path):
   logger.info("Starting to save data")
@@ -370,9 +372,9 @@ def save_data(solr_kinit_command, hdfs_kinit_command, 
curl_prefix, solr_url, col
     prev_lot_end_id = results[3]
     
     if records > 0:
-      upload_block(solr_kinit_command, hdfs_kinit_command, curl_prefix, 
solr_url, collection, filter_field, id_field,
-                   working_dir, tmp_file_path, name, prev_lot_end_value, 
prev_lot_end_id, hdfs_user, hdfs_path,
-                   key_file_path, bucket, key_prefix, local_path, compression)
+      upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, 
solr_url, collection, filter_field,
+                   id_field, working_dir, tmp_file_path, name, 
prev_lot_end_value, prev_lot_end_id, hdfs_user,
+                   hdfs_path, key_file_path, bucket, key_prefix, local_path, 
compression)
       total_records += records
       logger.info("A total of %d records are saved", total_records)
 
@@ -441,8 +443,8 @@ def finish_file(tmp_file, json_file):
   if json_file:
     tmp_file.write("\n}")
 
-def upload_block(solr_kinit_command, hdfs_kinit_command, curl_prefix, 
solr_url, collection, filter_field, id_field,
-                 working_dir, tmp_file_path, name, prev_lot_end_value, 
prev_lot_end_id, hdfs_user, hdfs_path,
+def upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, 
solr_url, collection, filter_field,
+                 id_field, working_dir, tmp_file_path, name, 
prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path,
                  key_file_path, bucket, key_prefix, local_path, compression):
   if name:
     file_name = "{0}_-_{1}_-_{2}_-_{3}".format(collection, name, 
prev_lot_end_value, prev_lot_end_id).replace(':', '_')
@@ -451,9 +453,9 @@ def upload_block(solr_kinit_command, hdfs_kinit_command, 
curl_prefix, solr_url,
   
   upload_file_path = compress_file(working_dir, tmp_file_path, file_name, 
compression)
   
-  upload_command = create_command_file(True, working_dir, upload_file_path, 
solr_url, collection, filter_field, id_field,
-                                       prev_lot_end_value, prev_lot_end_id, 
hdfs_user, hdfs_path, key_file_path, bucket,
-                                       key_prefix, local_path)
+  upload_command = create_command_file(mode, True, working_dir, 
upload_file_path, solr_url, collection, filter_field,
+                                       id_field, prev_lot_end_value, 
prev_lot_end_id, hdfs_user, hdfs_path,
+                                       key_file_path, bucket, key_prefix, 
local_path)
   if hdfs_user:
     upload_file_hdfs(hdfs_kinit_command, upload_command, upload_file_path, 
hdfs_path, hdfs_user)
   elif key_file_path:
@@ -464,11 +466,12 @@ def upload_block(solr_kinit_command, hdfs_kinit_command, 
curl_prefix, solr_url,
     logger.warn("Unknown upload destination")
     sys.exit()
   
-  delete_command = create_command_file(False, working_dir, upload_file_path, 
solr_url, collection, filter_field, id_field,
-                                       prev_lot_end_value, prev_lot_end_id, 
None, None, None, None, None, None)
-  delete_data(solr_kinit_command, curl_prefix, delete_command, collection, 
filter_field, id_field, prev_lot_end_value, prev_lot_end_id)
-  
-  os.remove("{0}/command.json".format(working_dir))
+  delete_command = create_command_file(mode, False, working_dir, 
upload_file_path, solr_url, collection, filter_field,
+                                       id_field, prev_lot_end_value, 
prev_lot_end_id, None, None, None, None, None, None)
+  if mode == "archive":
+    delete_data(solr_kinit_command, curl_prefix, delete_command, collection, 
filter_field, id_field, prev_lot_end_value,
+                prev_lot_end_id)
+    os.remove("{0}/command.json".format(working_dir))
 
 def compress_file(working_dir, tmp_file_path, file_name, compression):
   data_file_name = "{0}.json".format(file_name)
@@ -511,8 +514,9 @@ def compress_file(working_dir, tmp_file_path, file_name, 
compression):
   
   return upload_file_path
 
-def create_command_file(upload, working_dir, upload_file_path, solr_url, 
collection, filter_field, id_field, prev_lot_end_value,
-                        prev_lot_end_id, hdfs_user, hdfs_path, key_file_path, 
bucket, key_prefix, local_path):
+def create_command_file(mode, upload, working_dir, upload_file_path, solr_url, 
collection, filter_field, id_field,
+                        prev_lot_end_value, prev_lot_end_id, hdfs_user, 
hdfs_path, key_file_path, bucket, key_prefix,
+                        local_path):
   commands = {}
   
   if upload:
@@ -551,6 +555,9 @@ def create_command_file(upload, working_dir, 
upload_file_path, solr_url, collect
     else:
       logger.warn("Unknown upload destination")
       sys.exit()
+    
+    if mode == "save":
+      return upload_command
 
   
   delete_prev = "{0}:[*+TO+\"{1}\"]".format(filter_field, prev_lot_end_value)
@@ -558,6 +565,9 @@ def create_command_file(upload, working_dir, 
upload_file_path, solr_url, collect
   delete_query = quote("{0}+OR+{1}".format(delete_prev, delete_last), 
safe="/+\"*")
   delete_command = 
"{0}/{1}/update?stream.body=<delete><query>{2}</query></delete>&commit=true&wt=json"
 \
     .format(solr_url, collection, delete_query)
+  if mode == "save":
+    return delete_command
+  
   delete_command_data = {}
   delete_command_data["command"] = delete_command
   delete_command_data["collection"] = collection
@@ -710,12 +720,12 @@ if __name__ == '__main__':
     
     if options.mode == "delete":
       delete(options.solr_url, options.collection, options.filter_field, end, 
options.solr_keytab, options.solr_principal)
-    elif options.mode == "save":
-      save(options.solr_url, options.collection, options.filter_field, 
options.id_field, end, options.read_block_size,
-           options.write_block_size, options.ignore_unfinished_uploading, 
options.additional_filter, options.name,
-           options.solr_keytab, options.solr_principal, options.json_file, 
options.compression,
-           options.hdfs_keytab, options.hdfs_principal, options.hdfs_user, 
options.hdfs_path, options.key_file_path,
-           options.bucket, options.key_prefix, options.local_path)
+    elif options.mode in ["archive", "save"]:
+      save(options.mode, options.solr_url, options.collection, 
options.filter_field, options.id_field, end,
+           options.read_block_size, options.write_block_size, 
options.ignore_unfinished_uploading,
+           options.additional_filter, options.name, options.solr_keytab, 
options.solr_principal, options.json_file,
+           options.compression, options.hdfs_keytab, options.hdfs_principal, 
options.hdfs_user, options.hdfs_path,
+           options.key_file_path, options.bucket, options.key_prefix, 
options.local_path)
     else:
       logger.warn("Unknown mode: %s", options.mode)
     

Reply via email to