在pyflink自定义UDF加载Keras模型并注册UDF时、报错:TypeError: can't pickle _thread.lock
objects、有大佬遇到过吗?谢谢!(插入的图不知看不看的到)

class myKerasMLP(ScalarFunction):
    def __init__(self):
        ...

    def open(self, function_context):
        ...

    def eval(self, x, y):
        ...

    def load_model(self):
        """
        加载模型,如果 redis 里存在模型,则优先从 redis 加载,否则初始化一个新模型
        :return:
        """
        import redis
        import pickle
        import logging

        logging.info('载入模型!')
        r = redis.StrictRedis(**self.redis_params)
        model = None

        try:
            # redis加载model json
            model = model_from_json(r.get(self.model_name))
            # redis加载model权重
            weights = pickle.loads(r.get(self.weights))
            # # 设置权重
            model.set_weights(weights)
            model.summary()
        except TypeError:
            logging.info('Redis 内没有指定名称的模型,因此初始化一个新模型')
        except (redis.exceptions.RedisError, TypeError, Exception):
            logging.warning('Redis 出现异常,因此初始化一个新模型')
        finally:
            print("MLP model", model)
        return model

myKerasMLP = udf(myKerasMLP(), input_types=[DataTypes.FLOAT(),
DataTypes.FLOAT()],
                 result_type=DataTypes.FLOAT())
print('UDF 模型加载完成!')
t_env.create_temporary_system_function('train_and_predict', myKerasMLP)
print('UDF 注册成功')
-----------
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
dense_1 (Dense)              (None, 8)                 72        
_________________________________________________________________
dense_2 (Dense)              (None, 10)                90        
_________________________________________________________________
dense_3 (Dense)              (None, 1)                 11        
=================================================================
Total params: 173
Trainable params: 173
Non-trainable params: 0
_________________________________________________________________
MLP model <keras.models.Sequential object at 0x0000000031C386A0>
UDF 模型加载完成!


<http://apache-flink.147419.n8.nabble.com/file/t1280/2.jpg> 
<http://apache-flink.147419.n8.nabble.com/file/t1280/11.jpg> 




--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复