Hey Tim, Romain,

I created the ticket (BEAM-5725). I’ll try to fix it, as it’s time I made my 
first PR.
I will first focus on reproducing the issue in a unit test.

Thanks!
Wout



From: Tim Robertson <timrobertson...@gmail.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Thursday, 11 October 2018 at 20:25
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: ElasticIO retry configuration exception

I took a super quick look at the code and I think Romain is correct.

1. In a retry scenario, it calls handleRetry()
2. Within handleRetry(), it gets the DefaultRetryPredicate and calls 
test(response), which reads the response stream into JSON
3. When the retry is successful (no 429 code), the response is returned
4. The response is then passed to checkForErrors(...)
5. This then tries to parse the response by reading the response stream again, 
but the stream was already consumed in step 2
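
A minimal, self-contained Java (9+) sketch of the steps above, with no Beam or 
Elasticsearch dependencies. OneShotResponse, shouldRetry, checkForErrors, 
buggyFlow and fixedFlow are hypothetical stand-ins, not the real ElasticsearchIO 
code. It only shows why a second read of an already consumed stream comes back 
empty (the "No content to map due to end-of-input" case), and that reading the 
body once and sharing it avoids the error. In Apache HttpCore, wrapping a 
streamed entity in BufferedHttpEntity is one way to make it repeatable, but 
whether that is the right fix for ElasticsearchIO is an assumption.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class ConsumeOnceDemo {

  /** Hypothetical response whose body can only be streamed once, like a non-repeatable HTTP entity. */
  public static final class OneShotResponse {
    private final InputStream content;

    public OneShotResponse(String body) {
      this.content = new ByteArrayInputStream(body.getBytes(StandardCharsets.UTF_8));
    }

    /** Always returns the same stream, so a second reader finds it at end-of-input. */
    public InputStream getContent() {
      return content;
    }
  }

  /** Drains a stream into a UTF-8 string (InputStream.readAllBytes needs Java 9+). */
  static String drain(InputStream in) {
    try {
      return new String(in.readAllBytes(), StandardCharsets.UTF_8);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Stand-in for step 2: decide whether to retry by inspecting the body (429 check is illustrative). */
  public static boolean shouldRetry(String body) {
    return body.contains("\"status\":429");
  }

  /** Stand-in for step 5: parse the body, failing like Jackson does on an empty stream. */
  public static String checkForErrors(String body) {
    if (body.isEmpty()) {
      throw new IllegalStateException("No content to map due to end-of-input");
    }
    return body;
  }

  /** Buggy flow: both steps read the same one-shot stream. */
  public static String buggyFlow(OneShotResponse response) {
    shouldRetry(drain(response.getContent()));           // first read consumes the stream
    return checkForErrors(drain(response.getContent())); // second read gets an empty body
  }

  /** Possible fix: read the body once and pass the buffered copy to both steps. */
  public static String fixedFlow(OneShotResponse response) {
    String body = drain(response.getContent()); // single read
    shouldRetry(body);
    return checkForErrors(body);
  }

  public static void main(String[] args) {
    String ok = "{\"errors\":false}";
    System.out.println(fixedFlow(new OneShotResponse(ok))); // prints {"errors":false}
    try {
      buggyFlow(new OneShotResponse(ok));
    } catch (IllegalStateException e) {
      System.out.println("buggy flow failed: " + e.getMessage());
    }
  }
}
```

The same shape could serve as the skeleton of the unit test: feed a successful 
(non-429) response through the retry path and assert that checkForErrors still 
sees the body.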

Can you please open a Jira for this Wout? 
https://issues.apache.org/jira/projects/BEAM/issues
If you don't have an account I'll create it.

This will not make 2.8.0 (just missed), so it will likely be 7 weeks or so 
before it is released in 2.9.0.
However, as soon as it is fixed, it is fairly easy to bring into your own 
project by copying the single ElasticsearchIO.java file into your project 
under the same package.

Thank you for reporting the issue,
Tim




On Thu, Oct 11, 2018 at 4:19 PM Romain Manni-Bucau 
<rmannibu...@gmail.com> wrote:
It looks more like a client issue where the stream has already been read. Maybe 
try to reproduce it in a unit test in the Beam ES module? That would let us 
help you more accurately.

Romain Manni-Bucau
@rmannibucau<https://twitter.com/rmannibucau> | Blog<https://rmannibucau.metawerx.net/> | 
Old Blog<http://rmannibucau.wordpress.com> | Github<https://github.com/rmannibucau> | 
LinkedIn<https://www.linkedin.com/in/rmannibucau> | 
Book<https://www.packtpub.com/application-development/java-ee-8-high-performance>


On Thu, 11 Oct 2018 at 16:18, Wout Scheepers 
<wout.scheep...@vente-exclusive.com> wrote:
Hey Romain,

I’ve checked, and I am using the same HTTP client version as Beam 2.7.0.

Just to be sure, I’ve created a minimal reproducible example in a fresh project 
with only the following dependencies in my build.gradle:
dependencies {
    compile ('org.apache.beam:beam-sdks-java-io-elasticsearch:2.7.0')
    compile ('org.apache.beam:beam-runners-direct-java:2.7.0')
    compile ('org.apache.beam:beam-runners-google-cloud-dataflow-java:2.7.0')
    compile ('org.apache.beam:beam-sdks-java-extensions-protobuf:2.7.0')
    compile ('org.apache.beam:beam-sdks-java-extensions-google-cloud-platform-core:2.7.0')
    compile ('org.apache.beam:beam-sdks-java-io-google-cloud-platform:2.7.0')
    compile ('org.apache.beam:beam-sdks-java-io-common:2.7.0')
    compile ('org.apache.beam:beam-sdks-java-extensions-json-jackson:2.7.0')
    compile ('org.apache.beam:beam-sdks-java-io-jdbc:2.7.0')


    testCompile 'org.hamcrest:hamcrest-all:1.3'
    testCompile 'org.assertj:assertj-core:3.4.1'
    testCompile 'junit:junit:4.12'
}


However, the problem still persists when writing a document to elastic with the 
retryConfiguration set.
I guess the problem lies with my elastic version, as JB implied?

Anyway, thanks for the suggestion.

Wout

From: Romain Manni-Bucau <rmannibu...@gmail.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Wednesday, 10 October 2018 at 16:53
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: ElasticIO retry configuration exception

Hi Wout,

Maybe check the HTTP client versions on your classpath (against 
https://github.com/apache/beam/blob/v2.7.0/sdks/java/io/elasticsearch/build.gradle
for instance).
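
For illustration only: a small Java (9+) sketch that lists the Apache 
HttpComponents JARs visible on java.class.path, so their versions can be 
compared against the ones in Beam's build.gradle. The HttpClientJars class and 
the JAR-name prefixes are assumptions (they rely on the usual 
httpclient-<version>.jar file naming), and the scan finds nothing useful when 
running from a shaded fat JAR. Running `gradle dependencies` in the project is 
another way to see the resolved versions.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class HttpClientJars {

  // Assumed file-name prefixes of the Apache HttpComponents artifacts
  // ("httpcore-" also matches httpcore-nio-<version>.jar).
  static final List<String> PREFIXES = List.of("httpclient-", "httpcore-", "httpasyncclient-");

  /** Returns the JAR file names in a classpath string that start with one of the prefixes. */
  public static List<String> httpJars(String classpath) {
    List<String> hits = new ArrayList<>();
    for (String entry : classpath.split(Pattern.quote(File.pathSeparator))) {
      String name = new File(entry).getName(); // strip the directory part
      for (String prefix : PREFIXES) {
        if (name.startsWith(prefix) && name.endsWith(".jar")) {
          hits.add(name);
          break;
        }
      }
    }
    return hits;
  }

  public static void main(String[] args) {
    // Prints one matching JAR name per line, e.g. to run inside the failing project.
    httpJars(System.getProperty("java.class.path")).forEach(System.out::println);
  }
}
```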

Romain Manni-Bucau


On Wed, 10 Oct 2018 at 15:37, Wout Scheepers 
<wout.scheep...@vente-exclusive.com> wrote:
Hey JB,

Thanks for your fast reply.
The elastic version we're using is 5.6.2.

"version": {
        "number": "5.6.2",
        "build_hash": "57e20f3",
        "build_date": "2017-09-23T13:16:45.703Z",
        "build_snapshot": false,
        "lucene_version": "6.6.1"
    }


Wout



On 10/10/2018, 15:34, "Jean-Baptiste Onofré" 
<j...@nanthrax.net> wrote:

    Hi Wout,

    What's the Elasticsearch version? (Just so I can try to reproduce it.)

    Thanks,
    Regards
    JB

    On 10/10/2018 15:31, Wout Scheepers wrote:
    > Hey all,
    >
    > When using .withRetryConfiguration() for ElasticsearchIO, I get the
    > following stacktrace:
    >
    > Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException:
    > No content to map due to end-of-input
    >  at [Source: (org.apache.http.nio.entity.ContentInputStream); line: 1, column: 0]
    >        at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
    >        at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4133)
    >        at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3988)
    >        at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3058)
    >        at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.parseResponse(ElasticsearchIO.java:167)
    >        at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:171)
    >        at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1213)
    >        at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.finishBundle(ElasticsearchIO.java:1183)
    >
    > I’ve been racking my brain over this one.
    >
    > Apparently the elastic Response object can’t be parsed anymore in the
    > checkForErrors() method.
    >
    > However, it is parsed successfully in the default RetryPredicate’s test
    > method, which is called in flushBatch() in the if clause related to the
    > retryConfig (ElasticsearchIO:1201).
    >
    > As far as I know, the Response object is not altered.
    >
    > Any clues why this doesn’t work for me?
    >
    > I really need this feature, as inserting 40M documents into elastic
    > results in too many retry timeouts ☺.
    >
    > Thanks!
    > Wout

    --
    Jean-Baptiste Onofré
    jbono...@apache.org
    http://blog.nanthrax.net
    Talend - http://www.talend.com
