I replied on the JIRA issue[1] you filed. Let's keep the conversation in one place, on that issue.
[1] https://issues.apache.org/jira/browse/BEAM-7885

Ahmet

On Wed, Aug 7, 2019 at 1:19 PM Niklas Hansson <[email protected]> wrote:

> Hi!
>
> I have a streaming (unbounded) pipeline in which I would like to use a
> sklearn (http://scikit-learn.github.io/stable) model for making predictions
> based upon input from a pubsub stream. In order to avoid loading the model
> multiple times, I would like to use the DoFn.Setup() feature that was
> implemented in 2.14 (at least as far as I understand), PR[562](
> https://github.com/apache/beam/pull/7994).
>
> My DoFn:
>
> class PredictSklearn(beam.DoFn):
>     """ Format the input to the desired shape"""
>
>     def __init__(self):
>         self._model = None
>
>     def setup(self):
>         model_name = "model.joblib"
>         download_blob(bucket_name="dataflowsklearnstreaming",
>                       source_blob_name=model_name)
>         self._model = joblib.load(model_name)
>
>     def process(self, element):
>         element["prediction"] = self._model.predict(element["data"])
>         return [element]
>
> My pipeline:
>
> input_pubsub = (p | 'Read from PubSub 2' >>
>                 beam.io.gcp.pubsub.ReadFromPubSub(topic=known_args.topic,
>                                                   with_attributes=True))
> _ = (input_pubsub
>      | "format the data correctly" >> beam.ParDo(FormatInput())
>      | "transform the data" >> beam.ParDo(PredictSklearn())
>      | "print the data" >> beam.Map(printy)
>      )
>
> However, I get the error message:
>
> AttributeError: 'NoneType' object has no attribute 'predict' [while
> running 'transform the data']
>
> because the model is not loaded.
>
> For the full code: https://github.com/NikeNano/DataflowSklearnStreaming
>
> I have filed an issue in Jira but realised that I probably should have
> asked here first. Would be super happy for some help on this :)
>
> Thanks!
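
The traceback quoted above shows process() running while self._model is still None, which is what one would see if setup() were never invoked on the worker. Whether that is the actual cause here is not established in this thread. As a minimal sketch of a defensive pattern, assuming the model can be loaded by a zero-argument callable, the load can be guarded so that process() triggers it when setup() has not. The class below is written as a plain Python class so it runs without Beam installed; in the pipeline it would subclass beam.DoFn. The names LazyModelPredictor, load_model, _ensure_model, _FakeModel and fake_loader are hypothetical and exist only for illustration.

```python
class LazyModelPredictor:
    """Mirrors the setup()/process() shape of the PredictSklearn DoFn above."""

    def __init__(self, load_model):
        # load_model is a hypothetical zero-argument callable returning an
        # object with a predict() method (for example, a function that
        # downloads the file and then calls joblib.load on it).
        self._load_model = load_model
        self._model = None

    def _ensure_model(self):
        # Load at most once per instance, whoever asks first.
        if self._model is None:
            self._model = self._load_model()
        return self._model

    def setup(self):
        # Preferred path: the runner calls this once per instance.
        self._ensure_model()

    def process(self, element):
        # Fallback path: if setup() was never called, load here instead.
        model = self._ensure_model()
        element["prediction"] = model.predict(element["data"])
        return [element]


class _FakeModel:
    """Stand-in for a fitted sklearn estimator (illustration only)."""

    def predict(self, data):
        return [sum(row) for row in data]


load_calls = []


def fake_loader():
    # Records each call so the single-load behaviour can be checked.
    load_calls.append(1)
    return _FakeModel()


predictor = LazyModelPredictor(fake_loader)
# setup() is deliberately not called, to simulate a runner that skips it.
first = predictor.process({"data": [[1, 2], [3, 4]]})
second = predictor.process({"data": [[5, 5]]})
```

With this shape the model is still loaded only once per instance, and the DoFn no longer depends on the runner calling setup() before the first element arrives.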
