Hi! I have a streaming (unbounded) pipeline in which I would like to use a sklearn (http://scikit-learn.github.io/stable) model to make predictions based on input from a Pub/Sub stream. To avoid loading the model multiple times, I would like to use the DoFn.setup() feature that was implemented as of 2.14 (at least as far as I understand), PR[562](https://github.com/apache/beam/pull/7994).
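To make the intended behaviour concrete, here is a minimal sketch of the load-once pattern, written without Beam so it runs on its own. `LoadOnceFn`, `DummyModel`, `loader` and `load_count` are hypothetical names used only for illustration. The class is a plain stand-in for a DoFn, not the real `beam.DoFn`, and the loop at the bottom only imitates what a runner is expected to do. The lazy fallback in `process` is an assumption about a possible guard, not something Beam provides.

```python
class LoadOnceFn:
    """Plain-Python stand-in for a DoFn (hypothetical, does not use apache_beam)."""

    def __init__(self, loader):
        self._loader = loader  # callable that returns a model object
        self._model = None     # nothing is loaded at construction time
        self.load_count = 0    # counts how often the model was loaded

    def setup(self):
        # Expected to be called once per instance before any element is processed.
        self._model = self._loader()
        self.load_count += 1

    def process(self, element):
        if self._model is None:
            # Guard (an assumption): load lazily if setup() was never invoked.
            self.setup()
        element = dict(element)  # copy so the input is not mutated
        element["prediction"] = self._model.predict(element["data"])
        return [element]


class DummyModel:
    """Hypothetical model: predicts the sum of each row."""

    def predict(self, data):
        return [sum(row) for row in data]


# Imitate the expected lifecycle: one setup() call, then many process() calls.
fn = LoadOnceFn(loader=DummyModel)
fn.setup()
results = []
for element in [{"data": [[1, 2]]}, {"data": [[3, 4]]}]:
    results.extend(fn.process(element))
```

With this shape the model is loaded once per instance no matter how many elements arrive, and the guard in `process` avoids calling `predict` on `None` when `setup()` is skipped.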
My DoFn:

    class PredictSklearn(beam.DoFn):
        """ Format the input to the desired shape"""

        def __init__(self):
            self._model = None

        def setup(self):
            model_name = "model.joblib"
            download_blob(bucket_name="dataflowsklearnstreaming", source_blob_name=model_name)
            self._model = joblib.load(model_name)

        def process(self, element):
            element["prediction"] = self._model.predict(element["data"])
            return [element]

My pipeline:

    input_pubsub = (
        p
        | 'Read from PubSub 2' >> beam.io.gcp.pubsub.ReadFromPubSub(
            topic=known_args.topic, with_attributes=True))

    _ = (
        input_pubsub
        | "format the data correctly" >> beam.ParDo(FormatInput())
        | "transform the data" >> beam.ParDo(PredictSklearn())
        | "print the data" >> beam.Map(printy))

However, I get the error message:

    AttributeError: 'NoneType' object has no attribute 'predict' [while running 'transform the data']

This happens because the model is not loaded.

For the full code: https://github.com/NikeNano/DataflowSklearnStreaming

I have opened an issue in Jira, but realised that I probably should have asked here first. Would be super happy for some help on this :)

Thanks!