[ https://issues.apache.org/jira/browse/BEAM-6886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16801489#comment-16801489 ]

Etienne Chauchot commented on BEAM-6886:
----------------------------------------

[~MadEgg] You are referring to "batch size", but you are forgetting the most 
important part of the method's name, which is MAX: it is withMaxBatchSize(). 
ElasticsearchIO cannot control the size of the batch because it depends on the 
bundle size, and setting the bundle size is the responsibility of the runner. 
Depending on what the runner targets, it sizes bundles accordingly: a runner 
targeting streaming jobs (as you supposed correctly) tends to create small 
bundles to lower latency and increase throughput; on the contrary, a runner 
that targets batch jobs tends to create larger bundles. For example, Flink 
creates one-element bundles, so if you used this runner you would get 
one-element ES bulk requests. Regarding the javadoc, this is already stated in 
the withMaxBatchSize() javadoc, quote: "Depending on the execution engine, 
size of bundles may vary, this sets the maximum size"
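For reference, a minimal sketch of how the cap is configured (the endpoint, 
index, type and the value of 1000 below are illustrative placeholders, not 
recommendations):

{code:java}
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class EsWriteSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply(Create.of("{\"id\": 1}", "{\"id\": 2}"))
        .apply(
            ElasticsearchIO.write()
                .withConnectionConfiguration(
                    ElasticsearchIO.ConnectionConfiguration.create(
                        new String[] {"http://localhost:9200"}, "my-index", "my-type"))
                // Upper bound only: each bulk request holds at most
                // min(bundle size, 1000) documents, flushed in @FinishBundle.
                .withMaxBatchSize(1000));

    p.run().waitUntilFinish();
  }
}
{code}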

Regarding your proposal: outputting only when the window closes would go 
against the Beam model (it violates the bundle as the unit of commit) and, in 
batch mode, would lead to output only at the end of the job, since in that 
mode all elements are stored in a single window, the Global window.

Regarding your workaround: IMHO GroupIntoBatches is the correct way to go, as 
[~kenn] said. Keying your PCollection is needed only if you do not already 
have a keyed PCollection (which I suppose you don't). The key is required 
because GIB uses the state API, which is scoped per key and window (otherwise 
the state would grow huge and we could not clean it up when a given key is 
done). IMHO, building GIB into ESIO seems less composable than letting the 
user choose whether they need it, and moreover it would have to be done in 
every IO. Nevertheless, what can be done to reduce the awkwardness of the 
pipeline you mention is to detect in GIB that the PCollection is not keyed, 
key it at the beginning, and unkey it at the end.
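
To make the discussion concrete, here is a sketch of the workaround as it 
stands today (the constant key and the batch size of 1000 are arbitrary, 
illustrative choices):

{code:java}
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

class EsBatchingWorkaround {
  /** Batches JSON docs so each ES bulk request carries a full batch. */
  static PCollection<String> batchForEs(PCollection<String> docs) {
    return docs
        // Step 1: assign an arbitrary constant key; GroupIntoBatches needs
        // keyed input because its state is scoped per key and window.
        .apply(WithKeys.of(0))
        // Step 2: accumulate up to 1000 elements per key and window.
        .apply(GroupIntoBatches.<Integer, String>ofSize(1000))
        // Step 3: strip the key and re-emit the individual elements so the
        // downstream ElasticsearchIO.write() can flush them together.
        .apply(ParDo.of(
            new DoFn<KV<Integer, Iterable<String>>, String>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                for (String doc : c.element().getValue()) {
                  c.output(doc);
                }
              }
            }));
  }
}
{code}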

[~kenn] [~MadEgg] WDYT?


> Change batch handling in ElasticsearchIO to avoid necessity for 
> GroupIntoBatches
> --------------------------------------------------------------------------------
>
>                 Key: BEAM-6886
>                 URL: https://issues.apache.org/jira/browse/BEAM-6886
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-elasticsearch
>    Affects Versions: 2.11.0
>            Reporter: Egbert
>            Priority: Major
>
> I have a streaming job inserting records into an Elasticsearch cluster. I set 
> the batch size suitably large, but found that it has no effect at all: all 
> elements are inserted in batches of 1 or 2 elements.
> The reason seems to be that this is a streaming pipeline, which may produce 
> tiny bundles. Since ElasticsearchIO uses `@FinishBundle` to flush a batch, 
> this results in equally small batches.
> The consequence is a huge number of bulk requests with just one element, 
> grinding the Elasticsearch cluster to a halt.
> I have been able to work around this by using a `GroupIntoBatches` operation 
> before the insert, but this requires three steps (mapping to a key, applying 
> GroupIntoBatches, stripping the key and outputting all collected elements), 
> making the process quite awkward.
> A much better approach would be to internalize this in the ElasticsearchIO 
> write transform: use a timer that flushes the batch at batch size or at the 
> end of the window, not at the end of a bundle.
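
As a concrete illustration of the proposal quoted above, a rough sketch of a 
stateful DoFn that buffers elements and flushes either when the buffer reaches 
a maximum size or when an event-time timer fires at the end of the window. 
This is not actual ElasticsearchIO code; the class name, the size of 1000 and 
the flushBulk() helper are hypothetical:

{code:java}
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;

/** Hypothetical sketch: buffer docs, flush at max size or end of window. */
class BufferingEsWriteFn extends DoFn<KV<Integer, String>, Void> {
  private static final int MAX_BATCH_SIZE = 1000; // illustrative value

  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec =
      StateSpecs.bag(StringUtf8Coder.of());

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countSpec =
      StateSpecs.value(VarIntCoder.of());

  @TimerId("endOfWindow")
  private final TimerSpec endOfWindowSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void processElement(
      ProcessContext c,
      BoundedWindow window,
      @StateId("buffer") BagState<String> buffer,
      @StateId("count") ValueState<Integer> count,
      @TimerId("endOfWindow") Timer endOfWindow) {
    // Arm a timer so any partial batch still flushes when the window closes.
    endOfWindow.set(window.maxTimestamp());
    buffer.add(c.element().getValue());
    Integer stored = count.read();
    int n = (stored == null ? 0 : stored) + 1;
    if (n >= MAX_BATCH_SIZE) {
      flushBulk(buffer.read());
      buffer.clear();
      count.write(0);
    } else {
      count.write(n);
    }
  }

  @OnTimer("endOfWindow")
  public void onEndOfWindow(
      @StateId("buffer") BagState<String> buffer,
      @StateId("count") ValueState<Integer> count) {
    flushBulk(buffer.read());
    buffer.clear();
    count.write(0);
  }

  private void flushBulk(Iterable<String> docs) {
    // Hypothetical helper: issue one ES bulk request for the buffered docs.
  }
}
{code}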



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
