[ https://issues.apache.org/jira/browse/FLINK-22728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17348967#comment-17348967 ]
JYXL edited comment on FLINK-22728 at 6/10/21, 4:24 AM:
--------------------------------------------------------

Hi Dian,

I followed the method described at the URL and created a test project as follows:

project:
|---- __init__.py
|---- filter.py
|---- main_task.py
|---- target.py

The code is as follows.

filter.py
{code:python}
from pyflink.datastream.functions import FilterFunction
# from project.target import Target  --1


class MyFilterFunction(FilterFunction):
    # def open(self, runtime_context):
    #     from project.target import Target  --2

    def filter(self, value):
        from project.target import Target  # --3
        return value[-1] == Target.TRUE
{code}

target.py
{code:python}
import enum


class Target(enum.Enum):
    TRUE = '1'
    FALSE = '0'
{code}

main_task.py
{code:python}
import os
import sys

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# Ship the parent directory of the project so that `project` is importable.
env.add_python_file(
    os.path.abspath(os.path.dirname(os.path.dirname(__file__))))
env.set_python_executable(sys.executable)

# KafkaConsumer is a custom source class; its definition is not shown here.
kafka_consumer = KafkaConsumer()
source_ds = env.add_source(source_func=kafka_consumer,
                           source_name="kafka source",
                           type_info=kafka_consumer.get_type_info())
source_ds.print('source: ')
filter_ds = source_ds.filter(MyFilterFunction())
filter_ds.print('filter: ')
env.execute('test')
{code}

When I place the import at position 1 or 3, the console output is as follows:

source: :8> +I[0, pyflink, 1, 2, 1621567628127, 1621567628127, 1, 1]
source: :8> +I[1, pyflink, 1, 2, 1621567628132, 1621567628132, 2, 1]
source: :8> +I[2, pyflink, 1, 2, 1621567628132, 1621567628132, 3, 1]

When I place the import at position 2, the console shows the following error:

NameError: name 'Target' is not defined

Then I changed filter.py as follows:
{code:python}
from pyflink.datastream.functions import FilterFunction


class MyFilterFunction(FilterFunction):
    def filter(self, value):
        return value[-1] == "1"
{code}

The console output is then as follows:

source: :8> +I[0, pyflink, 1, 2, 1621572916880, 1621572916880, 1, 1]
source: :8> +I[1, pyflink, 1, 2, 1621572916885, 1621572916885, 2, 1]
source: :8> +I[2, pyflink, 1, 2, 1621572916885, 1621572916885, 3, 1]
filter: :8> +I[0, pyflink, 1, 2, 1621572916880, 1621572916880, 1, 1]
filter: :8> +I[1, pyflink, 1, 2, 1621572916885, 1621572916885, 2, 1]
filter: :8> +I[2, pyflink, 1, 2, 1621572916885, 1621572916885, 3, 1]

This is the output I need.

> a problem of loading udf
> ------------------------
>
> Key: FLINK-22728
> URL: https://issues.apache.org/jira/browse/FLINK-22728
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Affects Versions: 1.13.0
> Environment: python3.7
> centos 8
> pyflink1.13.0
> java1.11
> Reporter: JYXL
> Priority: Major
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> Hi,
> I'm using a stream UDF written in Python.
> The UDF is as follows:
> {code:python}
> import random
>
> from pyflink.datastream.functions import KeySelector
>
>
> class MyKeySelector(KeySelector):
>     def __init__(self, partitions: int=6):
>         self.partitions = partitions
>
>     def get_key(self, value):
>         return random.randint(0, self.partitions)
> {code}
>
> When I define it in the same script as the main task, it works,
> but when I put it in a separate script, it does not work.
> The project layout is as follows:
>
> project:
> | __init__.py
> | key_function.py
> | main_task.py
>
> I'm confused: using the env.add_python_file method does not work either,
> whether the `file_path` parameter is '~/project' or
> '~/project/key_function.py'.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
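
A note on the three import positions in the edited comment: the behaviour can be reproduced in plain Python, without Flink. The sketch below is not taken from the issue. It assumes the last field of each record arrives as the string '1' (which the working `== "1"` version suggests), and the names `ImportInOpen`, `scoping_error` and `matches_target` are hypothetical, as is the use of `Decimal` as a stand-in for the imported `Target`. It illustrates two points: a name imported inside `open()` is local to that method, so `filter()` cannot see it, which matches the `NameError` at position 2; and a plain `enum.Enum` member does not compare equal to its raw value, which would explain why positions 1 and 3 print `source:` lines but no `filter:` lines.

```python
import enum


class Target(enum.Enum):
    TRUE = '1'
    FALSE = '0'


class ImportInOpen:
    """Hypothetical stand-in for a filter with the import at position 2."""

    def open(self):
        # The import binds `Decimal` as a local variable of open() only;
        # the binding is gone once open() returns.
        from decimal import Decimal  # noqa: F401

    def filter(self, value):
        # `Decimal` is looked up in filter()'s locals, the module globals
        # and the builtins; it is in none of them, so this raises NameError.
        return value == Decimal(value)


def scoping_error():
    """Return the NameError message raised by filter(), or None."""
    f = ImportInOpen()
    f.open()
    try:
        f.filter('1')
    except NameError as exc:
        return str(exc)
    return None


def matches_target(last_field):
    """Compare a raw field with the enum in three different ways."""
    return {
        'member': last_field == Target.TRUE,          # str vs. enum member
        'value': last_field == Target.TRUE.value,     # str vs. str
        'lookup': Target(last_field) is Target.TRUE,  # look the member up by value
    }


print(scoping_error())
print(matches_target('1'))
```

If the enum is kept, comparing against `Target.TRUE.value`, or converting the field with `Target(value[-1])` first, is one way to let the matching records through. Keeping the import at module level or inside `filter()` avoids the scoping problem.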