Re: ElasticIO retry configuration exception

2018-10-11 Thread Tim
Great! Thank you.

Feel free to add me as a reviewer if you open a PR.

Tim

> On 12 Oct 2018, at 08:28, Wout Scheepers wrote:
> 
> Hey Tim, Romain,
>  
> I created the ticket (BEAM-5725). I’ll try to fix it, as it’s time I made my 
> first PR.
> First, I’ll focus on getting a reproducible case in a unit test.
> 
> Thanks!
> Wout
> 
> 
>  
> From: Tim Robertson 
> Reply-To: "user@beam.apache.org" 
> Date: Thursday, 11 October 2018 at 20:25
> To: "user@beam.apache.org" 
> Subject: Re: ElasticIO retry configuration exception
>  
> I took a super quick look at the code and I think Romain is correct.
>  
> 1. In a retry scenario it calls handleRetry()
> 2. Within handleRetry() it gets the DefaultRetryPredicate and calls 
> test(response) - this reads the response stream to JSON
> 3. When the retry is successful (no 429 code) the response is returned
> 4. The response is then passed in to checkForErrors(...)
> 5. This then tries to parse the response by reading the response stream, 
> which was already read in step 2
>  
> Can you please open a Jira for this, Wout? 
> https://issues.apache.org/jira/projects/BEAM/issues
> If you don't have an account I'll create it.
>  
> This will not make 2.8.0 (it just missed), so it will likely be 7 weeks or so 
> before it is released in 2.9.0. 
> However, as soon as it is fixed, it is fairly easy to bring into your own 
> project by copying in the single ElasticsearchIO.java declared in the same 
> package.
>  
> Thank you for reporting the issue,
> Tim
>  
>  
>  
>  
> On Thu, Oct 11, 2018 at 4:19 PM Romain Manni-Bucau wrote:
> It looks more like a client issue where the stream is already read. Maybe 
> try to reproduce it in a unit test in the Beam ES module? This will enable 
> us to help you more accurately.
> 
> Romain Manni-Bucau
> @rmannibucau | Blog | Old Blog | Github | LinkedIn | Book
>  
>  
> On Thu, 11 Oct 2018 at 16:18, Wout Scheepers wrote:
> Hey Romain,
>  
> I’ve checked and am using the same HTTP client as Beam 2.7.0.
> Just to be sure, I’ve created a minimal reproducible example in a fresh project 
> with only the following dependencies in my build.gradle:
> dependencies {
>     compile ('org.apache.beam:beam-sdks-java-io-elasticsearch:2.7.0')
>     compile ('org.apache.beam:beam-runners-direct-java:2.7.0')
>     compile ('org.apache.beam:beam-runners-google-cloud-dataflow-java:2.7.0')
>     compile ('org.apache.beam:beam-sdks-java-extensions-protobuf:2.7.0')
>     compile ('org.apache.beam:beam-sdks-java-extensions-google-cloud-platform-core:2.7.0')
>     compile ('org.apache.beam:beam-sdks-java-io-google-cloud-platform:2.7.0')
>     compile ('org.apache.beam:beam-sdks-java-io-common:2.7.0')
>     compile ('org.apache.beam:beam-sdks-java-extensions-json-jackson:2.7.0')
>     compile ('org.apache.beam:beam-sdks-java-io-jdbc:2.7.0')
> 
>     testCompile 'org.hamcrest:hamcrest-all:1.3'
>     testCompile 'org.assertj:assertj-core:3.4.1'
>     testCompile 'junit:junit:4.12'
> }
>  
> However, the problem still persists when writing a document to Elasticsearch 
> with the retryConfiguration set.
> I guess the problem lies with my Elasticsearch version, as JB implies?
>  
> Anyway, thanks for the suggestion.
>  
> Wout
>  
> From: Romain Manni-Bucau 
> Reply-To: "user@beam.apache.org" 
> Date: Wednesday, 10 October 2018 at 16:53
> To: "user@beam.apache.org" 
> Subject: Re: ElasticIO retry configuration exception
>  
> Hi Wout,
>  
> Maybe check your classpath HTTP client versions (against 
> https://github.com/apache/beam/blob/v2.7.0/sdks/java/io/elasticsearch/build.gradle
> for instance).
> 
> Romain Manni-Bucau
> @rmannibucau | Blog | Old Blog | Github | LinkedIn | Book
>  
>  
> On Wed, 10 Oct 2018 at 15:37, Wout Scheepers wrote:
> Hey JB,
> 
> Thanks for your fast reply.
> The elastic version we're using is 5.6.2.
> 
> "version": {
> "number": "5.6.2",
> "build_hash": "57e20f3",
> "build_date": "2017-09-23T13:16:45.703Z",
> "build_snapshot": false,
> "lucene_version": "6.6.1"
> }
> 
> 
> Wout
> 
> 
> 
> On 10/10/2018, 15:34, "Jean-Baptiste Onofré"  wrote:
> 
> Hi Wout,
> 
> what's the elasticsearch version? (just to try to reproduce)
> 
> Thanks,
> Regards
> JB
> 
> On 10/10/2018 15:31, Wout Scheepers wrote:
> > Hey all,
> > 
> >  
> > 
> > When using .withRetryConfiguration() for ElasticsearchIO, I get the
> > following stacktrace:
> > 
> >  
> > 
> > Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException:
> > No content to map due to end-of-input
> > 
> > at [Source: (org.apache.http.nio.entity.ContentInputStream); line: 1, column: 0]
> >    at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
> >    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4133)

Re: ElasticIO retry configuration exception

2018-10-11 Thread Wout Scheepers
Hey Tim, Romain,

I created the ticket (BEAM-5725). I’ll try to fix it, as it’s time I made my 
first PR.
First, I’ll focus on getting a reproducible case in a unit test.

Thanks!
Wout



From: Tim Robertson 
Reply-To: "user@beam.apache.org" 
Date: Thursday, 11 October 2018 at 20:25
To: "user@beam.apache.org" 
Subject: Re: ElasticIO retry configuration exception

I took a super quick look at the code and I think Romain is correct.

1. In a retry scenario it calls handleRetry()
2. Within handleRetry() it gets the DefaultRetryPredicate and calls 
test(response) - this reads the response stream to JSON
3. When the retry is successful (no 429 code) the response is returned
4. The response is then passed in to checkForErrors(...)
5. This then tries to parse the response by reading the response stream, which 
was already read in step 2

Can you please open a Jira for this, Wout? 
https://issues.apache.org/jira/projects/BEAM/issues
If you don't have an account I'll create it.

This will not make 2.8.0 (it just missed), so it will likely be 7 weeks or so 
before it is released in 2.9.0.
However, as soon as it is fixed, it is fairly easy to bring into your own 
project by copying in the single ElasticsearchIO.java declared in the same 
package.

Thank you for reporting the issue,
Tim




On Thu, Oct 11, 2018 at 4:19 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
It looks more like a client issue where the stream is already read. Maybe try 
to reproduce it in a unit test in the Beam ES module? This will enable us to 
help you more accurately.

Romain Manni-Bucau
@rmannibucau | Blog | Old Blog | Github | LinkedIn | Book


On Thu, 11 Oct 2018 at 16:18, Wout Scheepers <wout.scheep...@vente-exclusive.com> wrote:
Hey Romain,

I’ve checked and am using the same HTTP client as Beam 2.7.0.

Just to be sure, I’ve created a minimal reproducible example in a fresh project 
with only the following dependencies in my build.gradle:
dependencies {
    compile ('org.apache.beam:beam-sdks-java-io-elasticsearch:2.7.0')
    compile ('org.apache.beam:beam-runners-direct-java:2.7.0')
    compile ('org.apache.beam:beam-runners-google-cloud-dataflow-java:2.7.0')
    compile ('org.apache.beam:beam-sdks-java-extensions-protobuf:2.7.0')
    compile ('org.apache.beam:beam-sdks-java-extensions-google-cloud-platform-core:2.7.0')
    compile ('org.apache.beam:beam-sdks-java-io-google-cloud-platform:2.7.0')
    compile ('org.apache.beam:beam-sdks-java-io-common:2.7.0')
    compile ('org.apache.beam:beam-sdks-java-extensions-json-jackson:2.7.0')
    compile ('org.apache.beam:beam-sdks-java-io-jdbc:2.7.0')

    testCompile 'org.hamcrest:hamcrest-all:1.3'
    testCompile 'org.assertj:assertj-core:3.4.1'
    testCompile 'junit:junit:4.12'
}


However, the problem still persists when writing a document to Elasticsearch with the 
retryConfiguration set.
I guess the problem lies with my Elasticsearch version, as JB implies?

Anyway, thanks for the suggestion.

Wout

From: Romain Manni-Bucau <rmannibu...@gmail.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Wednesday, 10 October 2018 at 16:53
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: ElasticIO retry configuration exception

Hi Wout,

Maybe check your classpath HTTP client versions (against 
https://github.com/apache/beam/blob/v2.7.0/sdks/java/io/elasticsearch/build.gradle
for instance).

Romain Manni-Bucau
@rmannibucau | Blog | Old Blog | Github | LinkedIn | Book


On Wed, 10 Oct 2018 at 15:37, Wout Scheepers <wout.scheep...@vente-exclusive.com> wrote:
Hey JB,

Thanks for your fast reply.
The elastic version we're using is 5.6.2.

"version": {
"number": "5.6.2",
"build_hash": "57e20f3",
"build_date": "2017-09-23T13:16:45.703Z",
"build_snapshot": false,
"lucene_version": "6.6.1"
}


Wout



On 10/10/2018, 15:34, "Jean-Baptiste Onofré" <j...@nanthrax.net> wrote:

Hi Wout,

what's the elasticsearch version? (just to try to reproduce)

Thanks,
Regards
JB

On 10/10/2018 15:31, Wout Scheepers wrote:
> Hey all,
>
>
>
> When using .withRetryConfiguration() for ElasticsearchIO, I get the
> following stacktrace:
>
>
>
> Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException:
> No content to map due to end-of-input
>
> at [Source: (org.apache.http.nio.

The return of the Beam vs. Flink SDK experiments!

2018-10-11 Thread Vergilio, Thalita
Hello Beamers,


You'll be pleased to know that I've made some progress in my Beam/Flink SDK 
comparison exercise (maybe now she'll go away...). However, it would be great if 
those of you who are more familiar with Beam and Flink could cast your eyes 
over my new code snippets and let me know if you can spot any massive 
discrepancies.


Following the advice from some of you at the Beam Summit last week (special 
thanks to Reuven for taking the time to look into this issue with me), I have 
made the following changes:

  1.  Used a KeyedDeserializationSchema implementation in my Flink example, 
instead of the JSONDeserializationSchema that eagerly deserialised the whole 
object before it was needed (a minimal sketch follows this list).
  2.  Simplified my data model so the records read from Kafka are non-keyed and 
deserialise from byte[] to primitives.
  3.  Removed all JSON serialisation and deserialisation from this example.
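For reference, here is roughly what the change in point 1 could look like. This 
is a hypothetical sketch, not my actual code: it assumes Flink 1.x's 
KeyedDeserializationSchema interface and a producer that writes each reading as 
a single big-endian double (the class name and encoding are illustrative):

import java.nio.ByteBuffer;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

// Decodes only the 8-byte measurement itself, instead of eagerly parsing a
// whole JSON object per record the way JSONDeserializationSchema did.
public class EnergyReadingSchema implements KeyedDeserializationSchema<Double> {

  @Override
  public Double deserialize(byte[] messageKey, byte[] message,
                            String topic, int partition, long offset) {
    // Assumes the producer wrote the measurement as a big-endian double.
    return ByteBuffer.wrap(message).getDouble();
  }

  @Override
  public boolean isEndOfStream(Double nextElement) {
    return false; // the Kafka stream is unbounded
  }

  @Override
  public TypeInformation<Double> getProducedType() {
    return Types.DOUBLE;
  }
}

It would then be plugged into the Kafka source as, e.g., 
new FlinkKafkaConsumer011<>(topic, new EnergyReadingSchema(), properties).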

Both snippets now run much faster, and the performance is equivalent. This is 
much closer to what I was originally expecting, but the question remains: am I 
as close to a like-for-like comparison as I can get? Here are the code 
snippets, amended:

https://gist.github.com/tvergilio/fbb2e855e3d32de223d171d91fd1ec1e

https://gist.github.com/tvergilio/453638bd7cdd28a808ee103775b1fae5


And here is a recap of the experiment (if you care to know the details):

My aim is to write two pipelines which are equivalent, one using Beam, and the 
other using the Flink SDK. I will then run them in the same Flink cluster to 
measure the assumed cost of using an abstraction such as Beam to ensure 
portability of the code (as opposed to using the Flink SDK directly). I am 
using containerised Flink running on cloud-based nodes.


Procedure:

Two measurements are emitted to two different Kafka topics simultaneously: the 
total energy consumption of a server room, and the energy consumption of the IT 
equipment in that room. This data is emitted for 5 minutes at a time, at 
different velocities (every minute, every second, every millisecond, etc), and 
the performance of the task managers in terms of container CPU, memory and 
network usage is measured.


The processing pipeline calculates the Power Usage Effectiveness (PUE) of the 
room. This is simply the total energy consumed divided by the IT energy 
consumed. The PUE is calculated for a given window of time, so there must be 
some notion of per-window state (or else all the records of a window must have 
arrived before the entire calculation is executed). I have used the exact same 
calculation for the PUE for both cases, which can be found here:


https://gist.github.com/tvergilio/1c78ea337e6795de01f06cafdfa4cf84


and here:


https://gist.github.com/tvergilio/2ed7d4541bc0de14325f82f8aa538d43


Now, theoretically, the PUE calculation could be improved by aggregating the 
real energy readings as they come, doing the same for the IT readings, emitting 
from both when the watermark passes the end of the window, then having a 
separate step calculate the PUE by dividing one aggregation by the other. My 
last question is: do you think this refinement is necessary, or is the "whole 
window at once" approach good enough? Would you expect the difference to be 
significant?
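
For what it's worth, here is a rough Beam sketch of that refinement (all names 
are illustrative assumptions, not my actual code; it assumes both inputs are 
already windowed PCollection<Double>s of readings with the same windowing, and 
that both streams emit in every window): sum each stream per window under a 
shared key, co-group the two sums, and divide.

import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;

public class PueTransforms {

  /** PUE per window = sum(total energy) / sum(IT energy). */
  static PCollection<KV<String, Double>> puePerWindow(
      PCollection<Double> totalEnergy, PCollection<Double> itEnergy) {

    final TupleTag<Double> totalTag = new TupleTag<>();
    final TupleTag<Double> itTag = new TupleTag<>();

    // Key both streams by the room so the per-window sums can be co-grouped.
    PCollection<KV<String, Double>> totalSums = totalEnergy
        .apply("KeyTotal", WithKeys.of("room-1"))
        .apply("SumTotal", Sum.doublesPerKey());
    PCollection<KV<String, Double>> itSums = itEnergy
        .apply("KeyIt", WithKeys.of("room-1"))
        .apply("SumIt", Sum.doublesPerKey());

    return KeyedPCollectionTuple.of(totalTag, totalSums)
        .and(itTag, itSums)
        .apply(CoGroupByKey.<String>create())
        .apply("DividePue", MapElements.via(
            new SimpleFunction<KV<String, CoGbkResult>, KV<String, Double>>() {
              @Override
              public KV<String, Double> apply(KV<String, CoGbkResult> joined) {
                double total = joined.getValue().getOnly(totalTag);
                double it = joined.getValue().getOnly(itTag);
                return KV.of(joined.getKey(), total / it); // PUE = total / IT
              }
            }));
  }
}

The practical difference from the "whole window at once" approach is state 
size: this version carries two running sums per window instead of buffering 
every record until the window closes, so any gap should show up mainly at the 
millisecond-level emission velocities.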


Thank you very much for taking the time to read this VERY long e-mail. Any 
suggestions/opinions/feedback are much appreciated.


All the best,


Thalita




Spark storageLevel not taking effect

2018-10-11 Thread Mike Kaplinskiy
Hey folks,

Admittedly I may be a bit on the bleeding edge here, but I'm attempting to
run a Beam pipeline on Spark which is running on top of Kubernetes.
Specifically Beam 2.6.0 with Spark 2.4.0-rc2 running in client mode with a
Kubernetes (1.11) driver. It's actually pretty cool - from a Kubernetes
perspective, I start a pod which starts a ton of workers to do the parallel
stuff and then cleans up after itself.

One thing I can't seem to get working is setting the storage level for
Spark RDDs via Beam. Specifically, passing --storageLevel=MEMORY_AND_DISK
seems not to work - the RDD still shows up as "Memory Deserialized 1x
Replicated" in the Spark UI. I would expect it to be something closer to
"Disk Memory Deserialized 1x Replicated." It *seems* to be serialized only
in the sense that less memory is used (I assume it gets encoded).

I even tried hardcoding storageLevel in BoundedDataset.java (based on the
line number in the DAG viz). Unfortunately it still shows up as memory-only.

Am I missing something that would let me spill data to disk?
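
One thing worth ruling out first is whether --storageLevel survives the trip
through spark-submit and reaches the runner at all. A minimal sketch, assuming
Beam's SparkPipelineOptions (which is what backs the --storageLevel flag; the
class name here is illustrative):

import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class StorageLevelCheck {
  public static void main(String[] args) {
    // Parse the program arguments exactly as the pipeline would, and echo
    // back the storage level the Spark runner will actually see.
    SparkPipelineOptions options = PipelineOptionsFactory
        .fromArgs(args)
        .withValidation()
        .as(SparkPipelineOptions.class);
    System.out.println("storageLevel = " + options.getStorageLevel());

    Pipeline pipeline = Pipeline.create(options);
    // ... build the transforms and call pipeline.run() as usual ...
  }
}

If this prints MEMORY_AND_DISK but the Spark UI still reports memory-only
persistence, the option is being parsed but not applied to that particular
RDD, which would point at the runner rather than the flag.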

For reference, here's my exact command line:
/opt/spark/bin/spark-submit
--master 'k8s://https://kubernetes:443'
--deploy-mode client
--name $(MY_POD_NAME)
--conf spark.executor.instances=20
--conf spark.driver.host=$(MY_POD_IP)
--conf spark.driver.port=7077
--conf spark.kubernetes.container.image=$(MY_IMAGE)
--conf spark.kubernetes.driver.pod.name=$(MY_POD_NAME)
--conf spark.kubernetes.executor.podNamePrefix=$(MY_POD_NAME)
--conf spark.executor.memory=5500m
--conf spark.executor.memoryOverhead=1300m
--conf spark.memory.fraction=0.45
--conf spark.executor.cores=3
--conf spark.kubernetes.executor.limit.cores=3
--conf spark.default.parallelism=60
--conf spark.kubernetes.allocation.batch.size=20
--conf spark.kubernetes.driver.label.app=beam-datomic-smoketest
--conf spark.kubernetes.node.selector.node.ladderlife.com/group=etl
--conf spark.kubernetes.executor.annotation.iam.amazonaws.com/role=etl-role
--conf spark.kubernetes.executor.secrets.google-cloud=/google-cloud-secrets
--conf spark.kubernetes.executor.secretKeyRef.SENTRY_DSN=sentry-secrets:dsn
--conf spark.executorEnv.STATSD_HOST=169.254.168.253
--class ladder.my_beam_job
local:///srv/beam_job.jar
--runner=SparkRunner
--storageLevel=MEMORY_AND_DISK

Thanks,
Mike.

Ladder. The smart, modern way to insure your life.


Re: ElasticIO retry configuration exception

2018-10-11 Thread Tim Robertson
I took a super quick look at the code and I think Romain is correct.

1. In a retry scenario it calls handleRetry()
2. Within handleRetry() it gets the DefaultRetryPredicate and calls
test(response) - this reads the response stream to JSON
3. When the retry is successful (no 429 code) the response is returned
4. The response is then passed in to checkForErrors(...)
5. This then tries to parse the response by reading the response stream, which
was already read in step 2
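
To make the failure mode concrete: the entity behind the client's response is
backed by a one-shot stream. Here is a minimal, self-contained sketch of that
stream behaviour using plain Apache HttpCore classes (not the actual
ElasticsearchIO internals), showing why the second parse sees an empty stream
and how buffering the entity would make it repeatable:

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;

import org.apache.http.HttpEntity;
import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.util.EntityUtils;

public class DoubleReadSketch {
  public static void main(String[] args) throws Exception {
    byte[] json = "{\"errors\":false}".getBytes(StandardCharsets.UTF_8);

    // A streamed entity is not repeatable: its InputStream is consumed once.
    HttpEntity streamed = new InputStreamEntity(new ByteArrayInputStream(json));
    System.out.println(EntityUtils.toString(streamed)); // prints the JSON
    System.out.println(EntityUtils.toString(streamed)); // prints an empty
    // string: the stream is exhausted, which is exactly what Jackson reports
    // as "No content to map due to end-of-input"

    // Buffering up front makes the entity repeatable, so a retry predicate
    // and checkForErrors(...) could each parse it independently.
    HttpEntity buffered = new BufferedHttpEntity(
        new InputStreamEntity(new ByteArrayInputStream(json)));
    System.out.println(EntityUtils.toString(buffered)); // prints the JSON
    System.out.println(EntityUtils.toString(buffered)); // prints it again
  }
}

An equivalent fix inside ElasticsearchIO could instead parse the response once
and hand the parsed JSON to both the predicate and checkForErrors(...); the
sketch above only illustrates the underlying stream semantics.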

Can you please open a Jira for this, Wout?
https://issues.apache.org/jira/projects/BEAM/issues
If you don't have an account I'll create it.

This will not make 2.8.0 (it just missed), so it will likely be 7 weeks or so
before it is released in 2.9.0.
However, as soon as it is fixed, it is fairly easy to bring into your own
project by copying in the single ElasticsearchIO.java declared in the same
package.

Thank you for reporting the issue,
Tim




On Thu, Oct 11, 2018 at 4:19 PM Romain Manni-Bucau wrote:

> It looks more like a client issue where the stream is already read. Maybe
> try to reproduce it in a unit test in the Beam ES module? This will
> enable us to help you more accurately.
>
> Romain Manni-Bucau
> @rmannibucau | Blog | Old Blog | Github | LinkedIn | Book
> 
>
>
> On Thu, 11 Oct 2018 at 16:18, Wout Scheepers <
> wout.scheep...@vente-exclusive.com> wrote:
>
>> Hey Romain,
>>
>>
>>
>> I’ve checked and am using the same HTTP client as Beam 2.7.0.
>>
>> Just to be sure, I’ve created a minimal reproducible example in a fresh project 
>> with only the following dependencies in my build.gradle:
>> dependencies {
>>     compile ('org.apache.beam:beam-sdks-java-io-elasticsearch:2.7.0')
>>     compile ('org.apache.beam:beam-runners-direct-java:2.7.0')
>>     compile ('org.apache.beam:beam-runners-google-cloud-dataflow-java:2.7.0')
>>     compile ('org.apache.beam:beam-sdks-java-extensions-protobuf:2.7.0')
>>     compile ('org.apache.beam:beam-sdks-java-extensions-google-cloud-platform-core:2.7.0')
>>     compile ('org.apache.beam:beam-sdks-java-io-google-cloud-platform:2.7.0')
>>     compile ('org.apache.beam:beam-sdks-java-io-common:2.7.0')
>>     compile ('org.apache.beam:beam-sdks-java-extensions-json-jackson:2.7.0')
>>     compile ('org.apache.beam:beam-sdks-java-io-jdbc:2.7.0')
>>
>>     testCompile 'org.hamcrest:hamcrest-all:1.3'
>>     testCompile 'org.assertj:assertj-core:3.4.1'
>>     testCompile 'junit:junit:4.12'
>> }
>>
>>
>>
>> However, the problem still persists when writing a document to Elasticsearch
>> with the retryConfiguration set.
>> I guess the problem lies with my Elasticsearch version, as JB implies?
>>
>>
>>
>> Anyway, thanks for the suggestion.
>>
>>
>>
>> Wout
>>
>>
>>
>> From: Romain Manni-Bucau 
>> Reply-To: "user@beam.apache.org" 
>> Date: Wednesday, 10 October 2018 at 16:53
>> To: "user@beam.apache.org" 
>> Subject: Re: ElasticIO retry configuration exception
>>
>>
>>
>> Hi Wout,
>>
>>
>>
>> Maybe check your classpath HTTP client versions (against
>> https://github.com/apache/beam/blob/v2.7.0/sdks/java/io/elasticsearch/build.gradle
>> for instance).
>>
>>
>> Romain Manni-Bucau
>> @rmannibucau | Blog | Old Blog | Github | LinkedIn | Book
>>
>>
>>
>>
>>
>> On Wed, 10 Oct 2018 at 15:37, Wout Scheepers <
>> wout.scheep...@vente-exclusive.com> wrote:
>>
>> Hey JB,
>>
>> Thanks for your fast reply.
>> The elastic version we're using is 5.6.2.
>>
>> "version": {
>> "number": "5.6.2",
>> "build_hash": "57e20f3",
>> "build_date": "2017-09-23T13:16:45.703Z",
>> "build_snapshot": false,
>> "lucene_version": "6.6.1"
>> }
>>
>>
>> Wout
>>
>>
>>
>> On 10/10/2018, 15:34, "Jean-Baptiste Onofré"  wrote:
>>
>> Hi Wout,
>>
>> what's the elasticsearch version? (just to try to reproduce)
>>
>> Thanks,
>> Regards
>> JB
>>
>> On 10/10/2018 15:31, Wout Scheepers wrote:
>> > Hey all,
>> >
>> >
>> >
>> > When using .withRetryConfiguration() for ElasticsearchIO, I get the
>> > following stacktrace:
>> >
>> >
>> >
>> > Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException:
>> > No content to map due to end-of-input
>> > at [Source: (org.apache.http.nio.entity.ContentInputStream); line: 1, column: 0]
>> >    at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputExcept

Re: ElasticIO retry configuration exception

2018-10-11 Thread Romain Manni-Bucau
It looks more like a client issue where the stream is already read. Maybe
try to reproduce it in a unit test in the Beam ES module? This will
enable us to help you more accurately.

Romain Manni-Bucau
@rmannibucau | Blog | Old Blog | Github | LinkedIn | Book



On Thu, 11 Oct 2018 at 16:18, Wout Scheepers <
wout.scheep...@vente-exclusive.com> wrote:

> Hey Romain,
>
>
>
> I’ve checked and am using the same HTTP client as Beam 2.7.0.
>
> Just to be sure, I’ve created a minimal reproducible example in a fresh project 
> with only the following dependencies in my build.gradle:
> dependencies {
>     compile ('org.apache.beam:beam-sdks-java-io-elasticsearch:2.7.0')
>     compile ('org.apache.beam:beam-runners-direct-java:2.7.0')
>     compile ('org.apache.beam:beam-runners-google-cloud-dataflow-java:2.7.0')
>     compile ('org.apache.beam:beam-sdks-java-extensions-protobuf:2.7.0')
>     compile ('org.apache.beam:beam-sdks-java-extensions-google-cloud-platform-core:2.7.0')
>     compile ('org.apache.beam:beam-sdks-java-io-google-cloud-platform:2.7.0')
>     compile ('org.apache.beam:beam-sdks-java-io-common:2.7.0')
>     compile ('org.apache.beam:beam-sdks-java-extensions-json-jackson:2.7.0')
>     compile ('org.apache.beam:beam-sdks-java-io-jdbc:2.7.0')
>
>     testCompile 'org.hamcrest:hamcrest-all:1.3'
>     testCompile 'org.assertj:assertj-core:3.4.1'
>     testCompile 'junit:junit:4.12'
> }
>
>
>
> However, the problem still persists when writing a document to Elasticsearch
> with the retryConfiguration set.
> I guess the problem lies with my Elasticsearch version, as JB implies?
>
>
>
> Anyway, thanks for the suggestion.
>
>
>
> Wout
>
>
>
> From: Romain Manni-Bucau 
> Reply-To: "user@beam.apache.org" 
> Date: Wednesday, 10 October 2018 at 16:53
> To: "user@beam.apache.org" 
> Subject: Re: ElasticIO retry configuration exception
>
>
>
> Hi Wout,
>
>
>
> Maybe check your classpath HTTP client versions (against
> https://github.com/apache/beam/blob/v2.7.0/sdks/java/io/elasticsearch/build.gradle
> for instance).
>
>
> Romain Manni-Bucau
> @rmannibucau | Blog | Old Blog | Github | LinkedIn | Book
> 
>
>
>
>
>
> On Wed, 10 Oct 2018 at 15:37, Wout Scheepers <
> wout.scheep...@vente-exclusive.com> wrote:
>
> Hey JB,
>
> Thanks for your fast reply.
> The elastic version we're using is 5.6.2.
>
> "version": {
> "number": "5.6.2",
> "build_hash": "57e20f3",
> "build_date": "2017-09-23T13:16:45.703Z",
> "build_snapshot": false,
> "lucene_version": "6.6.1"
> }
>
>
> Wout
>
>
>
> On 10/10/2018, 15:34, "Jean-Baptiste Onofré"  wrote:
>
> Hi Wout,
>
> what's the elasticsearch version? (just to try to reproduce)
>
> Thanks,
> Regards
> JB
>
> On 10/10/2018 15:31, Wout Scheepers wrote:
> > Hey all,
> >
> >
> >
> > When using .withRetryConfiguration() for ElasticsearchIO, I get the
> > following stacktrace:
> >
> >
> >
> > Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException:
> > No content to map due to end-of-input
> > at [Source: (org.apache.http.nio.entity.ContentInputStream); line: 1, column: 0]
> >    at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
> >    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4133)
> >    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3988)
> >    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3058)
> >    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.parseResponse(ElasticsearchIO.java:167)
> >    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:171)
> >    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1213)
> >    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.finishBundle(ElasticsearchIO.java:1183)
> >
> >
> >
> > I’ve been breaking my head on this one.
> >
> > Apparently the elastic Response object can’t be parsed anymore in the
> > checkForErrors() method.
> >
> > However, it

Re: ElasticIO retry configuration exception

2018-10-11 Thread Wout Scheepers
Hey Romain,

I’ve checked and am using the same HTTP client as Beam 2.7.0.

Just to be sure, I’ve created a minimal reproducible example in a fresh project with 
only the following dependencies in my build.gradle:
dependencies {
    compile ('org.apache.beam:beam-sdks-java-io-elasticsearch:2.7.0')
    compile ('org.apache.beam:beam-runners-direct-java:2.7.0')
    compile ('org.apache.beam:beam-runners-google-cloud-dataflow-java:2.7.0')
    compile ('org.apache.beam:beam-sdks-java-extensions-protobuf:2.7.0')
    compile ('org.apache.beam:beam-sdks-java-extensions-google-cloud-platform-core:2.7.0')
    compile ('org.apache.beam:beam-sdks-java-io-google-cloud-platform:2.7.0')
    compile ('org.apache.beam:beam-sdks-java-io-common:2.7.0')
    compile ('org.apache.beam:beam-sdks-java-extensions-json-jackson:2.7.0')
    compile ('org.apache.beam:beam-sdks-java-io-jdbc:2.7.0')

    testCompile 'org.hamcrest:hamcrest-all:1.3'
    testCompile 'org.assertj:assertj-core:3.4.1'
    testCompile 'junit:junit:4.12'
}


However, the problem still persists when writing a document to Elasticsearch with the 
retryConfiguration set.
I guess the problem lies with my Elasticsearch version, as JB implies?

Anyway, thanks for the suggestion.

Wout

From: Romain Manni-Bucau 
Reply-To: "user@beam.apache.org" 
Date: Wednesday, 10 October 2018 at 16:53
To: "user@beam.apache.org" 
Subject: Re: ElasticIO retry configuration exception

Hi Wout,

Maybe check your classpath HTTP client versions (against 
https://github.com/apache/beam/blob/v2.7.0/sdks/java/io/elasticsearch/build.gradle
for instance).

Romain Manni-Bucau
@rmannibucau | Blog | Old Blog | Github | LinkedIn | Book


On Wed, 10 Oct 2018 at 15:37, Wout Scheepers <wout.scheep...@vente-exclusive.com> wrote:
Hey JB,

Thanks for your fast reply.
The elastic version we're using is 5.6.2.

"version": {
"number": "5.6.2",
"build_hash": "57e20f3",
"build_date": "2017-09-23T13:16:45.703Z",
"build_snapshot": false,
"lucene_version": "6.6.1"
}


Wout



On 10/10/2018, 15:34, "Jean-Baptiste Onofré" <j...@nanthrax.net> wrote:

Hi Wout,

what's the elasticsearch version? (just to try to reproduce)

Thanks,
Regards
JB

On 10/10/2018 15:31, Wout Scheepers wrote:
> Hey all,
>
>
>
> When using .withRetryConfiguration() for ElasticsearchIO, I get the
> following stacktrace:
>
>
>
> Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException:
> No content to map due to end-of-input
>
> at [Source: (org.apache.http.nio.entity.ContentInputStream); line: 1, column: 0]
>    at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
>    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4133)
>    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3988)
>    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3058)
>    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.parseResponse(ElasticsearchIO.java:167)
>    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:171)
>    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1213)
>    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.finishBundle(ElasticsearchIO.java:1183)
>
>
>
> I’ve been breaking my head on this one.
>
> Apparently the elastic Response object can’t be parsed anymore in the
> checkForErrors() method.
>
> However, it is parsed successfully in the default RetryPredicate’s test
> method, which is called in flushBatch() in the if clause related to the
> retryConfig (ElasticsearchIO:1201).
>
> As far as I know, the Response object is not altered.
>
>
>
> Any clues why this doesn’t work for me?
>
> I really need this feature, as inserting 40M documents into elastic
> results in too many retry timeouts ☺.
>
>
>
> Thanks!
> Wout
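
For reference, a minimal sketch of the write configuration that exercises this
retry path, assuming the Beam 2.7.0 ElasticsearchIO API (the hosts, index,
type, and limits below are illustrative, not the actual values from this
thread):

import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.joda.time.Duration;

// 'documents' is a PCollection<String> of JSON documents. With a retry
// configuration set, the retry predicate reads the bulk response to decide
// whether to retry - the first read of the stream that checkForErrors(...)
// later collides with.
documents.apply("WriteToEs",
    ElasticsearchIO.write()
        .withConnectionConfiguration(
            ElasticsearchIO.ConnectionConfiguration.create(
                new String[] {"http://localhost:9200"}, "my-index", "my-type"))
        .withRetryConfiguration(
            ElasticsearchIO.RetryConfiguration.create(5, Duration.standardMinutes(2))));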
>
>
>
>
>

--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com