Hi!

I have a streaming (unbounded) pipeline in which I would like to use a sklearn
(http://scikit-learn.github.io/stable) model to make predictions based on
input from a Pub/Sub stream. To avoid loading the model multiple times, I
would like to use the DoFn.setup() method, which was implemented in Beam 2.14
(at least as far as I understand), see PR
https://github.com/apache/beam/pull/7994.

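My DoFn:

import apache_beam as beam
import joblib


class PredictSklearn(beam.DoFn):
    """Predict with a sklearn model that is loaded once in setup()."""

    def __init__(self):
        # Runs at pipeline-construction time; the model is not loaded yet.
        self._model = None

    def setup(self):
        # Expected to run once per DoFn instance on the worker, before process().
        # download_blob is a helper from the full code (not shown here).
        model_name = "model.joblib"
        download_blob(bucket_name="dataflowsklearnstreaming",
                      source_blob_name=model_name)
        self._model = joblib.load(model_name)

    def process(self, element):
        element["prediction"] = self._model.predict(element["data"])
        return [element]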
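A possible defensive variant (a sketch, not tested on Dataflow): load the model lazily on first use, so the DoFn works whether or not the runner calls setup(). To keep it runnable without Beam or GCS, the class below is a plain Python class and the loading step is passed in as a hypothetical `load_model` callable; in the real pipeline the class would subclass beam.DoFn and `load_model` would call download_blob(...) and joblib.load(...).

```python
from typing import Any, Callable, Dict, Iterator, Optional


class LazyPredictor:
    """Prediction logic that loads its model at most once per instance.

    In a real pipeline this would subclass beam.DoFn; it is a plain class
    here so the sketch runs without Apache Beam installed.
    """

    def __init__(self, load_model: Callable[[], Any]):
        # load_model is a hypothetical zero-argument callable returning a
        # fitted model. It must be picklable, since Beam serializes DoFns.
        self._load_model = load_model
        self._model: Optional[Any] = None

    def _ensure_model(self) -> Any:
        # Load on first use; later calls reuse the cached model.
        if self._model is None:
            self._model = self._load_model()
        return self._model

    def setup(self) -> None:
        # Preferred path: if the runner calls setup(), the model loads here.
        self._ensure_model()

    def process(self, element: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
        # Fallback path: if setup() was never called, load the model now
        # instead of failing on a None model.
        model = self._ensure_model()
        element["prediction"] = model.predict(element["data"])
        yield element
```

Because a Beam DoFn instance is used by one thread at a time, the sketch does not add a lock around the lazy load.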

My pipeline:

input_pubsub = (
    p
    | "Read from PubSub 2" >> beam.io.gcp.pubsub.ReadFromPubSub(
        topic=known_args.topic, with_attributes=True)
)
_ = (
    input_pubsub
    | "format the data correctly" >> beam.ParDo(FormatInput())
    | "transform the data" >> beam.ParDo(PredictSklearn())
    | "print the data" >> beam.Map(printy)
)

However, I get the following error:

    AttributeError: 'NoneType' object has no attribute 'predict' [while running 'transform the data']

This is because the model is not loaded, i.e. self._model is still None when
process() runs.
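To narrow this down, here is a minimal plain-Python simulation, assuming the usual Beam lifecycle: the DoFn is constructed and pickled at pipeline-construction time, then unpickled on the worker, where setup() is expected to run before process(). It uses a hypothetical FakeModel instead of the real sklearn model and makes no Beam or GCS calls. Skipping setup() reproduces exactly the error above, which suggests setup() is not being invoked, rather than the model file failing to load (a failing download or joblib.load would raise its own error instead).

```python
import pickle
from typing import Any, Dict, List, Optional


class FakeModel:
    """Hypothetical stand-in for the joblib-loaded sklearn model."""

    def predict(self, data: List[float]) -> List[float]:
        return [x + 1 for x in data]


class PredictSklearn:
    """Same shape as the DoFn above, without Beam or GCS."""

    def __init__(self) -> None:
        self._model: Optional[Any] = None

    def setup(self) -> None:
        self._model = FakeModel()

    def process(self, element: Dict[str, Any]) -> List[Dict[str, Any]]:
        element["prediction"] = self._model.predict(element["data"])
        return [element]


def run_on_worker(call_setup: bool) -> List[Dict[str, Any]]:
    # Simulate shipping the DoFn: pickle on the launcher, unpickle on the
    # worker. _model is still None at this point.
    dofn = pickle.loads(pickle.dumps(PredictSklearn()))
    if call_setup:
        dofn.setup()  # what the runner is expected to do before process()
    return dofn.process({"data": [1.0, 2.0]})


print(run_on_worker(call_setup=True))
try:
    run_on_worker(call_setup=False)
except AttributeError as err:
    print(err)  # 'NoneType' object has no attribute 'predict'
```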

The full code is available here: https://github.com/NikeNano/DataflowSklearnStreaming

I have opened an issue in Jira but realised that I probably should have asked
here first. I would be super happy for some help with this :)

Thanks!
