Sanjeet,

As provided, this won't integrate well with the existing NiFi
processors. You would need to implement it as a controller service
object and update the processors to use it. Also, if you want to use
processors based on the official Elasticsearch client API, the ones
under the "REST API bundle" are the best fit because they already use
controller services that use the official Elastic clients.
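
To make the controller service suggestion a bit more concrete, here is a
rough plain-Java sketch of the split I have in mind. It deliberately uses no
NiFi or AWS classes so it can run on its own, and every name in it
(ElasticsearchClientService, AwsSignedClientService, PutDocumentProcessor)
is hypothetical. In a real bundle the interface would extend NiFi's
ControllerService and the processor would reference it through a
PropertyDescriptor built with identifiesControllerService(...). The signing
and sending is stubbed out here.

```java
import java.util.ArrayList;
import java.util.List;

class ControllerServiceSketch {

    /** Hypothetical service contract; in NiFi this would extend ControllerService. */
    interface ElasticsearchClientService {
        String index(String indexName, String jsonDocument);
    }

    /** Hypothetical implementation that would own the AWS credentials and request signing. */
    static class AwsSignedClientService implements ElasticsearchClientService {
        private final String region;
        final List<String> sent = new ArrayList<>();

        AwsSignedClientService(String region) {
            this.region = region;
        }

        @Override
        public String index(String indexName, String jsonDocument) {
            // Assumption: a real service would sign the request with the IAM role
            // credentials and send it. This stub only records what it would send.
            String request = "PUT /" + indexName + "/_doc region=" + region;
            sent.add(request);
            return request;
        }
    }

    /** Hypothetical processor: it depends only on the interface, never on credentials. */
    static class PutDocumentProcessor {
        private final ElasticsearchClientService client;

        PutDocumentProcessor(ElasticsearchClientService client) {
            this.client = client;
        }

        String onTrigger(String flowFileContent) {
            return client.index("my-index", flowFileContent);
        }
    }

    public static void main(String[] args) {
        AwsSignedClientService service = new AwsSignedClientService("us-west-1");
        PutDocumentProcessor processor = new PutDocumentProcessor(service);
        System.out.println(processor.onTrigger("{\"title\":\"Walk the Line\"}"));
    }
}
```

The point of the split is that the processor never sees credentials, so the
same processor can run against a signed or an unsigned client depending on
which service is configured.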

Thanks,

Mike

On Wed, Sep 9, 2020 at 12:14 PM sanjeet rath <rath.sanj...@gmail.com> wrote:
>
> Hi,
>
> We are using AWS managed Elasticsearch and our NiFi is hosted in EC2.
> I have a use case of building a custom processor on top of
> PutElasticsearchHttp, where it will use the AWS IAM role based
> awscredentialprovider service to connect to AWS Elasticsearch.
> This will be similar to PutSQS, where we are using the IAM role based
> awscredentialprovider service to connect to SQS, and it is working fine.
>
> But there is no awscredentialprovider controller service available in
> PutElasticsearchHttp.
>
> So my plan is to add an awscredentialprovider controller service to
> PutElasticsearchHttp, where I will use the code below to connect to
> Elasticsearch.
>
> Is my approach correct? Could you suggest a better approach?
>
> public class AmazonElasticsearchServiceSample {
>
>     private static String serviceName = "es";
>     private static String region = "us-west-1";
>     private static String aesEndpoint = "https://domain.us-west-1.es.amazonaws.com";
>     private static String payload = "{ \"type\": \"s3\", \"settings\": { "
>             + "\"bucket\": \"your-bucket\", \"region\": \"us-west-1\", "
>             + "\"role_arn\": \"arn:aws:iam::123456789012:role/TheServiceRole\" } }";
>     private static String snapshotPath = "/_snapshot/my-snapshot-repo";
>     private static String sampleDocument = "{" + "\"title\":\"Walk the Line\","
>             + "\"director\":\"James Mangold\"," + "\"year\":\"2005\"}";
>     private static String indexingPath = "/my-index/_doc";
>
>     static final AWSCredentialsProvider credentialsProvider =
>             new DefaultAWSCredentialsProviderChain();
>
>     public static void main(String[] args) throws IOException {
>         RestClient esClient = esClient(serviceName, region);
>
>         // Register a snapshot repository
>         HttpEntity entity = new NStringEntity(payload, ContentType.APPLICATION_JSON);
>         Request request = new Request("PUT", snapshotPath);
>         request.setEntity(entity);
>         // request.addParameter(name, value); // optional parameters
>         Response response = esClient.performRequest(request);
>         System.out.println(response.toString());
>
>         // Index a document
>         entity = new NStringEntity(sampleDocument, ContentType.APPLICATION_JSON);
>         String id = "1";
>         request = new Request("PUT", indexingPath + "/" + id);
>         request.setEntity(entity);
>         // Using a String instead of an HttpEntity sets Content-Type to
>         // application/json automatically.
>         // request.setJsonEntity(sampleDocument);
>         response = esClient.performRequest(request);
>         System.out.println(response.toString());
>     }
>
>     public static RestClient esClient(String serviceName, String region) {
>         AWS4Signer signer = new AWS4Signer();
>         signer.setServiceName(serviceName);
>         signer.setRegionName(region);
>         HttpRequestInterceptor interceptor =
>                 new AWSRequestSigningApacheInterceptor(serviceName, signer, credentialsProvider);
>         return RestClient.builder(HttpHost.create(aesEndpoint))
>                 .setHttpClientConfigCallback(hacb -> hacb.addInterceptorLast(interceptor))
>                 .build();
>     }
> }
>
> https://docs.aws.amazon.com/elasticsearch-service/latest/developerguide/es-request-signing.html
>
>
>
> Regards,
> Sanjeet
>
> --
> Sanjeet Kumar Rath,
> mob- +91 8777577470
>
