在pyflink自定义UDF加载Keras模型并注册UDF时、报错:TypeError: can't pickle _thread.lock objects、有大佬遇到过吗?谢谢!(插入的图不知看不看的到)
class myKerasMLP(ScalarFunction): def __init__(self): ... def open(self, function_context): ... def eval(self, x, y): ... def load_model(self): """ 加载模型,如果 redis 里存在模型,则优先从 redis 加载,否则初始化一个新模型 :return: """ import redis import pickle import logging logging.info('载入模型!') r = redis.StrictRedis(**self.redis_params) model = None try: # redis加载model json model = model_from_json(r.get(self.model_name)) # redis加载model权重 weights = pickle.loads(r.get(self.weights)) # # 设置权重 model.set_weights(weights) model.summary() except TypeError: logging.info('Redis 内没有指定名称的模型,因此初始化一个新模型') except (redis.exceptions.RedisError, TypeError, Exception): logging.warning('Redis 出现异常,因此初始化一个新模型') finally: print("MLP model", model) return model myKerasMLP = udf(myKerasMLP(), input_types=[DataTypes.FLOAT(), DataTypes.FLOAT()], result_type=DataTypes.FLOAT()) print('UDF 模型加载完成!') t_env.create_temporary_system_function('train_and_predict', myKerasMLP) print('UDF 注册成功') ----------- _________________________________________________________________ Layer (type) Output Shape Param # ================================================================= dense_1 (Dense) (None, 8) 72 _________________________________________________________________ dense_2 (Dense) (None, 10) 90 _________________________________________________________________ dense_3 (Dense) (None, 1) 11 ================================================================= Total params: 173 Trainable params: 173 Non-trainable params: 0 _________________________________________________________________ MLP model <keras.models.Sequential object at 0x0000000031C386A0> UDF 模型加载完成! <http://apache-flink.147419.n8.nabble.com/file/t1280/2.jpg> <http://apache-flink.147419.n8.nabble.com/file/t1280/11.jpg> -- Sent from: http://apache-flink.147419.n8.nabble.com/