[ 
https://issues.apache.org/jira/browse/BEAM-5725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16670108#comment-16670108
 ] 

Wout Scheepers edited comment on BEAM-5725 at 10/31/18 1:48 PM:
----------------------------------------------------------------

I've been digging into it and came up with the following so far:

As Tim already pointed out, the problem is that parseResponse(response) is 
not repeatable: the content can only be consumed once, because the Elastic 
Response object encapsulates an HttpEntity object.
 I added a unit test[1] that reproduces the bug by inserting a valid document 
with the retryConfiguration set. 
 The test shows that handleRetry() does not have to be called for the bug to 
appear.

My first thought was to call reset() on the InputStream of the entity content. 
However, the InputStream used in HttpEntity does not support reset().
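
To illustrate the behaviour without pulling in the HTTP client, here is a 
minimal JDK-only sketch. It assumes the entity content behaves like any 
InputStream without mark/reset support; PushbackInputStream is only a 
stand-in for that, and the class and method names (OneShotStreamDemo, oneShot, 
drain, tryReset) are hypothetical, not part of ElasticsearchIO.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class OneShotStreamDemo {

  /** Builds a stream without mark/reset support (stand-in for the entity content). */
  static InputStream oneShot(String body) {
    return new PushbackInputStream(
        new ByteArrayInputStream(body.getBytes(StandardCharsets.UTF_8)));
  }

  /** Reads whatever is left in the stream and returns it as a UTF-8 string. */
  static String drain(InputStream in) {
    try {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      byte[] chunk = new byte[1024];
      int n;
      while ((n = in.read(chunk)) != -1) {
        out.write(chunk, 0, n);
      }
      return new String(out.toByteArray(), StandardCharsets.UTF_8);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Returns true if reset() succeeds, false if the stream rejects it. */
  static boolean tryReset(InputStream in) {
    try {
      in.reset();
      return true;
    } catch (IOException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    InputStream content = oneShot("{\"errors\":false}");
    System.out.println("first read:    " + drain(content));
    // The stream is now exhausted, so a second parse would see no input.
    System.out.println("second read:   '" + drain(content) + "'");
    System.out.println("markSupported: " + content.markSupported());
    System.out.println("reset worked:  " + tryReset(content));
  }
}
```

An empty second read is the same situation Jackson reports as "No content to 
map due to end-of-input".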

A possible solution would be the following:
 Encapsulate the Response object in a wrapper object, making sure the content 
of the HttpEntity object can be parsed repeatedly.
 I think the best way to do this is to use a BufferedHttpEntity[2] in the 
wrapper.
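
As a rough sketch of the wrapper idea, using only the JDK: read the one-shot 
content once into a byte array and hand out a fresh stream on every call. This 
is the same buffering that BufferedHttpEntity provides; with the HTTP client on 
the classpath the wrapper could presumably just hold 
new BufferedHttpEntity(response.getEntity()). The names BufferedBodyDemo, 
BufferedBody and readFully are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class BufferedBodyDemo {

  /** Reads the remaining bytes of the stream into memory. */
  static byte[] readFully(InputStream in) {
    try {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      byte[] chunk = new byte[4096];
      int n;
      while ((n = in.read(chunk)) != -1) {
        out.write(chunk, 0, n);
      }
      return out.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Hypothetical wrapper: consumes the one-shot content once, serves it repeatedly. */
  static final class BufferedBody {
    private final byte[] bytes;

    BufferedBody(InputStream oneShotContent) {
      this.bytes = readFully(oneShotContent);
    }

    /** Each call returns a new stream over the buffered bytes. */
    InputStream getContent() {
      return new ByteArrayInputStream(bytes);
    }
  }

  public static void main(String[] args) {
    // PushbackInputStream has no mark/reset support; it stands in for the entity content.
    InputStream raw = new PushbackInputStream(
        new ByteArrayInputStream("{\"errors\":false}".getBytes(StandardCharsets.UTF_8)));
    BufferedBody body = new BufferedBody(raw);

    // Both reads see the full body, so the response could be parsed more than once.
    System.out.println(new String(readFully(body.getContent()), StandardCharsets.UTF_8));
    System.out.println(new String(readFully(body.getContent()), StandardCharsets.UTF_8));
  }
}
```

The trade-off is that the whole response body is held in memory, which seems 
acceptable for bulk responses but is worth keeping in mind.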

This could be done only when a retryConfiguration is set, but I guess it's 
probably better to wrap the response objects everywhere in the 
ElasticsearchIO class?

Is there a better or more elegant solution for this?

I've also found a way to control how bytes are read from the buffer in the 
Elastic docs[3], but I'm not sure it is of any help (it's for async calls, and 
I'm not sure those are used in ElasticsearchIO):
 "... As for reading the response body, the HttpEntity#getContent method comes 
handy which returns an InputStream reading from the previously buffered 
response body. As an alternative, it is possible to provide a custom 
org.apache.http.nio.protocol.HttpAsyncResponseConsumer that controls how bytes 
are read and buffered. ..."

I'm happy to get some thoughts on a good solution.

Thanks,
 Wout

[1] https://github.com/wscheep/beam/commit/8f2093066a2908f0472983cfc640bc7644b728d9
[2] https://hc.apache.org/httpcomponents-core-ga/httpcore/apidocs/org/apache/http/entity/BufferedHttpEntity.html
[3] https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/java-rest-low-usage-responses.html



> ElasticsearchIO RetryConfiguration response parse failure
> ---------------------------------------------------------
>
>                 Key: BEAM-5725
>                 URL: https://issues.apache.org/jira/browse/BEAM-5725
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-elasticsearch
>            Reporter: Wout Scheepers
>            Assignee: Wout Scheepers
>            Priority: Major
>
> When using .withRetryConfiguration() for ElasticsearchIO, I get the following 
> stacktrace:
>  
>  
> {code:java}
> Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input
>  at [Source: (org.apache.http.nio.entity.ContentInputStream); line: 1, column: 0]
> at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
> at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4133)
> at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3988)
> at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3058)
> at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.parseResponse(ElasticsearchIO.java:173)
> at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:177)
> at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1204)
> at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.finishBundle(ElasticsearchIO.java:1175)
> {code}
>  
>  
> The Elastic response object's content stream is probably consumed twice, 
> resulting in a MismatchedInputException.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
