I’m not able to find any implementation of ‘SplitableDoFn”.  All reference I 
can find are of “Splitable DoFn”, so could you point me in the right version of 
the Apache Beam SDK that would have this?   Thanks, ~Sean

From: Evan Galpin <[email protected]>
Date: Wednesday, April 19, 2023 at 4:46 PM
To: [email protected] <[email protected]>
Cc: Murphy, Sean P. <[email protected]>
Subject: [EXTERNAL] Re: Q: Apache Beam IOElasticsearchIO.read() method (Java), 
which expects a PBegin input and a means to handle a collection of queries
Yes unfortunately the ES IO connector is not built in a way that can work by 
taking inputs from a PCollection to issue Reads. The most scalable way to 
support this is to revisit the implementation of Elasticsearch Read transform 
and instead implement it as a SplittableDoFn.

On Wed, Apr 19, 2023 at 10:51 Shahar Frank 
<[email protected]<mailto:[email protected]>> wrote:
Hi Sean,

I'm not an expert but I think the .withQuery() functions takes part of the 
build stage rather than the runtime stage.
This means that the way ElasticsearchIO was built is so that while the pipeline 
is being built you could set the query but it is not possible during runtime 
which mean you cannot dynamically run the query based on the element processed 
within the pipeline.

To do something like that the transformation must be designed more like the 
FileIO in this example: (From 
https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/io/FileIO.html)

 PCollection<KV<String, String>> filesAndContents = p

     .apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))

     // withCompression can be omitted - by default compression is detected 
from the filename.

     .apply(FileIO.readMatches().withCompression(GZIP))

     .apply(MapElements

         // uses imports from TypeDescriptors

         .into(kvs(strings(), strings()))

         .via((ReadableFile f) -> {

           try {

             return KV.of(

                 f.getMetadata().resourceId().toString(), 
f.readFullyAsUTF8String());

           } catch (IOException ex) {

             throw new RuntimeException("Failed to read the file", ex);

           }

         }));

If you look at how FileIO.readMatches() works - it doesn't set the filename 
when building the pipeline but rather accepts that within the ProcessElement 
function.

See 
here<https://github.com/apache/beam/blob/bd8950176db0116221a1b739a3916da26d822f2f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L873>

Does that make sense?

Cheers,
Shahar.
________________________________

Shahar Frank

[email protected]<mailto:[email protected]>

+447799561438

________________________________





On Wed, 19 Apr 2023 at 18:05, Murphy, Sean P. via user 
<[email protected]<mailto:[email protected]>> wrote:
I'm running into an issue using the ElasticsearchIO.read() to handle more than 
one instance of a query. My queries are being dynamically built as a 
PCollection based on an incoming group of values. I'm trying to see how to load 
the .withQuery() parameter which could provide this capability or any approach 
that provides flexibility.

The issue is that ElasticsearchIO.read() method expects a PBegin input to start 
a pipeline, but it seems like I need access outside of a pipeline context 
somehow. PBegin represents the beginning of a pipeline, and it's required to 
create a pipeline that can read data from Elasticsearch using 
IOElasticsearchIO.read().

Can I wrap the ElasticsearchIO.read() call in a Create transform that creates a 
PCollection with a single element (e.g., PBegin) to simulate the beginning of a 
pipeline or something similar?

Here is my naive attempt without accepting the reality of PBegin:
   PCollection<String> queries = ... // a PCollection of Elasticsearch queries

    PCollection<String> queryResults = queries.apply(
        ParDo.of(new DoFn<String, String>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                String query = c.element();
                PCollection<String> results = c.pipeline()
                    .apply(ElasticsearchIO.read()
                        .withConnectionConfiguration(
                            
ElasticsearchIO.ConnectionConfiguration.create(hosts, indexName))
                        .withQuery(query));
                c.output(results);
            }
        })
    .apply(Flatten.pCollections()));


In general I'm wondering for any of IO-related classes proved by Beam that 
conforms to PBegin input -- if there is a means to introduce a collection.

Here is one approach that might be promising:
// Define a ValueProvider for a List<String>
ValueProvider<List<String>> myListProvider = 
ValueProvider.StaticValueProvider.of(myList);

// Use the ValueProvider to create a PCollection of Strings
PCollection<String> pcoll = pipeline.apply(Create.ofProvider(myListProvider, 
ListCoder.of()));

PCollection<String> partitionData = PBegin.in(pipeline)
        .apply("Read data from Elasticsearch", 
ElasticsearchIO.read().withConnectionConfiguration(connConfig).withQuery(ValueProvider<String>
 pcoll).withScrollKeepalive("1m").withBatchSize(50))
        .apply(new MedTaggerESRunnerTransform(opt.getProjectAe(), 
opt.getMedTagVersion(), opt.getNoteType()));

Any thoughts or ideas would be great.   Thanks, ~Sean

Reply via email to