On 24 Jan 2020, at 21:54, Kenneth Knowles <[email protected]> wrote:
Would it make sense to have different version-specialized connectors
with a common core library and common API package?
On Fri, Jan 24, 2020 at 11:52 AM Chamikara Jayalath <[email protected]> wrote:
Thanks for the contribution. I agree with Alexey that we should
try to add any new features brought in with the new PR into the
existing connector instead of trying to maintain two
implementations.
Thanks,
Cham
On Fri, Jan 24, 2020 at 9:01 AM Alexey Romanenko <[email protected]> wrote:
Hi Ludovic,
Thank you for working on this and sharing the details with
us. This is a really great job!
As I recall, we already have some support for Elasticsearch 7
in the current ElasticsearchIO (afaik, at least they are
compatible), thanks to Zhong Chen and Etienne Chauchot, who
were working on adding this [1][2], and it should be released
in Beam 2.19.
Do you think you can leverage this in your work on adding
new Elasticsearch 7 features? IMHO, supporting two different
related IOs can be quite a tough task, and I'd rather raise my
hand to add new functionality into the existing IO than
create a new one, if possible.
[1] https://issues.apache.org/jira/browse/BEAM-5192
[2] https://github.com/apache/beam/pull/10433
On 22 Jan 2020, at 19:23, Ludovic Boutros <[email protected]> wrote:
Dear all,
I have written a completely reworked Elasticsearch 7+ IO module.
It can be found here:
https://github.com/ludovic-boutros/beam/tree/fresh-reworked-elasticsearch-io-v7/sdks/java/io/elasticsearch7
This is quite advanced WIP work, but I'm quite a new user of
Apache Beam and I would like to get some help on this :)
I can create a JIRA issue now, but I prefer to wait for your
wise advice first.
_Why a new module?_
The current module was compliant with Elasticsearch 2.x, 5.x
and 6.x. This seems like a good point, but so many things
have changed since Elasticsearch 2.x.
This is probably not correct anymore due to
https://github.com/apache/beam/pull/10433?
Elasticsearch 7.x is now partially supported (document types
are removed, occ, updates...).
A fresh new module, only compliant with the latest version of
Elasticsearch, can easily benefit a lot from the latest
evolutions of Elasticsearch (the Java High Level REST Client).
It is therefore far simpler than the current one.
_Error management_
Currently, errors are caught and transformed into simple
exceptions. This is not always what is needed. If we want
to do specific processing on these errors (sending the failed
documents to error topics, for instance), it is not possible
with the current module.
Seems like this is some sort of a dead letter queue
implementation. This will be a very good feature to add to the
existing connector.
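For example, this could follow Beam's usual multi-output dead letter pattern. The sketch below is only an illustration of that pattern (`documents` stands for any `PCollection<String>` of documents to index; the actual indexing call is elided):

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

// Tags for the main (successfully indexed) output and the dead letter output.
final TupleTag<String> indexedTag = new TupleTag<String>() {};
final TupleTag<String> failedTag = new TupleTag<String>() {};

PCollectionTuple results = documents.apply("TryIndex",
    ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        try {
          // ... send c.element() to Elasticsearch here ...
          c.output(c.element());
        } catch (Exception e) {
          // Instead of failing the bundle, route the document
          // to the dead letter output.
          c.output(failedTag, c.element());
        }
      }
    }).withOutputTags(indexedTag, TupleTagList.of(failedTag)));

// The failed documents can then be written to an error topic, a file, etc.
PCollection<String> failedDocs = results.get(failedTag);
```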
_Philosophy_
This module directly uses the Elasticsearch Java client
classes as inputs and outputs.
This way you can configure any options you need directly in
the `DocWriteRequest` objects.
For instance:
- If you need to use external versioning
(https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#index-versioning),
you can.
- If you need to use an ingest pipeline, you can.
- If you need to configure an update document/script, you can.
- If you need to use upserts, you can.
Actually, you should be able to do everything you can do
directly with Elasticsearch.
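For example, such requests are built with the standard Elasticsearch 7 High Level REST Client classes (the index name, ids, pipeline name and values below are made up):

```java
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;

// Index request with external versioning and an ingest pipeline.
IndexRequest indexRequest =
    new IndexRequest("my-index")
        .id("doc-1")
        .source("{\"user\":\"alice\"}", XContentType.JSON)
        .versionType(VersionType.EXTERNAL)
        .version(42L)
        .setPipeline("my-ingest-pipeline");

// Partial update with upsert semantics.
UpdateRequest upsertRequest =
    new UpdateRequest("my-index", "doc-2")
        .doc("{\"counter\": 1}", XContentType.JSON)
        .docAsUpsert(true);
```

Both are `DocWriteRequest` implementations, so they can be handed to the write transform as-is.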
Furthermore, it should be easier to keep updating the module
with future Elasticsearch evolutions.
_Write outputs_
Two outputs are available:
- Successful indexing output;
- Failed indexing output.
They are available in a `WriteResult` object.
These two outputs are represented by
`PCollection<BulkItemResponseContainer>` objects.
A `BulkItemResponseContainer` contains:
- the original index request;
- the Elasticsearch response;
- a batch id.
You can apply any process afterwards (reprocessing,
alerting, ...).
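In a pipeline this looks roughly like the following sketch (the transform and accessor names here are simplified for illustration, and `requests` stands for a `PCollection` of `DocWriteRequest` objects):

```java
// Simplified sketch: "Elasticsearch7IO.write(...)", "getSuccessfulIndexing()"
// and "getFailedIndexing()" are illustrative names, not necessarily the exact ones.
WriteResult writeResult =
    requests.apply("WriteToElasticsearch", Elasticsearch7IO.write(connectionConfiguration));

PCollection<BulkItemResponseContainer> indexed = writeResult.getSuccessfulIndexing();
PCollection<BulkItemResponseContainer> failed = writeResult.getFailedIndexing();

// Each failed BulkItemResponseContainer still carries the original request,
// the Elasticsearch response and its batch id, so it can be reprocessed,
// sent to an error topic, or used for alerting.
```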
_Read input_
You can read documents from Elasticsearch with this module.
You can specify a `QueryBuilder` in order to filter the
retrieved documents.
By default, it retrieves the whole document collection.
If the Elasticsearch index is sharded, multiple slices can
be used during the fetch, and one bundle is created per slice.
The maximum bundle count is equal to the index shard count.
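For example, a filtered read looks roughly like this (the query is plain Elasticsearch client code; the read transform name is simplified for illustration):

```java
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;

// Standard Elasticsearch query: only matching documents are read.
// Without it, the whole index is retrieved.
QueryBuilder query = QueryBuilders.boolQuery()
    .filter(QueryBuilders.termQuery("status", "published"))
    .filter(QueryBuilders.rangeQuery("timestamp").gte("now-7d"));

// Simplified sketch: "Elasticsearch7IO.read(...)" and "withQuery(...)" are
// illustrative names, not necessarily the exact ones.
pipeline.apply("ReadFromElasticsearch",
    Elasticsearch7IO.read(connectionConfiguration).withQuery(query));
```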
Thank you!
Ludovic