The flow file stays in the queue on a field type error, e.g.:
[2016-11-07 12:37:17,263][DEBUG][action.bulk              ] [Cybele] [estreamer_index_2016.11.07][0] failed to execute bulk item (index) index {[estreamer_index_2016.11.07][estreamer_doc][AVg_3ZmnIxO_esWdRpuM], source
...data...
MapperParsingException[failed to parse [applicationId]]; nested: NumberFormatException[For input string: "4294967295"];
        at org.elasticsearch.index.mapper.FieldMapper.parse(FieldMapper.java:329)
        at org.elasticsearch.index.mapper.DocumentParser.parseObjectOrField(DocumentParser.java:311)
        at org.elasticsearch.index.mapper.DocumentParser.parseValue(DocumentParser.java:438)
        at org.elasticsearch.index.mapper.DocumentParser.parseObject(DocumentParser.java:264)
        at org.elasticsearch.index.mapper.DocumentParser.parseDocument(DocumentParser.java:124)
        at org.elasticsearch.index.mapper.DocumentMapper.parse(DocumentMapper.java:309)
        at org.elasticsearch.index.shard.IndexShard.prepareCreate(IndexShard.java:529)
        at org.elasticsearch.index.shard.IndexShard.prepareCreateOnPrimary(IndexShard.java:506)
        at org.elasticsearch.action.index.TransportIndexAction.prepareIndexOperationOnPrimary(TransportIndexAction.java:214)
        at org.elasticsearch.action.index.TransportIndexAction.executeIndexRequestOnPrimary(TransportIndexAction.java:223)
        at org.elasticsearch.action.bulk.TransportShardBulkAction.shardIndexOperation(TransportShardBulkAction.java:327)
        at org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:120)
        at org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:68)
        at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryPhase.doRun(TransportReplicationAction.java:648)
        at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
        at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryOperationTransportHandler.messageReceived(TransportReplicationAction.java:279)
        at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryOperationTransportHandler.messageReceived(TransportReplicationAction.java:271)
        at org.elasticsearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:77)
        at org.elasticsearch.transport.TransportService$4.doRun(TransportService.java:376)
        at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NumberFormatException: For input string: "4294967295"
        at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
        at java.lang.Integer.parseInt(Integer.java:583)
        at java.lang.Integer.parseInt(Integer.java:615)
        at org.elasticsearch.common.xcontent.support.AbstractXContentParser.intValue(AbstractXContentParser.java:127)
        at org.elasticsearch.index.mapper.core.IntegerFieldMapper.innerParseCreateField(IntegerFieldMapper.java:293)
        at org.elasticsearch.index.mapper.core.NumberFieldMapper.parseCreateField(NumberFieldMapper.java:241)
        at org.elasticsearch.index.mapper.FieldMapper.parse(FieldMapper.java:321)
        ... 22 more
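
The root cause shown in the trace can be reproduced outside Elasticsearch.
The value 4294967295 is 2^32 - 1, which is larger than Integer.MAX_VALUE
(2147483647), so Integer.parseInt rejects it, while Long.parseLong accepts it.
A minimal sketch follows; the class and method names are made up for
illustration and are not part of NiFi or Elasticsearch. It suggests, but does
not prove, that mapping applicationId as a long instead of an integer would
avoid this particular error.

```java
public class IntOverflowDemo {
    // Returns true if the string parses as a 32-bit signed int.
    static boolean fitsInt(String s) {
        try {
            Integer.parseInt(s);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    // Returns true if the string parses as a 64-bit signed long.
    static boolean fitsLong(String s) {
        try {
            Long.parseLong(s);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String value = "4294967295"; // the input string from the trace
        System.out.println("fits int:  " + fitsInt(value));
        System.out.println("fits long: " + fitsLong(value));
    }
}
```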

-----Original Message-----
From: Koji Kawamura [mailto:[email protected]] 
Sent: Monday, November 07, 2016 12:10 AM
To: [email protected]
Subject: Re: PutElasticsearchHttp error behaviour

Hello Gaspar,

I looked at the PutElasticsearchHttp code. It distinguishes the following two 
types of error:

1. An error occurs when the processor tries to connect to Elasticsearch but 
cannot, e.g. the specified Elasticsearch URL is incorrect and 
java.net.ConnectException is thrown.
When this happens, the processor throws ProcessException, and the NiFi 
framework keeps the FlowFile in the queue so that it can be retried later.
This indicates that the DataFlowManager (user) has to update the processor 
configuration to make it work properly.

2. The processor successfully sends a request to Elasticsearch, but 
Elasticsearch returns an error response. This happens with operations such as 
updating with a document Id that doesn't exist in the target index.
When this happens, the processor routes the incoming FlowFile that caused the 
error to the failure relationship.
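
The two cases above can be sketched in plain Java. This is only a simulation 
of the behaviour described here, not the processor's actual code: the 
ErrorRoutingSketch class, the Request interface and the Outcome enum are 
hypothetical names, and treating any non-2xx status as an "error response" is 
a simplifying assumption.

```java
import java.io.IOException;
import java.net.ConnectException;

public class ErrorRoutingSketch {
    // Where the FlowFile ends up after one processing attempt.
    enum Outcome { SUCCESS, FAILURE, STAYS_IN_QUEUE }

    // Hypothetical stand-in for the HTTP call; returns the HTTP status code.
    interface Request {
        int send() throws IOException;
    }

    static Outcome handle(Request request) {
        int status;
        try {
            status = request.send();
        } catch (IOException e) {
            // Case 1: Elasticsearch could not be reached. The real processor
            // throws ProcessException here, so the FlowFile is not transferred
            // and stays in the incoming queue.
            return Outcome.STAYS_IN_QUEUE;
        }
        // Case 2: a response came back; an error response goes to failure.
        // (Assumption: "error" is modelled as any non-2xx status.)
        return (status >= 200 && status < 300) ? Outcome.SUCCESS : Outcome.FAILURE;
    }

    public static void main(String[] args) {
        System.out.println(handle(() -> { throw new ConnectException("Connection refused"); }));
        System.out.println(handle(() -> 400));
        System.out.println(handle(() -> 200));
    }
}
```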

I think the error you encountered is the former type above (#1).
If the error had been returned from Elasticsearch, the FlowFile should have 
been routed to the failure relationship, as you expected. Would you please let 
us know the specific error you got? A stack trace would be helpful.

BTW, PutElasticsearchHttp also has a REL_RETRY relationship, but it does not 
appear to be used currently.

Thanks!
Koji

On Thu, Nov 3, 2016 at 5:47 AM, Gaspar, Carson <[email protected]> wrote:
> When PutElasticsearchHttp gets an error from Elasticsearch, it 
> _appears_ to never dequeue the problem flow file and just try it over 
> and over again. It certainly doesn’t route the problem flow file to 
> the retry or failure queues. This seems wrong to me – am I missing something?
>
> Nifi is from the Hortonworks 
> nifi_2_0_0_0_579-1.0.0.2.0.0.0-579.el6.x86_64
> RPM.
>
>
