Re: ElasticIO retry configuration exception

2018-10-11 Thread Wout Scheepers
Hey Tim, Romain,

I created the ticket (BEAM-5725). I’ll try to fix it, as it’s time I made my 
first PR.
First I’ll focus on reproducing the issue in a unit test.

Thanks!
Wout



From: Tim Robertson 
Reply-To: "user@beam.apache.org" 
Date: Thursday, 11 October 2018 at 20:25
To: "user@beam.apache.org" 
Subject: Re: ElasticIO retry configuration exception

I took a super quick look at the code and I think Romain is correct.

1. On a retry scenario it calls handleRetry()
2. Within handleRetry() it gets the DefaultRetryPredicate and calls 
test(response) - this reads the response stream to JSON
3. When the retry is successful (no 429 code) the response is returned
4. The response is then passed in to checkForErrors(...)
5. This then tries to parse the response by reading the response stream again, 
but the stream was already consumed in step 2
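
The steps above describe a response body being consumed twice. As an illustration only, the sketch below uses plain JDK streams rather than the Elasticsearch client or the Beam code; `ReadOnceDemo` and `readAll` are made-up names. It shows that a second read of an already drained stream yields nothing, and that buffering the body once lets several consumers see it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class ReadOnceDemo {

    // Drains the stream completely and returns its content as a UTF-8 string.
    static String readAll(InputStream in) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[1024];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] json = "{\"errors\":false}".getBytes(StandardCharsets.UTF_8);

        // Stand-in for the content stream of an HTTP response entity.
        InputStream body = new ByteArrayInputStream(json);
        String first = readAll(body);   // e.g. a retry predicate parsing the body
        String second = readAll(body);  // e.g. a later error check parsing it again
        System.out.println("first read:  '" + first + "'");
        System.out.println("second read: '" + second + "'");

        // One possible remedy (an assumption, not the actual Beam fix):
        // read the body once and hand the buffered text to every consumer.
        String buffered = readAll(new ByteArrayInputStream(json));
        System.out.println("predicate sees:   " + buffered);
        System.out.println("error check sees: " + buffered);
    }
}
```

An empty second read is consistent with the "No content to map due to end-of-input" message in the reported stack trace. Whether the eventual fix buffers the body or restructures the calls is not settled in this thread.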

Can you please open a Jira for this Wout? 
https://issues.apache.org/jira/projects/BEAM/issues
If you don't have an account I'll create it.

This will not make 2.8.0 (it just missed the cut), so it will likely be 7 weeks 
or so before it is released in 2.9.0.
However, as soon as it is fixed it is fairly easy to bring into your own 
project, by copying in the single ElasticsearchIO.java file, declared in the same 
package.

Thank you for reporting the issue,
Tim




On Thu, Oct 11, 2018 at 4:19 PM Romain Manni-Bucau 
<rmannibu...@gmail.com> wrote:
It looks more like a client issue where the stream has already been read. Maybe 
try to reproduce it in a unit test in the Beam ES module? This will enable us to 
help you more accurately.

Romain Manni-Bucau
@rmannibucau<https://twitter.com/rmannibucau> |  
Blog<https://rmannibucau.metawerx.net/> | Old 
Blog<http://rmannibucau.wordpress.com> | Github<https://github.com/rmannibucau> 
| LinkedIn<https://www.linkedin.com/in/rmannibucau> | 
Book<https://www.packtpub.com/application-development/java-ee-8-high-performance>



Re: ElasticIO retry configuration exception

2018-10-11 Thread Wout Scheepers
Hey Romain,

I’ve checked, and I am using the same HTTP client as Beam 2.7.0.

Just to be sure, I’ve created a minimal reproducible example in a fresh project 
with only the following dependencies in my build.gradle:
dependencies {
    compile ('org.apache.beam:beam-sdks-java-io-elasticsearch:2.7.0')
    compile ('org.apache.beam:beam-runners-direct-java:2.7.0')
    compile ('org.apache.beam:beam-runners-google-cloud-dataflow-java:2.7.0')
    compile ('org.apache.beam:beam-sdks-java-extensions-protobuf:2.7.0')
    compile ('org.apache.beam:beam-sdks-java-extensions-google-cloud-platform-core:2.7.0')
    compile ('org.apache.beam:beam-sdks-java-io-google-cloud-platform:2.7.0')
    compile ('org.apache.beam:beam-sdks-java-io-common:2.7.0')
    compile ('org.apache.beam:beam-sdks-java-extensions-json-jackson:2.7.0')
    compile ('org.apache.beam:beam-sdks-java-io-jdbc:2.7.0')

    testCompile 'org.hamcrest:hamcrest-all:1.3'
    testCompile 'org.assertj:assertj-core:3.4.1'
    testCompile 'junit:junit:4.12'
}


However, the problem still persists when writing a document to Elasticsearch 
with the retryConfiguration set.
I guess the problem lies with my Elasticsearch version, as JB implies?

Anyway, thanks for the suggestion.

Wout

From: Romain Manni-Bucau 
Reply-To: "user@beam.apache.org" 
Date: Wednesday, 10 October 2018 at 16:53
To: "user@beam.apache.org" 
Subject: Re: ElasticIO retry configuration exception

Hi Wout,

Maybe check the HTTP client versions on your classpath (against 
https://github.com/apache/beam/blob/v2.7.0/sdks/java/io/elasticsearch/build.gradle
 for instance).

Romain Manni-Bucau





Re: ElasticIO retry configuration exception

2018-10-10 Thread Wout Scheepers
Hey JB,

Thanks for your fast reply.
The Elasticsearch version we're using is 5.6.2.

"version": {
    "number": "5.6.2",
    "build_hash": "57e20f3",
    "build_date": "2017-09-23T13:16:45.703Z",
    "build_snapshot": false,
    "lucene_version": "6.6.1"
}


Wout



On 10/10/2018, 15:34, "Jean-Baptiste Onofré"  wrote:

Hi Wout,

What's the Elasticsearch version? (Just to try to reproduce.)

Thanks,
Regards
JB


-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com




ElasticIO retry configuration exception

2018-10-10 Thread Wout Scheepers
Hey all,

When using .withRetryConfiguration() for ElasticsearchIO, I get the following 
stacktrace:

Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input
 at [Source: (org.apache.http.nio.entity.ContentInputStream); line: 1, column: 0]
   at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
   at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4133)
   at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3988)
   at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3058)
   at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.parseResponse(ElasticsearchIO.java:167)
   at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:171)
   at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1213)
   at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.finishBundle(ElasticsearchIO.java:1183)

I’ve been racking my brain over this one.
Apparently the Elasticsearch Response object can no longer be parsed in the 
checkForErrors() method.
However, it is parsed successfully in the default RetryPredicate’s test method, 
which is called in flushBatch() in the if clause related to the retryConfig 
(ElasticsearchIO:1201).
As far as I know, the Response object is not altered.

Any clues why this doesn’t work for me?
I really need this feature, as inserting 40M documents into elastic results in 
too many retry timeouts ☺.

Thanks!
Wout




Re: How to use of BigQueryIO Method.FILE_LOADS when reading from a unbounded source?

2018-10-10 Thread Wout Scheepers
Hey Tobias,

org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expandTriggered(BatchLoads.java:212)



points to the following code snippet (starting from BatchLoads.java:210):

if (bigQueryServices == null) {
  try {
    GcsPath.fromUri(tempLocation);
  } catch (IllegalArgumentException e) {
    throw new IllegalArgumentException(
        String.format(
            "BigQuery temp location expected a valid 'gs://' path, but was given '%s'",
            tempLocation),
        e);
  }
}



Are you sure your tempLocation is set correctly? I guess it’s needed for 
staging a BigQuery load job, as opposed to streaming inserts.
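
The check quoted above rejects temp locations that are not 'gs://' paths. The sketch below is a simplified stand-in for that idea, written with the JDK only; it is not Beam's GcsPath, and `TempLocationCheck`, `looksLikeGcsPath`, and the bucket name are hypothetical. It can help to sanity-check the value you pass (for example via the `--tempLocation` pipeline option) before launching the pipeline.

```java
class TempLocationCheck {

    // Hypothetical helper: accepts only "gs://<bucket>" or "gs://<bucket>/<object>".
    // The real validation in Beam is stricter than this.
    static boolean looksLikeGcsPath(String location) {
        if (location == null || !location.startsWith("gs://")) {
            return false;
        }
        String rest = location.substring("gs://".length());
        int slash = rest.indexOf('/');
        String bucket = slash < 0 ? rest : rest.substring(0, slash);
        return !bucket.isEmpty();
    }

    public static void main(String[] args) {
        // "my-bucket" is a placeholder, not a value from this thread.
        String[] candidates = {"gs://my-bucket/tmp", "/tmp/beam", "gs:///tmp", null};
        for (String candidate : candidates) {
            System.out.println(candidate + " -> " + looksLikeGcsPath(candidate));
        }
    }
}
```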



Wout



From: "Kaymak, Tobias" 
Reply-To: "user@beam.apache.org" 
Date: Wednesday, 10 October 2018 at 14:18
To: "user@beam.apache.org" 
Subject: How to use of BigQueryIO Method.FILE_LOADS when reading from a 
unbounded source?

I am trying to read from an unbounded source and use FILE_LOADS instead of 
streaming inserts to write to BigQuery.

If I don't have the following two lines

  .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
  .withTriggeringFrequency(Duration.standardMinutes(10))

my code works just fine, but uses streaming inserts. If I add them, I get a 
non-specific stacktrace like:

Exception in thread "main" java.lang.IllegalArgumentException
    at com.google.common.base.Preconditions.checkArgument(Preconditions.java:108)
    at org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expandTriggered(BatchLoads.java:212)
    at org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expand(BatchLoads.java:557)
    at org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expand(BatchLoads.java:78)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expandTyped(BigQueryIO.java:1694)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:1638)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:1070)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
    at ch.ricardo.di.beam.KafkaToBigQuery.main(KafkaToBigQuery.java:181)

where line 181 is the first line of the following code excerpt:

BigQueryIO.write()
  .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
  .withTriggeringFrequency(Duration.standardMinutes(10))
  .to(
      new DynamicDestinations() {
        @Override
        public String getDestination(ValueInSingleWindow element) {
          return element.getValue().getTopic();
        }

        @Override
        public TableDestination getTable(String destination) {
          return new TableDestination(
              "charged-dialect-824:KafkaStaging.test", null, new TimePartitioning().setType("DAY"));
        }

        @Override
        public TableSchema getSchema(String destination) {
          return inputMessagesConfig.getTableSchema(destination);
        }
      })
  .withFormatFunction(
      (SerializableFunction)
          event -> convertUserEventToTableRow(event))
  .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
  .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

I am not sure what I am doing wrong here. I tried higher values for the 
Duration, but that didn't help. I wasn't able to find the root cause of the 
exception with the debugger. Any idea how I can get to the bottom of this?

Tobi



Re: How to update side inputs.

2018-09-06 Thread Wout Scheepers
Hi Lukasz,

Thanks for your help, Bart and I got the side-input windowing issues figured 
out.

In addition, I noticed that streaming an update into a side-input map for a key 
that is already present results in an exception saying there are duplicate 
values [1].
From this, I assume the new KVs are just added to the map without 
any check whether the map already contains them. The exception is only thrown at 
the point where we do a get on the key.
Shouldn’t there be a check when a new entry is added?

I’ve created a workaround for the issue by adding an extra ParDo with a map as 
cache:

public static class MapFilter extends DoFn, KV> {

    private HashMap cache;

    public MapFilter() {
        this.cache = new HashMap<>();
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        if (!cache.containsKey(c.element().getKey())) {
            cache.put(c.element().getKey(), c.element().getValue());
            c.output(c.element());
        }
    }
}

PCollectionView> mapping = ticks
  .apply(ParDo.of(new GetFileWithSideInputData())) // reads the file and returns a String
  .apply(ParDo.of(new MapFn()))                    // turns the String (json data) into a Map (KV)
  .apply(ParDo.of(new MapFilter()))
  .apply(Window.>into(new GlobalWindows())
...
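
To make the behaviour of the cache-based filter above easier to reason about, here is a rough sketch of the same "first entry per key wins" logic using plain JDK collections instead of a DoFn. `FirstWinsFilter` and `firstPerKey` are hypothetical names, and the String key and value types are assumptions made only for the example.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class FirstWinsFilter {

    // Emits an entry only the first time its key is seen; later entries
    // for the same key are dropped, as in the cache-based ParDo.
    static <K, V> List<Map.Entry<K, V>> firstPerKey(List<Map.Entry<K, V>> updates) {
        Set<K> seen = new HashSet<>();
        List<Map.Entry<K, V>> out = new ArrayList<>();
        for (Map.Entry<K, V> entry : updates) {
            if (seen.add(entry.getKey())) { // add() returns false for a key already seen
                out.add(entry);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> updates = new ArrayList<>();
        updates.add(new SimpleEntry<>("a", "1"));
        updates.add(new SimpleEntry<>("b", "2"));
        updates.add(new SimpleEntry<>("a", "3")); // same key again, new value

        System.out.println(firstPerKey(updates)); // prints [a=1, b=2]
    }
}
```

Note the trade-off this makes visible: duplicates no longer reach the map, but a changed value for an existing key (here "a" -> "3") is discarded rather than applied. In a real pipeline the cache would also be local to each DoFn instance, so this sketch only approximates what happens on a single worker.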

Any thoughts on this? I’ve noticed that the code that throws the exception is 
written by you (PCollectionViews.class:326)

Thanks!
Wout

[1] java.lang.RuntimeException: Exception while fetching side input:
    com.google.cloud.dataflow.worker.StateFetcher.fetchSideInput(StateFetcher.java:195)
    com.google.cloud.dataflow.worker.StreamingModeExecutionContext.fetchSideInput(StreamingModeExecutionContext.java:285)
    com.google.cloud.dataflow.worker.StreamingModeExecutionContext.access$500(StreamingModeExecutionContext.java:69)
    com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.issueSideInputFetch(StreamingModeExecutionContext.java:632)
    com.google.cloud.dataflow.worker.StreamingModeExecutionContext$UserStepContext.issueSideInputFetch(StreamingModeExecutionContext.java:683)
    com.google.cloud.dataflow.worker.StreamingSideInputFetcher.getReadyWindows(StreamingSideInputFetcher.java:130)
    com.google.cloud.dataflow.worker.StreamingSideInputDoFnRunner.startBundle(StreamingSideInputDoFnRunner.java:52)
    com.google.cloud.dataflow.worker.SimpleParDoFn.reallyStartBundle(SimpleParDoFn.java:300)
    com.google.cloud.dataflow.worker.SimpleParDoFn.startBundle(SimpleParDoFn.java:226)
    com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.start(ParDoOperation.java:35)
    com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
    com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1227)
    com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:135)
    com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    java.lang.Thread.run(Thread.java:745)
Caused by: com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: Duplicate values for feature_option#4ff36b6789000225

From: Lukasz Cwik 
Reply-To: "user@beam.apache.org" 
Date: Tuesday, 4 September 2018 at 22:57
To: "user@beam.apache.org" 
Subject: Re: How to update side inputs.

Sorry,

* If this is ok for you, then this is likely the easiest solution, but if 
you need to guarantee that the computations are happening with the updated side 
input data, you'll need to modify your triggers and pipeline to be based upon 
watermarks (suggestion #2).


On Tue, Sep 4, 2018 at 1:49 PM Lukasz Cwik 
<lc...@google.com> wrote:
Bart, the error you are hitting is because the other part of the pipeline is 
operating on a global window.

Every time a side input is looked up in the DoFn, the main window (global 
window in your case) is mapped onto the side input window (a fixed window). 
There is no logical mapping from global window to fixed window that makes sense 
(you could write your own window mapping function, but it must be 
deterministic, which isn't very useful for what you're trying to do). You'll want 
to either:
1) update your side input to produce results in the global window
2) modify your pipeline so the other part is in something that is compatible 
with fixed windows.

For 1, consider

PCollection ticks = p

  // Produce 1 "tick" per 10 seconds

  .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(10)))
  // Window the ticks into 1-minute windows
  .apply(Window.into(FixedWindows.of(Durat

BigqueryIO field clustering

2018-08-29 Thread Wout Scheepers
Hey all,

I’m trying to use the field clustering beta feature in bigquery [1].
However, the current Beam/dataflow worker bigquery api service dependency is 
‘google-api-services-bigquery: com.google.apis: v2-rev374-1.23.0’, which does 
not include the clustering option in the TimePartitioning class.
As a result, I can’t specify the clustering field when loading/streaming into 
bigquery. See [2] for the bigquery api error details.

Does anyone know a workaround for this?

I guess that in the worst case I’ll have to wait until Beam supports a newer 
version of the bigquery api service.
After checking the Beam Jira I’ve found 
BEAM-5191. Is there any way I 
can help to push this forward and make this feature possible in the near future?

Thanks in advance,
Wout

[1] https://cloud.google.com/bigquery/docs/clustered-tables
[2] "errorResult" : {
  "message" : "Incompatible table partitioning specification. Expects partitioning specification interval(type:day,field:publish_time) clustering(clustering_id), but input partitioning specification is interval(type:day,field:publish_time)",
  "reason" : "invalid"
}


Re: ElasticsearchIO bulk delete

2018-07-30 Thread Wout Scheepers
Hey Tim,

Thanks for your proposal to mentor me through my first PR.
As we’re definitely planning to upgrade to ES6 when Beam supports it, we 
decided to postpone the feature (we have a fix that works for us, for now).
When Beam supports ES6, I’ll be happy to make a contribution to get bulk 
deletes working.

For reference, I opened a ticket 
(https://issues.apache.org/jira/browse/BEAM-5042).

Cheers,
Wout


From: Tim Robertson 
Reply-To: "user@beam.apache.org" 
Date: Friday, 27 July 2018 at 17:43
To: "user@beam.apache.org" 
Subject: Re: ElasticsearchIO bulk delete

Hi Wout,

This is great, thank you. I wrote the partial update support you reference and 
I'll be happy to mentor you through your first PR - welcome aboard. Can you 
please open a Jira to reference this work and we'll assign it to you?

We discussed having the "_xxx" fields in the document and triggering actions 
based on that in the partial update jira but opted to avoid it. Based on that 
discussion the ActionFn would likely be the preferred approach.  Would that be 
possible?

It will be important to provide unit and integration tests as well.

Please be aware that there is a branch and work underway for ES6 already which 
is rather different on the write() path so this may become redundant rather 
quickly.

Thanks,
Tim

@timrobertson100 on the Beam slack channel








ElasticsearchIO bulk delete

2018-07-27 Thread Wout Scheepers
Hey all,

A while ago, I patched ElasticsearchIO to be able to do partial updates and 
deletes.
However, I did not consider my patch pull-request-worthy, as the JSON parsing 
was inefficient (each document was parsed twice).

Since Beam 2.5.0, partial updates are supported, so the only thing I’m missing 
is the ability to send bulk delete requests.
We’re using entity updates for event sourcing in our data lake and need to 
persist deleted entities in elastic.
We’ve been using my patch in production for the last year, but I would like to 
contribute to get the functionality we need into one of the next releases.

I’ve created a gist that works for me, but is still inefficient (parsing twice: 
once to check the ‘_action’ field, once to get the metadata).
Each document I want to delete needs an additional ‘_action’ field with the 
value ‘delete’. It doesn’t matter that the document still contains the redundant 
field, as the delete action only requires the metadata.
I’ve added the method isDelete() and made some changes to the processElement() 
method.
https://gist.github.com/wscheep/26cca4bda0145ffd38faf7efaf2c21b9
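
To illustrate why a delete only needs the metadata, here is a minimal sketch of how the lines of an Elasticsearch bulk request body differ between an index action and a delete action. It is not the code from the gist or from ElasticsearchIO; the class name, method names, and the index/type/id values are all hypothetical, and the values are assumed to need no JSON escaping.

```java
class BulkActionSketch {

    // Builds the metadata object used by both actions.
    // Assumes the values contain no characters that need JSON escaping.
    static String meta(String index, String type, String id) {
        return String.format(
            "{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\"}", index, type, id);
    }

    // A delete is a single action line; no document source follows it.
    static String deleteAction(String index, String type, String id) {
        return "{\"delete\":" + meta(index, type, id) + "}\n";
    }

    // An index action is the action line followed by the document source on its own line.
    static String indexAction(String index, String type, String id, String documentJson) {
        return "{\"index\":" + meta(index, type, id) + "}\n" + documentJson + "\n";
    }

    public static void main(String[] args) {
        StringBuilder bulk = new StringBuilder();
        bulk.append(indexAction("entities", "doc", "1", "{\"name\":\"kept\"}"));
        bulk.append(deleteAction("entities", "doc", "2"));
        System.out.print(bulk);
    }
}
```

Because the delete line carries no source, the document body (including any ‘_action’ marker) never has to be sent for deletes; only the decision of which action to emit depends on it.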

I would like to make my solution more generic to fit into the current 
ElasticsearchIO and create a proper pull request.
As this would be my first pull request for Beam, can anyone point me in the 
right direction before I spend too much time creating something that will be 
rejected?

Some questions on the top of my mind are:

  *   Is it a good idea to make the ‘action’ part for the bulk api generic?
  *   Should it be even more generic? (e.g.: set an ‘ActionFn’ on the 
ElasticsearchIO)
  *   If I want to avoid parsing twice, the parsing should be done outside of 
the getDocumentMetaData() method. Would this be acceptable?
  *   Is it possible to avoid passing the action as a field in the document?
  *   Is there another or better way to get the delete functionality in general?

All feedback is more than welcome.

Cheers,
Wout




Streaming MutationGroups with SpannerIO

2018-07-23 Thread Wout Scheepers
Hey all,

I ran into some problems trying to stream MutationGroups into spanner.
I tried to document the things I’ve tried so far as clearly as possible in the 
following SO post:
https://stackoverflow.com/questions/51480770/streaming-mutationgroups-into-spanner

Hope not everyone is having fun at Next this week and someone can help me out 😉

Thanks!
- Wout

Wout Scheepers
Data Engineer

Humaniteitslaan 65 • 1601 Ruisbroek • Belgium