Or perhaps you have a PCollection<String> or something like that, and you
want to use those strings to issue queries to Spanner?
PCollection<String> myStrings = p.apply(.....)
PCollection<Struct> rows = myStrings.apply(
SpannerIO.read()
.withInstanceId(instanceId)
.withDatabaseId(dbId)
.withQuery("SELECT id, name, email FROM users WHERE column = %input%"));
Something like that perhaps?
If that's the case, it looks like you can use SpanerIO.readAll(),
where the input PCollection contains ReadOperations. Something like
this:
myStrings.apply(MapElements.into(TypeDescriptor.of(ReadOperation).via(myString
-> ReadOperation.with...())
.apply(SpannerIO.readAll()
.withInstanceId(instanceId)
.withDatabaseId(dbId));
You'd have to convert your upstream PCollection into a PCollection of
ReadOperations, and then pass that to SpanerIO.readAll().
On Wed, Dec 18, 2019 at 8:53 AM Luke Cwik <[email protected]> wrote:
> How do you want to use the previous data in the SpannerIO.read()?
>
> Are you trying to perform a join on a key between two PCollections? If so,
> please use CoGroupByKey[1].
> Are you trying to merge two PCollection<Struct> objects? If so, please use
> Flatten[2].
>
> 1: https://beam.apache.org/documentation/programming-guide/#cogroupbykey
> 2: https://beam.apache.org/documentation/programming-guide/#flatten
>
> On Wed, Dec 18, 2019 at 8:44 AM Ajit Soman <[email protected]>
> wrote:
>
>> Hi,
>>
>> I am creating a pipeline . I want to execute Spanner query once I got
>> data from its previous stage.
>>
>> In java docs, they have given reference for this code.
>>
>> PCollection<Struct> rows = pipleline.apply(
>> SpannerIO.read()
>> .withInstanceId(instanceId)
>> .withDatabaseId(dbId)
>> .withQuery("SELECT id, name, email FROM users"));
>>
>>
>> *In the above code they have applied SpannerIO query to pipeline object
>> .But i want this to apply in PCollection.apply() method so that i can use
>> its previous stage output in my query.*
>>
>> PCollection<Struct> rows = (PCollection)pCollection.apply(
>> SpannerIO.read()
>> .withInstanceId(instanceId)
>> .withDatabaseId(dbId)
>> .withQuery("SELECT id, name, email FROM users"));
>>
>>
>> Need your help.
>>
>> Thanks & Regards,
>> Ajit
>>
>>
>>
>> ------------------------------
>> DISCLAIMER: This email message along with any attachments may contain
>> information that is confidential or privileged. If you are not the intended
>> recipient or responsible for delivering any of this transmission to an
>> intended recipient, you are hereby notified that any dissemination,
>> distribution, retention, copying or other use of this message or its
>> attachments is prohibited. If you received this message in error, please
>> notify the sender immediately and permanently delete all copies of this
>> message and attachments. No representation is made that this email is
>> free of viruses. Virus scanning is recommended and is the responsibility of
>> the recipient. Thank you.
>
>