I am trying to read AppEngine search index documents using Google Cloud
Dataflow. I raised this before on the google-cloud-sdk mailing list, but
I was told that Stack Overflow was a better place for discussing code.
Unfortunately, as I hope will become obvious, this is an awkward issue
that is unlikely to have a straightforward answer, and so it would most
likely be marked as 'unconstructive' by Stack Overflow moderators. My
hope is that this list can give more information on what guarantees I
can expect from the pipeline runner with regard to threading.

I am reading the AppEngine search indexes through the remote API, which
is the only way to access the service from Java outside of AppEngine. One
awkward aspect of the remote API is that it forces you to install a proxy
stub in what seems to be some kind of thread-local variable. I include a
simple Reader and Source for a single search index at the end of this
email.

The obvious issue is that 'installing' the remote API limits the
reusability and composability of this source. I've made a somewhat futile
attempt to detect an existing remote API 'installation' via reflection.
This prevents an exception from being thrown, and the reader then
continues in the hope that whatever remote API proxy is present is the
right one!

I've since begun extending this code to read from several indexes at
once, and the problem just gets worse, since I now need to make remote API
calls to get the list of indexes in the createReader method of the source.

It seems to me that it would be nice to have some kind of worker 'new
thread hook' where thread-local variables can be set up. Does this exist?
This would let me decouple the remote API proxy 'installation' from the
source and reader themselves.
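
To make the idea concrete, here is a minimal sketch of the kind of
per-thread setup I have in mind. Everything in it is hypothetical: the
class name PerThreadSetup and its acquire/release methods are invented
for illustration and are not part of the Dataflow SDK or the remote API.
The setup and teardown actions are plain Runnables, so the sketch does
not depend on RemoteApiInstaller at all.

```java
/**
 * Hypothetical per-thread guard (not part of any SDK): runs a setup action
 * the first time a thread acquires it, and the matching teardown action
 * when the last holder on that thread releases it.
 */
final class PerThreadSetup {

    // Number of active holders on the current thread, kept in a
    // one-element array so it can be updated in place.
    private final ThreadLocal<int[]> holders =
            ThreadLocal.withInitial(() -> new int[1]);

    private final Runnable setup;
    private final Runnable teardown;

    PerThreadSetup(Runnable setup, Runnable teardown) {
        this.setup = setup;
        this.teardown = teardown;
    }

    /** Runs setup only if the calling thread has no active holder yet. */
    void acquire() {
        int[] count = holders.get();
        if (count[0] == 0) {
            setup.run(); // if this throws, the count stays at zero
        }
        count[0]++;
    }

    /** Runs teardown once the calling thread's last holder releases. */
    void release() {
        int[] count = holders.get();
        if (count[0] == 0) {
            throw new IllegalStateException("release() without acquire()");
        }
        count[0]--;
        if (count[0] == 0) {
            teardown.run();
            holders.remove(); // drop the thread-local entry entirely
        }
    }
}
```

A reader could then call acquire() in start() and release() in close(),
with the setup action wrapping the install call and the teardown action
wrapping the uninstall call. This only helps if a reader is started and
closed on the same thread, which is exactly the guarantee I am asking
about.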

Thanks,

Frank

***************
Sample code
***************

// Imports assumed for the Dataflow SDK 1.x and the AppEngine API jars.
// DocumentCoder and NotImplementedException are not covered here.
import com.google.appengine.api.search.Document;
import com.google.appengine.api.search.GetRequest;
import com.google.appengine.api.search.GetResponse;
import com.google.appengine.api.search.Index;
import com.google.appengine.api.search.IndexSpec;
import com.google.appengine.api.search.SearchServiceFactory;
import com.google.appengine.tools.remoteapi.RemoteApiInstaller;
import com.google.appengine.tools.remoteapi.RemoteApiOptions;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.io.BoundedSource;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

class SearchIndexReader extends BoundedSource.BoundedReader<Document> {

    private Document current;
    private RemoteApiInstaller installer;
    private SearchIndexSource source;
    private Iterator<Document> documentIterator;
    boolean cleanupApiInstallation;

    public SearchIndexReader(SearchIndexSource source) {
        this.source = source;
        cleanupApiInstallation = false;
    }

    @Override
    public boolean start() throws IOException {
        RemoteApiOptions options = new RemoteApiOptions()
                .server("myProjectId.appspot.com", 443)
                .useApplicationDefaultCredential()
                .remoteApiPath("/_ah/remote_api/");

        installer = new RemoteApiInstaller();

        try { // XXX is accessing this private method justified?
            Method installed =
                    installer.getClass().getDeclaredMethod("installed");

            installed.setAccessible(true);

            // Only install (and later uninstall) the proxy if no other
            // installation is already present on this thread.
            if (!(Boolean) installed.invoke(installer)) {
                installer.install(options);
                cleanupApiInstallation = true;
            }
        } catch (NoSuchMethodException
                | IllegalAccessException
                | InvocationTargetException e) {
            throw new RuntimeException(e);
        }

        // Fetch only the document ids; full documents are loaded in advance().
        GetRequest request =
                GetRequest.newBuilder().setReturningIdsOnly(true).build();
        Index index = getIndex();
        GetResponse<Document> rangeResponse = index.getRange(request);

        documentIterator = rangeResponse.iterator();

        return advance();
    }

    private Index getIndex() {
        IndexSpec remoteIndex =
                IndexSpec.newBuilder().setName("remoteIndex").build();
        return SearchServiceFactory.getSearchService().getIndex(remoteIndex);
    }

    @Override
    public boolean advance() throws IOException {
        if (documentIterator.hasNext()) {
            Document partialDoc = documentIterator.next();
            current = getIndex().get(partialDoc.getId());
            return true;
        }
        return false;
    }

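    @Override
    public Document getCurrent() throws NoSuchElementException {
        // Honour the declared contract when no element has been read yet.
        if (current == null) {
            throw new NoSuchElementException();
        }
        return current;
    }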

    @Override
    public void close() throws IOException {
        documentIterator = null;
        if (cleanupApiInstallation) {
            installer.uninstall();
        }
    }

    @Override
    public BoundedSource<Document> getCurrentSource() {
        return source;
    }
}

class SearchIndexSource extends BoundedSource<Document> {

    @Override
    public List<? extends BoundedSource<Document>> splitIntoBundles(
            long l, PipelineOptions pipelineOptions) throws Exception {
        return Collections.singletonList(this);
    }

    @Override
    public long getEstimatedSizeBytes(PipelineOptions pipelineOptions)
            throws Exception {
        throw new NotImplementedException();
    }

    @Override
    public boolean producesSortedKeys(PipelineOptions pipelineOptions)
            throws Exception {
        return false;
    }

    @Override
    public BoundedReader<Document> createReader(
            PipelineOptions pipelineOptions) throws IOException {
        return new SearchIndexReader(this);
    }

    @Override
    public void validate() {

    }

    @Override
    public Coder<Document> getDefaultOutputCoder() {
        return DocumentCoder.of();
    }
}
