Hi Team,


We are facing the issue described below when deploying a NiFi flow from one
NiFi Registry bucket to another using Nipy API, if the flow contains a
parameter context. It works fine if the flow does not contain a parameter
context.



The failure occurs when calling nipyapi.versioning.deploy_flow_version, with
the following error message:



    raise ApiException(http_resp=r)

nipyapi.nifi.rest.ApiException: (500)

Reason: Internal Server Error

HTTP response headers: HTTPHeaderDict({'Date': 'Tue, 20 Oct 2020 07:55:30 GMT', 
'X-Frame-Options': 'SAMEORIGIN', 'Content-Security-Policy': "frame-ancestors 
'self'", 'X-XSS-Protection': '1; mode=block', 'Strict-Transport-Security': 
'max-age=31540000', 'Content-Type': 'text/plain', 'Vary': 'Accept-Encoding', 
'Content-Length': '79', 'Server': 'Jetty(9.4.19.v20190610)'})

HTTP response body: An unexpected error has occurred. Please check the logs for 
additional details.



No error or exception appears in any of the other logs.
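
To capture more detail at the point of failure, the deploy call could be
wrapped so that the status, reason and body of the exception are logged on one
line. This is only a minimal sketch: describe_api_error is a hypothetical
helper, not part of Nipy API, and it assumes the exception exposes status,
reason and body attributes, as the traceback above suggests.

```python
def describe_api_error(exc):
    """Summarize an API exception as a single log-friendly line.

    Hypothetical helper: assumes the exception carries `status`, `reason`
    and `body` attributes, and falls back to placeholders when it does not.
    """
    status = getattr(exc, "status", None)
    reason = getattr(exc, "reason", None)
    body = getattr(exc, "body", None)
    if isinstance(body, bytes):
        # Some HTTP clients hand back raw bytes; decode defensively.
        body = body.decode("utf-8", errors="replace")
    return "status={0} reason={1} body={2}".format(
        status if status is not None else "unknown",
        reason or "unknown",
        (body or "").strip() or "<empty>",
    )
```

In the attached script this would mean placing the deploy_flow_version call
inside a try block, catching nipyapi.nifi.rest.ApiException and printing
describe_api_error(e) before re-raising.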



The problem seems to be in exporting and importing the flow from the source
bucket to the destination bucket using Nipy API: after the flow is imported
into the destination bucket, importing it from that bucket into the
destination NiFi instance through the NiFi UI also fails.

Importing the flow through the destination NiFi UI from the source bucket
works fine. However, the nipyapi.versioning.export_flow_version and
nipyapi.versioning.import_flow_version calls themselves do not raise any
error.
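
One way to narrow this down would be to check whether the exported JSON still
carries the parameter contexts that the process groups refer to. The sketch
below uses only the standard library. The field names "parameterContexts",
"parameterContextName", "flowContents" and "processGroups" are assumptions
about the exported snapshot layout, and summarize_parameter_contexts is a
hypothetical helper, not a Nipy API function.

```python
import json


def summarize_parameter_contexts(exported_json):
    """Compare parameter contexts referenced by process groups with those defined.

    Assumed layout: a top-level "parameterContexts" mapping keyed by name,
    and a "parameterContextName" on each process group under "flowContents".
    """
    snapshot = json.loads(exported_json)
    defined = set((snapshot.get("parameterContexts") or {}).keys())

    # Walk the process group tree iteratively, collecting referenced names.
    referenced = set()
    stack = [snapshot.get("flowContents") or {}]
    while stack:
        group = stack.pop()
        name = group.get("parameterContextName")
        if name:
            referenced.add(name)
        stack.extend(group.get("processGroups") or [])

    return {
        "defined": sorted(defined),
        "referenced": sorted(referenced),
        "missing": sorted(referenced - defined),
    }
```

In the attached script, dev_export (exported with mode='json') could be passed
to this function right after Step 1. A non-empty "missing" list would suggest
that the parameter context definitions were lost somewhere in the export.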



Could you please look into this and let us know whether this is not possible
through Nipy API when the flow contains a parameter context, or whether there
is another way to do it?

I have attached the source code for reference.



We are using the following versions:

Docker NiFi - 1.11.4
Docker NiFi Registry - nifi-registry-0.7.0
Nipy API - 0.14.3





Regards,

Deepak Gupta


import os
import sys
import nipyapi.nifi.apis
import requests
import argparse
import urllib3
import json
import nipyapi


# Parser setup to get the argument list a little more reasonable...
# Make the parser sans help and reorder the argument groups
parser = argparse.ArgumentParser(add_help=False)
required = parser.add_argument_group('required arguments')
optional = parser.add_argument_group('optional arguments')

# Add optional arguments here, starting with help!
optional.add_argument('-h', '--help', action='help', default=argparse.SUPPRESS,
                      help='show this help message and exit')

required.add_argument('-p', '--password', required=True,
                      help='Password of the Service Account')
args = parser.parse_args()

username='testing'
password=args.password
registry_client='devRegistry'
dest_nifi_hostname="testing-b-nifi.dev.demo.com"
dest_nifi_port="9443"
source_registry_hostname="testing-registry-v2.dev.demo.com"
source_registry_port='18443'
dest_registry_hostname='testing-registry-v2.dev.demo.com'
dest_registry_port='18443'
source_bucket_name='TestingBucket1'
dest_bucket_name='TestingBucket2'

source_flow_name='testflow'

dest_flow_name='testflow1_Migrated'

versioned_pg_name='testflow1'
version_to_deploy='1'


dest_nifi_url = "https://{0}:{1}".format(dest_nifi_hostname, dest_nifi_port)
source_registry_url = "https://{0}:{1}".format(source_registry_hostname, source_registry_port)
destination_registry_url = "https://{0}:{1}".format(dest_registry_hostname, dest_registry_port)

# Initial configuration
nipyapi.config.registry_config.verify_ssl = False
nipyapi.nifi.configuration.verify_ssl = False
nipyapi.config.registry_config.host = source_registry_url + '/nifi-registry-api'


nipyapi.config.nifi_config.host = dest_nifi_url + '/nifi-api'
print(nipyapi.config.registry_config.host)
print(nipyapi.config.nifi_config.host)


print('----------------------------------STEP-1 : START-----------------------------------------------------------------')
# Step 1: Exporting Updated Dev Flow Version

print("Step 1: Exporting Updated Dev Flow Version")

if username is not None and password is not None:
    nipyapi.security.service_login(service='registry', username=username,
                                   password=password)
dev_bucket = nipyapi.versioning.get_registry_bucket(source_bucket_name)
dev_ver_flow = nipyapi.versioning.get_flow_in_bucket(
    dev_bucket.identifier, identifier=source_flow_name)
dev_export = nipyapi.versioning.export_flow_version(
    bucket_id=dev_bucket.identifier, flow_id=dev_ver_flow.identifier,
    version=version_to_deploy, mode='json')


print("Step 1 Completed: Updated Dev Flow Version Exported Successfully--> Move to Step 2")

print('--------------------------------STEP-2 : START-------------------------------------------------------------------')
# Step 2:  Connecting to destination registry and instance 

print("Step 2: Connecting to destination registry and instance ")
nipyapi.config.registry_config.host = destination_registry_url + '/nifi-registry-api'
nipyapi.utils.set_endpoint(nipyapi.config.nifi_config.host)
nipyapi.utils.set_endpoint(nipyapi.config.registry_config.host)
nipyapi.security.service_login(service='registry', username=username,
                               password=password)
nipyapi.security.service_login(service='nifi', username=username,
                               password=password)
print("Step 2 Completed: Connected to Destination Instance and Registry--> Move to Step 3")


print('-----------------------------------STEP-3 : START----------------------------------------------------------------')


# Step 3 : Pushing updated version into Destination Registry Flow

print("Step 3: Pushing updated version into Destination Registry")
prod_bucket = nipyapi.versioning.get_registry_bucket(dest_bucket_name)
prod_flow = nipyapi.versioning.get_flow_in_bucket(
    bucket_id=prod_bucket.identifier, identifier=dest_flow_name)

print(prod_flow)

if prod_flow is None:
  # Flow does not exist in the destination bucket yet: deploy it from the
  # source bucket onto the canvas, then save it under the destination bucket.
  reg_client = nipyapi.versioning.get_registry_client(registry_client)
  source_bucket = nipyapi.versioning.get_registry_bucket(source_bucket_name)
  source_flow = nipyapi.versioning.get_flow_in_bucket(
      bucket_id=source_bucket.identifier, identifier=source_flow_name)
  nipyapi.versioning.deploy_flow_version(
      parent_id=nipyapi.canvas.get_root_pg_id(), location=(0, 0),
      bucket_id=source_bucket.identifier, flow_id=source_flow.identifier,
      reg_client_id=reg_client.id, version=None)

  versioned_pgid = nipyapi.canvas.get_process_group(
      versioned_pg_name, identifier_type='name', greedy=True)
  nipyapi.versioning.stop_flow_ver(versioned_pgid, refresh=True)
  nipyapi.versioning.save_flow_ver(
      versioned_pgid, registry_client=reg_client, bucket=prod_bucket,
      flow_name=dest_flow_name)

  
  print("Step 4 updating the canvas")
else:
  nipyapi.versioning.import_flow_version(
      bucket_id=prod_bucket.identifier, encoded_flow=dev_export,
      flow_name=None, flow_id=prod_flow.identifier)
  versioned_pgid = nipyapi.canvas.get_process_group(
      versioned_pg_name, identifier_type='name', greedy=True)
  
  
  print("--------------------versioned_pgid---------------------------")
  print(versioned_pgid)
  
  print("Step 3 Completed: Pushed updated version into Destination Registry ")
  nipyapi.versioning.update_flow_ver(versioned_pgid)
  
 
  print("Step 4 Flow already existed..Updating the requested version in canvas")



print('-----------------------------------Deployment Completed..Exiting the script....----------------------------------------------------------------')
sys.exit(0)
