[ https://issues.apache.org/jira/browse/BEAM-5725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16670108#comment-16670108 ]
Wout Scheepers edited comment on BEAM-5725 at 10/31/18 1:48 PM:
----------------------------------------------------------------

I've been digging into it and came up with the following so far:

As Tim already pointed out, the problem here is that parseResponse(response) is indeed not repeatable: the content can only be consumed once, because the Elastic Response object encapsulates an HttpEntity object.

I added a unit test[1] that reproduces the problem by trying to insert a valid document with the retryConfiguration set. It shows that handleRetry() does not have to be called for the bug to appear.

My first thought on a solution was to use the reset() method on the InputStream of the entity content. However, reset() is not supported by the InputStream used in HttpEntity.

A possible solution would be the following: encapsulate the Response object in a wrapper object that makes sure the content of the HttpEntity object can be parsed repeatedly. I think the best way to do this is to use a BufferedHttpEntity[2] in the wrapper. This could be done only when a retryConfiguration is set, but I guess it's probably better to wrap the response objects everywhere in the ElasticsearchIO class?

Is there a better or more elegant solution for this?

I've also found a way of controlling how bytes are read from the buffer in the Elastic docs[3], but I'm not sure it can be of any help (it's for async calls, and I'm not sure they are used in ElasticsearchIO):

"... As for reading the response body, the HttpEntity#getContent method comes handy which returns an InputStream reading from the previously buffered response body. As an alternative, it is possible to provide a custom org.apache.http.nio.protocol.HttpAsyncResponseConsumer that controls how bytes are read and buffered. ..."

I'm happy to get some thoughts on a good solution.
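To illustrate the buffering idea, here is a minimal sketch that uses only JDK classes as a stand-in for the HTTP types. The names RepeatableContentSketch, RepeatableContent, drain and readAll are hypothetical and do not exist in Beam or HttpCore. In ElasticsearchIO the same effect would presumably come from wrapping the response entity in a BufferedHttpEntity[2], which copies a non-repeatable entity's content into memory so that getContent() can be called more than once.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class RepeatableContentSketch {

  /** Reads the stream until end-of-input and returns everything that was left in it. */
  static byte[] drain(InputStream in) {
    try {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      byte[] chunk = new byte[4096];
      int n;
      while ((n = in.read(chunk)) != -1) {
        out.write(chunk, 0, n);
      }
      return out.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Stand-in for parseResponse(): consumes whatever remains in the stream. */
  static String readAll(InputStream in) {
    return new String(drain(in), StandardCharsets.UTF_8);
  }

  /** Hypothetical wrapper: drains a one-shot stream once, then serves a fresh stream per call. */
  static final class RepeatableContent {
    private final byte[] buffer;

    RepeatableContent(InputStream oneShot) {
      this.buffer = drain(oneShot);
    }

    InputStream getContent() {
      return new ByteArrayInputStream(buffer);
    }
  }

  public static void main(String[] args) {
    byte[] body = "{\"errors\":false}".getBytes(StandardCharsets.UTF_8);

    // Unbuffered: the second read finds the stream already exhausted.
    InputStream raw = new ByteArrayInputStream(body);
    System.out.println("first read:  '" + readAll(raw) + "'");
    System.out.println("second read: '" + readAll(raw) + "'");

    // Buffered: every call to getContent() starts from the beginning again.
    RepeatableContent buffered = new RepeatableContent(new ByteArrayInputStream(body));
    System.out.println("buffered 1:  '" + readAll(buffered.getContent()) + "'");
    System.out.println("buffered 2:  '" + readAll(buffered.getContent()) + "'");
  }
}
```

The trade-off is that the whole response body is held in memory, which seems acceptable for bulk responses but is an assumption worth checking for large batches.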

Thanks,
Wout

[1] [https://github.com/wscheep/beam/commit/8f2093066a2908f0472983cfc640bc7644b728d9]
[2] [https://hc.apache.org/httpcomponents-core-ga/httpcore/apidocs/org/apache/http/entity/BufferedHttpEntity.html]
[3] [https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/java-rest-low-usage-responses.html]

> ElasticsearchIO RetryConfiguration response parse failure
> ---------------------------------------------------------
>
>                 Key: BEAM-5725
>                 URL: https://issues.apache.org/jira/browse/BEAM-5725
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-elasticsearch
>            Reporter: Wout Scheepers
>            Assignee: Wout Scheepers
>            Priority: Major
>
> When using .withRetryConfiguration() for ElasticsearchIO, I get the following
> stacktrace:
>
> {code:java}
> Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input
>  at [Source: (org.apache.http.nio.entity.ContentInputStream); line: 1, column: 0]
>  at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
>  at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4133)
>  at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3988)
>  at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3058)
>  at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.parseResponse(ElasticsearchIO.java:173)
>  at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:177)
>  at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1204)
>  at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.finishBundle(ElasticsearchIO.java:1175)
> {code}
>
> Probably the Elastic response object's content stream is consumed twice,
> resulting in a MismatchedInputException.