On 24 Jan 2020, at 21:54, Kenneth Knowles <[email protected]
<mailto:[email protected]>> wrote:
Would it make sense to have different version-specialized
connectors with a common core library and common API package?
On Fri, Jan 24, 2020 at 11:52 AM Chamikara Jayalath
<[email protected] <mailto:[email protected]>> wrote:
Thanks for the contribution. I agree with Alexey that we should
try to add any new features brought in with the new PR into
existing connector instead of trying to maintain two
implementations.
Thanks,
Cham
On Fri, Jan 24, 2020 at 9:01 AM Alexey Romanenko
<[email protected] <mailto:[email protected]>> wrote:
Hi Ludovic,
Thank you for working on this and sharing the details with
us. This is really great job!
As I recall, we already have some support of Elasticsearch7
in current ElasticsearchIO (afaik, at least they are
compatible), thanks to Zhong Chen and Etienne Chauchot, who
were working on adding this [1][2] and it should be
released in Beam 2.19.
Would you think you can leverage this in your work on
adding new Elasticsearch7 features? IMHO, supporting two
different related IOs can be quite tough task and I‘d
rather raise my hand to add a new functionality into
existing IO than creating a new one, if it’s possible.
[1] https://issues.apache.org/jira/browse/BEAM-5192
[2] https://github.com/apache/beam/pull/10433
On 22 Jan 2020, at 19:23, Ludovic Boutros
<[email protected] <mailto:[email protected]>> wrote:
Dear all,
I have written a completely reworked Elasticsearch 7+ IO
module.
It can be found here:
https://github.com/ludovic-boutros/beam/tree/fresh-reworked-elasticsearch-io-v7/sdks/java/io/elasticsearch7
This is a quite advance WIP work but I'm a quite new user
of Apache Beam and I would like to get some help on this :)
I can create a JIRA issue now but I prefer to wait for
your wise avises first.
_Why a new module ?_
The current module was compliant with Elasticsearch 2.x,
5.x and 6.x. This seems to be a good point but so many
things have been changed since Elasticsearch 2.x.
Probably this is not correct anymore due to
https://github.com/apache/beam/pull/10433 ?
Elasticsearch 7.x is now partially supported (document
type are removed, occ, updates...).
A fresh new module, only compliant with the last version
of Elasticsearch, can easily benefit a lot from the last
evolutions of Elasticsearch (Java High Level Http Client).
It is therefore far simpler than the current one.
_Error management_
Currently, errors are caught and transformed into simple
exceptions. This is not always what is needed. If we would
like to do specific processing on these errors (send
documents in error topics for instance), it is not
possible with the current module.
Seems like this is some sort of a dead letter queue
implementation.. This will be a very good feature to add to the
existing connector.
_Philosophy_
This module directly uses the Elasticsearch Java client
classes as inputs and outputs.
This way you can configure any options you need directly
in the `DocWriteRequest` objects.
For instance:
- If you need to use external versioning
(https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#index-versioning),
you can.
- If you need to use an ingest pipelines, you can.
- If you need to configure an update document/script, you can.
- If you need to use upserts, you can.
Actually, you should be able to do everything you can do
directly with Elasticsearch.
Furthermore, it should be easier to keep updating the
module with future Elasticsearch evolutions.
_Write outputs_
Two outputs are available:
- Successful indexing output ;
- Failed indexing output.
They are available in a `WriteResult` object.
These two outputs are represented by
`PCollection<BulkItemResponseContainer>` objects.
A `BulkItemResponseContainer` contains:
- the original index request ;
- the Elasticsearch response ;
- a batch id.
You can apply any process afterwards (reprocessing,
alerting, ...).
_Read input_
You can read documents from Elasticsearch with this module.
You can specify a `QueryBuilder` in order to filter the
retrieved documents.
By default, it retrieves the whole document collection.
If the Elasticsearch index is sharded, multiple slices can
be used during fetch. That many bundles are created. The
maximum bundle count is equal to the index shard count.
Thank you !
Ludovic