Github user xunzhang commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/846#discussion_r75244483 --- Diff: tools/bin/hawqregister --- @@ -40,186 +39,195 @@ EXECNAME = os.path.split(__file__)[-1] setup_tool_logging(EXECNAME,getLocalHostname(),getUserName()) -def create_opt_parser(version): +def option_parser(): parser = OptParser(option_class=OptChecker, - usage='usage: %prog [options] database_name table_name file_or_dir_path_in_hdfs', - version=version) + usage='usage: %prog [options] table_name', + version='%prog version $Revision: #1 $') parser.remove_option('-h') parser.add_option('-?', '--help', action='help') - parser.add_option('-h', '--host', help="host of the target DB") - parser.add_option('-p', '--port', help="port of the target DB", type='int', default=0) - parser.add_option('-U', '--user', help="username of the target DB") - return parser - - -def check_hadoop_command(): - hdfscmd = "hadoop" - result = local_ssh(hdfscmd); - if result != 0: - logger.error("command 'hadoop' is not available, please set environment variable $PATH to fix this") + parser.add_option('-h', '--host', help='host of the target DB') + parser.add_option('-p', '--port', help='port of the target DB', type='int', default=0) + parser.add_option('-U', '--user', help='username of the target DB') + parser.add_option('-d', '--database', default = 'postgres', dest = 'database', help='database name') + parser.add_option('-f', '--filepath', dest = 'filepath', help='file name in HDFS') + parser.add_option('-c', '--config', dest = 'yml_config', default = '', help='configuration file in YAML format') + return parser.parse_args() + + +def option_parser_yml(yml_file): + import yaml + with open(yml_file, 'r') as f: + params = yaml.load(f) + if params['FileFormat'] == 'Parquet': + offset = params['Parquet_FileLocations']['Files'][0]['path'].rfind('/') + filepath = params['DFS_URL'] + params['Parquet_FileLocations']['Files'][0]['path'][:offset] if 
len(params['Parquet_FileLocations']['Files']) != 1 else params['DFS_URL'] + params['Parquet_FileLocations']['Files'][0]['path'] + return 'Parquet', filepath, params['Parquet_Schema'], params['Distribution_Policy'] + offset = params['AO_FileLocations']['Files'][0]['path'].rfind('/') + filepath = params['DFS_URL'] + params['AO_FileLocations']['Files'][0]['path'][:offset] if len(params['AO_FileLocations']['Files']) != 1 else params['DFS_URL'] + params['AO_FileLocations']['Files'][0]['path'] + return 'AO', filepath, params['AO_Schema'], params['Distribution_Policy'] + + +def create_table(dburl, tablename, schema_info, fmt, distrbution_policy): + try: + schema = ','.join([k['name'] + ' ' + k['type'] for k in schema_info]) + fmt = 'ROW' if fmt == 'AO' else fmt + query = 'create table %s(%s) with (appendonly=true, orientation=%s) %s;' % (tablename, schema, fmt, distrbution_policy) + conn = dbconn.connect(dburl, False) + rows = dbconn.execSQL(conn, query) + conn.commit() + except DatabaseError, ex: + logger.error('Failed to execute query ""%s"' % query) sys.exit(1) -def get_seg_name(options, databasename, tablename): +def get_seg_name(dburl, tablename, database, fmt): try: - relfilenode = 0 - relname = "" - query = ("select pg_class2.relname from pg_class as pg_class1, pg_appendonly, pg_class as pg_class2 where pg_class1.relname ='%s' " - "and pg_class1.oid = pg_appendonly.relid and pg_appendonly.segrelid = pg_class2.oid;") % tablename - dburl = dbconn.DbURL(hostname=options.host, port=options.port, username=options.user, dbname=databasename) + relname = '' + tablename = tablename.split('.')[-1] + query = ("select pg_class2.relname from pg_class as pg_class1, pg_appendonly, pg_class as pg_class2 " + "where pg_class1.relname ='%s' and pg_class1.oid = pg_appendonly.relid and pg_appendonly.segrelid = pg_class2.oid;") % tablename conn = dbconn.connect(dburl, True) rows = dbconn.execSQL(conn, query) - conn.commit() - if rows.rowcount == 0: - logger.error("table '%s' not found 
in db '%s'" % (tablename, databasename)); + conn.commit() + if not rows.rowcount: + logger.error('table "%s" not found in db "%s"' % (tablename, database)) sys.exit(1) for row in rows: relname = row[0] conn.close() - except DatabaseError, ex: - logger.error("Failed to connect to database, this script can only be run when the database is up") - logger.error("host = %s, port = %d, user = %s, dbname = %s, query = %s" % (options.host, options.port, options.user, databasename, query)) - sys.exit(1) - - # check whether the target table is parquet format - if relname.find("paq") == -1: - logger.error("table '%s' is not parquet format" % tablename) + logger.error('Failed to run query "%s" with dbname "%s"' % (query, database)) sys.exit(1) + if fmt == 'Parquet': + if relname.find("paq") == -1: + logger.error("table '%s' is not parquet format" % tablename) + sys.exit(1) return relname -def check_hash_type(options, databasename, tablename): +def check_hash_type(dburl, tablename): + '''Check whether target table is hash-typed, in that case simple insertion does not work''' try: query = "select attrnums from gp_distribution_policy, pg_class where pg_class.relname = '%s' and pg_class.oid = gp_distribution_policy.localoid;" % tablename - dburl = dbconn.DbURL(hostname=options.host, port=options.port, username=options.user, dbname=databasename) conn = dbconn.connect(dburl, False) rows = dbconn.execSQL(conn, query) - conn.commit() - if rows.rowcount == 0: - logger.error("target not found in table gp_distribution_policy") + conn.commit() + if not rows.rowcount: + logger.error('Target not found in table gp_distribution_policy.') sys.exit(1) for row in rows: - if row[0] != None: - logger.error("Cannot register file(s) to a table which is hash-typed") + if row[0]: + logger.error('Cannot register file(s) to a table which is hash-typed.') sys.exit(1) - conn.close() - except DatabaseError, ex: - logger.error("Failed to connect to database, this script can only be run when the database is 
up") - logger.error("host = %s, port = %d, user = %s, dbname = %s, query = %s" % (options.host, options.port, options.user, databasename, query)) + logger.error('Failed to execute query "%s"' % query) sys.exit(1) -def get_metadata_from_database(options, databasename, tablename, seg_name): +def get_metadata_from_database(dburl, tablename, seg_name): + '''Get the metadata to be inserted from hdfs''' try: - query = "select segno from pg_aoseg.%s;" % seg_name - dburl = dbconn.DbURL(hostname=options.host, port=options.port, username=options.user, dbname=databasename) + query = 'select segno from pg_aoseg.%s;' % seg_name conn = dbconn.connect(dburl, False) rows = dbconn.execSQL(conn, query) - conn.commit() + conn.commit() conn.close() - except DatabaseError, ex: - logger.error("Failed to connect to database, this script can only be run when the database is up") - logger.error("host = %s, port = %d, user = %s, dbname = %s, query = %s" % (options.host, options.port, options.user, databasename, query)) + logger.error('Failed to execute query "%s"' % query) sys.exit(1) firstsegno = rows.rowcount + 1 - # get the full path of correspoding file for target table try: + # get the full path of correspoding file for target table query = ("select location, gp_persistent_tablespace_node.tablespace_oid, database_oid, relfilenode from pg_class, gp_persistent_relation_node, " - "gp_persistent_tablespace_node, gp_persistent_filespace_node where relname = '%s' and pg_class.relfilenode = " - "gp_persistent_relation_node.relfilenode_oid and gp_persistent_relation_node.tablespace_oid = gp_persistent_tablespace_node.tablespace_oid " - "and gp_persistent_filespace_node.filespace_oid = gp_persistent_filespace_node.filespace_oid;") % tablename - dburl = dbconn.DbURL(hostname=options.host, port=options.port, username=options.user, dbname=databasename) + "gp_persistent_tablespace_node, gp_persistent_filespace_node where relname = '%s' and pg_class.relfilenode = " + 
"gp_persistent_relation_node.relfilenode_oid and gp_persistent_relation_node.tablespace_oid = gp_persistent_tablespace_node.tablespace_oid " + "and gp_persistent_filespace_node.filespace_oid = gp_persistent_filespace_node.filespace_oid;") % tablename.split('.')[-1] conn = dbconn.connect(dburl, False) rows = dbconn.execSQL(conn, query) - conn.commit() + conn.commit() conn.close() - except DatabaseError, ex: - logger.error("Failed to connect to database, this script can only be run when the database is up") - logger.error("host = %s, port = %d, user = %s, dbname = %s, query = %s" % (options.host, options.port, options.user, databasename, query)) + logger.error('Failed to execute query "%s"' % query) sys.exit(1) - for row in rows: tabledir = row[0].strip() + "/" + str(row[1]) + "/" + str(row[2]) + "/" + str(row[3]) + "/" - + #tabledir = '/'.join([row[0], str(row[1]), str(row[2]), str(row[3]), '']) --- End diff -- I will remove L157 and remain L158.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA. ---