[ https://issues.apache.org/jira/browse/BEAM-6886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16798955#comment-16798955 ]

Ismaël Mejía commented on BEAM-6886:
------------------------------------

This does not look that simple, because the size of the bundle is decided by the
runner, and even the option to 'modify' this size for batched operations does
not allow batches bigger than the bundle size. (Note that batched operations
are a subject on which there is no consensus in the community; they have been
contested multiple times on the mailing lists and in other JIRAs.)
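
To make the bundle limit concrete, here is a minimal plain-Java simulation. It is not the Beam or ElasticsearchIO API; the class and method names are hypothetical. The writer flushes when its buffer reaches maxBatchSize or when the bundle finishes, so no bulk request can contain more elements than the bundle it came from, whatever maxBatchSize is set to.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a bundle-scoped writer: it buffers documents and
// sends a "bulk request" when the buffer reaches maxBatchSize OR when the
// bundle finishes (the @FinishBundle behaviour described in the issue).
class BundleScopedWriter {
    private final int maxBatchSize;
    private final List<String> buffer = new ArrayList<>();
    // Size of every bulk request sent, recorded so the effect is visible.
    final List<Integer> sentBatchSizes = new ArrayList<>();

    BundleScopedWriter(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    void processElement(String doc) {
        buffer.add(doc);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    // Called once per bundle by the (simulated) runner.
    void finishBundle() {
        flush();
    }

    private void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        sentBatchSizes.add(buffer.size());
        buffer.clear();
    }
}

class BundleDemo {
    // Feeds the writer a sequence of bundles whose sizes the "runner" chose.
    static List<Integer> run(int maxBatchSize, int[] bundleSizes) {
        BundleScopedWriter writer = new BundleScopedWriter(maxBatchSize);
        int next = 0;
        for (int size : bundleSizes) {
            for (int i = 0; i < size; i++) {
                writer.processElement("doc-" + next++);
            }
            writer.finishBundle();
        }
        return writer.sentBatchSizes;
    }

    public static void main(String[] args) {
        // Streaming-style tiny bundles: a large maxBatchSize has no effect.
        System.out.println(run(1000, new int[] {1, 2, 1, 2})); // prints [1, 2, 1, 2]
        // One large bundle: now maxBatchSize is what caps each request.
        System.out.println(run(3, new int[] {7})); // prints [3, 3, 1]
    }
}
```

Only the bundle sizes, which the runner controls, differ between the two runs; the batching code is identical.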

I am, however, curious about your solution, because it means that you can work
around the defaults by using the State API (the core of GroupIntoBatches), and I
am not sure whether this guarantees correct results on retry.

Any opinion on this [~kenn]?

> Change batch handling in ElasticsearchIO to avoid necessity for 
> GroupIntoBatches
> --------------------------------------------------------------------------------
>
>                 Key: BEAM-6886
>                 URL: https://issues.apache.org/jira/browse/BEAM-6886
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-elasticsearch
>    Affects Versions: 2.11.0
>            Reporter: Egbert
>            Priority: Major
>
> I have a streaming job inserting records into an Elasticsearch cluster. I set
> the batch size sufficiently large, but it had no effect at all: all elements
> are inserted in batches of 1 or 2 elements.
> The reason seems to be that this is a streaming pipeline, which may result in 
> tiny bundles. Since ElasticsearchIO uses `@FinishBundle` to flush a batch, 
> this will result in equally small batches.
> This results in a huge number of bulk requests containing just one element,
> grinding the Elasticsearch cluster to a halt.
> I have now been able to work around this by using a `GroupIntoBatches`
> operation before the insert, but this requires 3 steps (mapping to a key,
> applying GroupIntoBatches, stripping the key and outputting all collected
> elements), which makes the process quite awkward.
> A much better approach would be to internalize this into the ElasticsearchIO
> write transform: use a timer that flushes the batch when it reaches the batch
> size or at the end of the window, not at the end of a bundle.
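
The timer-based proposal in the issue can be sketched as another plain-Java simulation, under stated assumptions: the buffer is kept in per-window state that outlives a bundle (modelled as a map), a batch is emitted once it reaches maxBatchSize, and whatever remains is emitted when a simulated end-of-window timer fires as the watermark passes the window end. All names are hypothetical. In Beam itself this would presumably be built on the State and Timer APIs, which require keyed (KV) input; that is also why the GroupIntoBatches workaround needs its key-assignment step. The sketch does not model failures or retries, so it says nothing about the retry-correctness question raised above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: buffered documents live in per-window "state" rather
// than per-bundle memory, so they survive bundle boundaries.
class WindowedBatcher {
    private final int maxBatchSize;
    // Window end timestamp -> documents buffered for that window.
    private final Map<Long, List<String>> bufferedByWindow = new HashMap<>();
    final List<Integer> sentBatchSizes = new ArrayList<>();

    WindowedBatcher(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    void processElement(String doc, long windowEnd) {
        List<String> buffer = bufferedByWindow.computeIfAbsent(windowEnd, k -> new ArrayList<>());
        buffer.add(doc);
        if (buffer.size() >= maxBatchSize) {
            flush(windowEnd);
        }
    }

    // Bundle boundaries deliberately flush nothing.
    void finishBundle() {}

    // Simulated watermark advance: fires the end-of-window timer for every
    // window whose end is at or before the new watermark.
    void advanceWatermark(long watermark) {
        List<Long> expired = new ArrayList<>();
        for (Long windowEnd : bufferedByWindow.keySet()) {
            if (windowEnd <= watermark) {
                expired.add(windowEnd);
            }
        }
        expired.sort(null); // natural ordering: flush older windows first
        for (Long windowEnd : expired) {
            flush(windowEnd);
        }
    }

    private void flush(long windowEnd) {
        List<String> buffer = bufferedByWindow.remove(windowEnd);
        if (buffer != null && !buffer.isEmpty()) {
            sentBatchSizes.add(buffer.size());
        }
    }
}

class WindowedDemo {
    // Five documents for one window (end = 10) arrive in bundles of 1, 2 and 2.
    static List<Integer> run() {
        WindowedBatcher batcher = new WindowedBatcher(3);
        int[] bundleSizes = {1, 2, 2};
        int next = 0;
        for (int size : bundleSizes) {
            for (int i = 0; i < size; i++) {
                batcher.processElement("doc-" + next++, 10L);
            }
            batcher.finishBundle();
        }
        batcher.advanceWatermark(10L); // end-of-window timer fires
        return batcher.sentBatchSizes;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [3, 2]
    }
}
```

With maxBatchSize set to 3, the same input would produce three requests of 1, 2 and 2 documents under bundle-scoped flushing; here it produces two requests of 3 and 2.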



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
