[jira] [Created] (FLINK-35208) Respect pipeline.cached-files during processing Python dependencies
Dian Fu created FLINK-35208: --- Summary: Respect pipeline.cached-files during processing Python dependencies Key: FLINK-35208 URL: https://issues.apache.org/jira/browse/FLINK-35208 Project: Flink Issue Type: Bug Components: API / Python Reporter: Dian Fu Assignee: Dian Fu Currently, PyFlink makes use of the distributed cache (it updates StreamExecutionEnvironment#cachedFiles) when handling Python dependencies (see [https://github.com/apache/flink/blob/master/flink-python/src/main/java/org/apache/flink/python/util/PythonDependencyUtils.java#L339] for more details). However, if pipeline.cached-files is configured, it will clear StreamExecutionEnvironment#cachedFiles (see [https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1132] for more details), which may break the functionality described above. -- This message was sent by Atlassian Jira (v8.20.10#820010)
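A hedged illustration of the interaction described above (the file paths are placeholders, not from the original report): PyFlink registers Python dependencies in the distributed cache, while applying pipeline.cached-files clears whatever cached files are already registered on the environment.
{code}
from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment

config = Configuration()
# User-provided cached files configured via pipeline.cached-files.
config.set_string("pipeline.cached-files", "name:data,path:/tmp/data.txt")

env = StreamExecutionEnvironment.get_execution_environment(config)
# add_python_file relies on the distributed cache internally; the entry it
# registers may be dropped when pipeline.cached-files is applied to the
# environment, which is the behaviour this ticket wants to fix.
env.add_python_file("/tmp/my_dependency.py")
{code}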
[jira] [Created] (FLINK-34985) It doesn't support to access fields by name for map function in thread mode
Dian Fu created FLINK-34985: --- Summary: It doesn't support to access fields by name for map function in thread mode Key: FLINK-34985 URL: https://issues.apache.org/jira/browse/FLINK-34985 Project: Flink Issue Type: Bug Components: API / Python Reporter: Dian Fu Reported in slack channel: [https://apache-flink.slack.com/archives/C065944F9M2/p1711640068929589] ``` hi all, I seem to be running into an issue when switching to thread mode in PyFlink. In an UDF the {{Row}} seems to get converted into a tuple and you cannot access fields by their name anymore. In process mode it works fine. This bug can easily be reproduced using this minimal example, which is close to the PyFlink docs: from pyflink.datastream import StreamExecutionEnvironment from pyflink.common import Row from pyflink.table import StreamTableEnvironment, DataTypes from pyflink.table.udf import udf env = StreamExecutionEnvironment.get_execution_environment() t_env = StreamTableEnvironment.create(env) t_env.get_config().set("parallelism.default", "1") # This does work: t_env.get_config().set("python.execution-mode", "process") # This doesn't work: #t_env.get_config().set("python.execution-mode", "thread") def map_function(a: Row) -> Row: return Row(a.a + 1, a.b * a.b) # map operation with a python general scalar function func = udf( map_function, result_type=DataTypes.ROW( [ DataTypes.FIELD("a", DataTypes.BIGINT()), DataTypes.FIELD("b", DataTypes.BIGINT()), ] ), ) table = ( t_env.from_elements( [(2, 4), (0, 0)], schema=DataTypes.ROW( [ DataTypes.FIELD("a", DataTypes.BIGINT()), DataTypes.FIELD("b", DataTypes.BIGINT()), ] ), ) .map(func) .alias("a", "b") .execute() .print() )``` The exception I get in this execution mode is: 2024-03-28 16:32:10 Caused by: pemja.core.PythonException: : 'tuple' object has no attribute 'a' 2024-03-28 16:32:10 at /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/embedded/operation_utils.process_element(operation_utils.py:72) 2024-03-28 16:32:10 at /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/table/operations.process_element(operations.py:102) 2024-03-28 16:32:10 at .(:1) 2024-03-28 16:32:10 at /opt/flink/wouter/minimal_example.map_function(minimal_example.py:19) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32989) PyFlink wheel package build failed
Dian Fu created FLINK-32989: --- Summary: PyFlink wheel package build failed Key: FLINK-32989 URL: https://issues.apache.org/jira/browse/FLINK-32989 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.19.0 Reporter: Dian Fu {code} Compiling pyflink/fn_execution/coder_impl_fast.pyx because it changed. Compiling pyflink/fn_execution/table/aggregate_fast.pyx because it changed. Compiling pyflink/fn_execution/table/window_aggregate_fast.pyx because it changed. Compiling pyflink/fn_execution/stream_fast.pyx because it changed. Compiling pyflink/fn_execution/beam/beam_stream_fast.pyx because it changed. Compiling pyflink/fn_execution/beam/beam_coder_impl_fast.pyx because it changed. Compiling pyflink/fn_execution/beam/beam_operations_fast.pyx because it changed. [1/7] Cythonizing pyflink/fn_execution/beam/beam_coder_impl_fast.pyx [2/7] Cythonizing pyflink/fn_execution/beam/beam_operations_fast.pyx [3/7] Cythonizing pyflink/fn_execution/beam/beam_stream_fast.pyx [4/7] Cythonizing pyflink/fn_execution/coder_impl_fast.pyx [5/7] Cythonizing pyflink/fn_execution/stream_fast.pyx [6/7] Cythonizing pyflink/fn_execution/table/aggregate_fast.pyx [7/7] Cythonizing pyflink/fn_execution/table/window_aggregate_fast.pyx /home/vsts/work/1/s/flink-python/dev/.conda/envs/3.7/lib/python3.7/site-packages/Cython/Compiler/Main.py:369: FutureWarning: Cython directive 'language_level' not set, using 2 for now (Py2). This will change in a later release! File: /home/vsts/work/1/s/flink-python/pyflink/fn_execution/table/window_aggregate_fast.pxd tree = Parsing.p_module(s, pxd, full_module_name) Exactly one Flink home directory must exist, but found: [] {code} https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=52740&view=logs&j=d15e2b2e-10cd-5f59-7734-42d57dc5564d&t=4a86776f-e6e1-598a-f75a-c43d8b819662 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32724) Mark CEP API classes as Public / PublicEvolving
Dian Fu created FLINK-32724: --- Summary: Mark CEP API classes as Public / PublicEvolving Key: FLINK-32724 URL: https://issues.apache.org/jira/browse/FLINK-32724 Project: Flink Issue Type: Improvement Components: Library / CEP Reporter: Dian Fu Currently most CEP API classes, e.g. Pattern, PatternSelectFunction, etc., are not annotated as Public / PublicEvolving. We should improve this to make it clear which classes are public. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32131) Support specifying hadoop config dir for Python HiveCatalog
Dian Fu created FLINK-32131: --- Summary: Support specifying hadoop config dir for Python HiveCatalog Key: FLINK-32131 URL: https://issues.apache.org/jira/browse/FLINK-32131 Project: Flink Issue Type: Improvement Components: API / Python Reporter: Dian Fu Assignee: Dian Fu Fix For: 1.18.0 The Hadoop config directory can be specified for HiveCatalog in Java; however, this is still not supported in the Python HiveCatalog. This JIRA is to align them. -- This message was sent by Atlassian Jira (v8.20.10#820010)
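A rough sketch of the intended usage, assuming the Hadoop configuration directory is exposed as an extra constructor argument on the Python HiveCatalog; the hadoop_conf_dir parameter below is the proposed addition rather than an existing one, and the paths are placeholders.
{code}
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.catalog import HiveCatalog

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

catalog = HiveCatalog(
    "my_hive",
    default_database="default",
    hive_conf_dir="/opt/hive/conf",
    # Proposed parameter, aligning with the Java HiveCatalog constructor:
    hadoop_conf_dir="/opt/hadoop/etc/hadoop",
)
t_env.register_catalog("my_hive", catalog)
{code}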
[jira] [Created] (FLINK-31949) The state of CountTumblingWindowAssigner of Python DataStream API is never purged
Dian Fu created FLINK-31949: --- Summary: The state of CountTumblingWindowAssigner of Python DataStream API is never purged Key: FLINK-31949 URL: https://issues.apache.org/jira/browse/FLINK-31949 Project: Flink Issue Type: Bug Components: API / Python Reporter: Dian Fu Posted by Urs on the user mailing list: {code:java} "In FLINK-26444, a couple of convenience window assigners were added to the Python Datastream API, including CountTumblingWindowAssigner. This assigner uses a CountTrigger by default, which produces TriggerResult.FIRE. As such, using this window assigner on a data stream will always produce a "state leak" since older count windows will always be retained without any chance to work on the elements again." {code} See [https://lists.apache.org/thread/ql8x283xzgd98z0vsqr9npl5j74hscsm] for more details. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31905) Exception thrown when accessing nested field of the result of Python UDF with complex type
Dian Fu created FLINK-31905: --- Summary: Exception thrown when accessing nested field of the result of Python UDF with complex type Key: FLINK-31905 URL: https://issues.apache.org/jira/browse/FLINK-31905 Project: Flink Issue Type: Bug Components: API / Python Reporter: Dian Fu For the following job: {code} import logging, sys from pyflink.common import Row from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import Schema, DataTypes, TableDescriptor, StreamTableEnvironment from pyflink.table.expressions import col, row from pyflink.table.udf import ACC, T, udaf, AggregateFunction, udf logging.basicConfig(stream=sys.stdout, level=logging.ERROR, format="%(message)s") class EmitLastState(AggregateFunction): """ Aggregator that emits the latest state for the purpose of enabling parallelism on CDC tables. """ def create_accumulator(self) -> ACC: return Row(None, None) def accumulate(self, accumulator: ACC, *args): key, obj = args if (accumulator[0] is None) or (key > accumulator[0]): accumulator[0] = key accumulator[1] = obj def retract(self, accumulator: ACC, *args): pass def get_value(self, accumulator: ACC) -> T: return accumulator[1] some_complex_inner_type = DataTypes.ROW( [ DataTypes.FIELD("f0", DataTypes.STRING()), DataTypes.FIELD("f1", DataTypes.STRING()) ] ) some_complex_type = DataTypes.ROW( [ DataTypes.FIELD(k, DataTypes.ARRAY(some_complex_inner_type)) for k in ("f0", "f1", "f2") ] + [ DataTypes.FIELD("f3", DataTypes.DATE()), DataTypes.FIELD("f4", DataTypes.VARCHAR(32)), DataTypes.FIELD("f5", DataTypes.VARCHAR(2)), ] ) @udf(input_types=DataTypes.STRING(), result_type=some_complex_type) def complex_udf(s): return Row(f0=None, f1=None, f2=None, f3=None, f4=None, f5=None) if __name__ == "__main__": env = StreamExecutionEnvironment.get_execution_environment() table_env = StreamTableEnvironment.create(env) table_env.get_config().set('pipeline.classpaths', 'file:///Users/dianfu/code/src/workspace/pyflink-examples/flink-sql-connector-postgres-cdc-2.1.1.jar') # Create schema _schema = { "p_key": DataTypes.INT(False), "modified_id": DataTypes.INT(False), "content": DataTypes.STRING() } schema = Schema.new_builder().from_fields( *zip(*[(k, v) for k, v in _schema.items()]) ).\ primary_key("p_key").\ build() # Create table descriptor descriptor = TableDescriptor.for_connector("postgres-cdc").\ option("hostname", "host.docker.internal").\ option("port", "5432").\ option("database-name", "flink_issue").\ option("username", "root").\ option("password", "root").\ option("debezium.plugin.name", "pgoutput").\ option("schema-name", "flink_schema").\ option("table-name", "flink_table").\ option("slot.name", "flink_slot").\ schema(schema).\ build() table_env.create_temporary_table("flink_table", descriptor) # Create changelog stream stream = table_env.from_path("flink_table")\ # Define UDAF accumulator_type = DataTypes.ROW( [ DataTypes.FIELD("f0", DataTypes.INT(False)), DataTypes.FIELD("f1", DataTypes.ROW([DataTypes.FIELD(k, v) for k, v in _schema.items()])), ] ) result_type = DataTypes.ROW([DataTypes.FIELD(k, v) for k, v in _schema.items()]) emit_last = udaf(EmitLastState(), accumulator_type=accumulator_type, result_type=result_type) # Emit last state based on modified_id to enable parallel processing stream = stream.\ group_by(col("p_key")).\ select( col("p_key"), emit_last(col("modified_id"),row(*(col(k) for k in _schema.keys())).cast(result_type)).alias("tmp_obj") ) # Select the elements of the objects stream = stream.select(*(col("tmp_obj").get(k).alias(k) for k in 
_schema.keys())) # We apply a UDF which parses the xml and returns a complex nested structure stream = stream.select(col("p_key"), complex_udf(col("content")).alias("nested_obj")) # We select an element from the nested structure in order to flatten it # The next line is the line causing issues, commenting the next line will make the pipeline work stream = stream.select(col("p_key"), col("nested_obj").get("f0")) # Interestingly, the below part does work... # stream = stream.select(col("nested_obj").get("f0")) table_env.to_changelog_stream(stream).print() # Execute env.execute_async() {code} {code} py4j.protocol.Py4JJavaError: An error occurred while calling o8.toChangelogStream. : java.lang.IndexOutOfBoundsException: Index 1 out of bounds for length 1 at
[jira] [Created] (FLINK-31861) Introduce ByteArraySchema which serializes/deserializes data of type byte array
Dian Fu created FLINK-31861: --- Summary: Introduce ByteArraySchema which serializes/deserializes data of type byte array Key: FLINK-31861 URL: https://issues.apache.org/jira/browse/FLINK-31861 Project: Flink Issue Type: Improvement Components: API / Core Reporter: Dian Fu The aim of this ticket is to introduce ByteArraySchema, which serializes/deserializes data of type byte array. With it, users could get the raw bytes from a data source. See [https://apache-flink.slack.com/archives/C03G7LJTS2G/p1681928862762699] for more details. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31707) Constant strings cannot be used as input arguments of Pandas UDAF
Dian Fu created FLINK-31707: --- Summary: Constant strings cannot be used as input arguments of Pandas UDAF Key: FLINK-31707 URL: https://issues.apache.org/jira/browse/FLINK-31707 Project: Flink Issue Type: Bug Components: API / Python Reporter: Dian Fu Assignee: Dian Fu It throws an exception like the following when using constant strings in a Pandas UDAF: {code} E raise ValueError("field_type %s is not supported." % field_type) E ValueError: field_type type_name: CHAR E char_info { E length: 3 E } E is not supported. {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
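A minimal reproduction sketch under assumptions (the table, column and function names are illustrative, not from the original report); passing a constant string literal to a Pandas UDAF is what produces the CHAR error above.
{code}
from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col, lit
from pyflink.table.udf import udaf

t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
t = t_env.from_elements([(1, 2.0), (1, 3.0)], ["id", "v"])

@udaf(result_type=DataTypes.DOUBLE(), func_type="pandas")
def agg_with_mode(v, mode):
    # The constant `mode` argument only exists to trigger the issue.
    return v.mean()

# The constant string literal is passed to the UDAF as CHAR(3), which hits
# "ValueError: field_type ... CHAR ... is not supported."
t.group_by(col("id")).select(agg_with_mode(col("v"), lit("avg"))).execute().print()
{code}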
[jira] [Created] (FLINK-31690) The current key is not set for KeyedCoProcessOperator
Dian Fu created FLINK-31690: --- Summary: The current key is not set for KeyedCoProcessOperator Key: FLINK-31690 URL: https://issues.apache.org/jira/browse/FLINK-31690 Project: Flink Issue Type: Bug Components: API / Python Reporter: Dian Fu Assignee: Dian Fu See https://apache-flink.slack.com/archives/C03G7LJTS2G/p1680294701254239 for more details. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31532) Support StreamExecutionEnvironment.socketTextStream in Python DataStream API
Dian Fu created FLINK-31532: --- Summary: Support StreamExecutionEnvironment.socketTextStream in Python DataStream API Key: FLINK-31532 URL: https://issues.apache.org/jira/browse/FLINK-31532 Project: Flink Issue Type: Improvement Components: API / Python Reporter: Dian Fu Currently, StreamExecutionEnvironment.socketTextStream is still missing in the Python DataStream API. It would be great to support it. It may be helpful in special cases, e.g. testing. -- This message was sent by Atlassian Jira (v8.20.10#820010)
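A sketch of how the Python API could look, mirroring the Java StreamExecutionEnvironment#socketTextStream signature; the socket_text_stream method below is the proposal, not an existing PyFlink method.
{code}
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# Read newline-delimited text from a socket, e.g. one opened with `nc -lk 9999`.
ds = env.socket_text_stream("localhost", 9999)  # proposed method
ds.map(lambda line: line.upper()).print()
env.execute("socket_text_stream_example")
{code}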
[jira] [Created] (FLINK-31503) "org.apache.beam.sdk.options.PipelineOptionsRegistrar: Provider org.apache.beam.sdk.options.DefaultPipelineOptionsRegistrar not a subtype" is thrown when executing Python UDFs in SQL Client
Dian Fu created FLINK-31503: --- Summary: "org.apache.beam.sdk.options.PipelineOptionsRegistrar: Provider org.apache.beam.sdk.options.DefaultPipelineOptionsRegistrar not a subtype" is thrown when executing Python UDFs in SQL Client Key: FLINK-31503 URL: https://issues.apache.org/jira/browse/FLINK-31503 Project: Flink Issue Type: Bug Components: API / Python Reporter: Dian Fu Assignee: Dian Fu The following exception will be thrown when executing SQL statements containing Python UDFs in SQL Client: {code} Caused by: java.util.ServiceConfigurationError: org.apache.beam.sdk.options.PipelineOptionsRegistrar: Provider org.apache.beam.sdk.options.DefaultPipelineOptionsRegistrar not a subtype at java.util.ServiceLoader.fail(ServiceLoader.java:239) at java.util.ServiceLoader.access$300(ServiceLoader.java:185) at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:376) at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404) at java.util.ServiceLoader$1.next(ServiceLoader.java:480) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableCollection$Builder.addAll(ImmutableCollection.java:415) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet$Builder.addAll(ImmutableSet.java:507) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSortedSet$Builder.addAll(ImmutableSortedSet.java:528) at org.apache.beam.sdk.util.common.ReflectHelpers.loadServicesOrdered(ReflectHelpers.java:199) at org.apache.beam.sdk.options.PipelineOptionsFactory$Cache.initializeRegistry(PipelineOptionsFactory.java:2089) at org.apache.beam.sdk.options.PipelineOptionsFactory$Cache.(PipelineOptionsFactory.java:2083) at org.apache.beam.sdk.options.PipelineOptionsFactory$Cache.(PipelineOptionsFactory.java:2047) at org.apache.beam.sdk.options.PipelineOptionsFactory.resetCache(PipelineOptionsFactory.java:581) at org.apache.beam.sdk.options.PipelineOptionsFactory.(PipelineOptionsFactory.java:547) at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:241) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31478) TypeError: a bytes-like object is required, not 'JavaList' is thrown when ds.execute_and_collect() is called on a KeyedStream
Dian Fu created FLINK-31478: --- Summary: TypeError: a bytes-like object is required, not 'JavaList' is thrown when ds.execute_and_collect() is called on a KeyedStream Key: FLINK-31478 URL: https://issues.apache.org/jira/browse/FLINK-31478 Project: Flink Issue Type: Bug Components: API / Python Reporter: Dian Fu Assignee: Dian Fu {code} # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import argparse import logging import sys from pyflink.common import WatermarkStrategy, Encoder, Types from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode from pyflink.datastream.connectors.file_system import (FileSource, StreamFormat, FileSink, OutputFileConfig, RollingPolicy) word_count_data = ["To be, or not to be,--that is the question:--", "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune", "Or to take arms against a sea of troubles,", "And by opposing end them?--To die,--to sleep,--", "No more; and by a sleep to say we end", "The heartache, and the thousand natural shocks", "That flesh is heir to,--'tis a consummation", "Devoutly to be wish'd. To die,--to sleep;--", "To sleep! perchance to dream:--ay, there's the rub;", "For in that sleep of death what dreams may come,", "When we have shuffled off this mortal coil,", "Must give us pause: there's the respect", "That makes calamity of so long life;", "For who would bear the whips and scorns of time,", "The oppressor's wrong, the proud man's contumely,", "The pangs of despis'd love, the law's delay,", "The insolence of office, and the spurns", "That patient merit of the unworthy takes,", "When he himself might his quietus make", "With a bare bodkin? 
who would these fardels bear,", "To grunt and sweat under a weary life,", "But that the dread of something after death,--", "The undiscover'd country, from whose bourn", "No traveller returns,--puzzles the will,", "And makes us rather bear those ills we have", "Than fly to others that we know not of?", "Thus conscience does make cowards of us all;", "And thus the native hue of resolution", "Is sicklied o'er with the pale cast of thought;", "And enterprises of great pith and moment,", "With this regard, their currents turn awry,", "And lose the name of action.--Soft you now!", "The fair Ophelia!--Nymph, in thy orisons", "Be all my sins remember'd."] def word_count(input_path, output_path): env = StreamExecutionEnvironment.get_execution_environment() env.set_runtime_mode(RuntimeExecutionMode.BATCH) # write all the data to one file env.set_parallelism(1) # define the source if input_path is not None: ds = env.from_source( source=FileSource.for_record_stream_format(StreamFormat.text_line_format(), input_path) .process_static_file_set().build(), watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(), source_name="file_source" ) else: print("Executing word_count example with default input data set.") print("Use --input to specify file input.") ds = env.from_collection(word_count_data) def split(line): yield from line.split() # compute
[jira] [Created] (FLINK-31286) Python processes are still alive when shutting down a session cluster directly without stopping the jobs
Dian Fu created FLINK-31286: --- Summary: Python processes are still alive when shutting down a session cluster directly without stopping the jobs Key: FLINK-31286 URL: https://issues.apache.org/jira/browse/FLINK-31286 Project: Flink Issue Type: Bug Components: API / Python Reporter: Dian Fu Assignee: Dian Fu Attachments: image-2023-03-02-10-55-34-863.png Reproduction steps: 1) start a standalone cluster: ./bin/start-cluster.sh 2) submit a PyFlink job which contains Python UDFs 3) stop the cluster: ./bin/stop-cluster.sh 4) check whether the Python processes still exist: ps aux | grep -i beam_boot !image-2023-03-02-10-55-34-863.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31272) Duplicate operators appear in the StreamGraph for Python DataStream API jobs
Dian Fu created FLINK-31272: --- Summary: Duplicate operators appear in the StreamGraph for Python DataStream API jobs Key: FLINK-31272 URL: https://issues.apache.org/jira/browse/FLINK-31272 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.16.0 Reporter: Dian Fu For the following job: {code} import argparse import json import sys import time from typing import Iterable, cast from pyflink.common import Types, Time, Encoder from pyflink.datastream import StreamExecutionEnvironment, ProcessWindowFunction, EmbeddedRocksDBStateBackend, \ PredefinedOptions, FileSystemCheckpointStorage, CheckpointingMode, ExternalizedCheckpointCleanup from pyflink.datastream.connectors.file_system import FileSink, RollingPolicy, OutputFileConfig from pyflink.datastream.state import ReducingState, ReducingStateDescriptor from pyflink.datastream.window import TimeWindow, Trigger, TriggerResult, T, TumblingProcessingTimeWindows, \ ProcessingTimeTrigger class CountWithProcessTimeoutTrigger(ProcessingTimeTrigger): def __init__(self, window_size: int): self._window_size = window_size self._count_state_descriptor = ReducingStateDescriptor( "count", lambda a, b: a + b, Types.LONG()) @staticmethod def of(window_size: int) -> 'CountWithProcessTimeoutTrigger': return CountWithProcessTimeoutTrigger(window_size) def on_element(self, element: T, timestamp: int, window: TimeWindow, ctx: 'Trigger.TriggerContext') -> TriggerResult: count_state = cast(ReducingState, ctx.get_partitioned_state(self._count_state_descriptor)) count_state.add(1) # print("element arrive:", element, "count_state:", count_state.get(), window.max_timestamp(), # ctx.get_current_watermark()) if count_state.get() >= self._window_size: # 必须fire&purge print("fire element count", element, count_state.get(), window.max_timestamp(), ctx.get_current_watermark()) count_state.clear() return TriggerResult.FIRE_AND_PURGE if timestamp >= window.end: count_state.clear() return TriggerResult.FIRE_AND_PURGE else: return TriggerResult.CONTINUE def on_processing_time(self, timestamp: int, window: TimeWindow, ctx: Trigger.TriggerContext) -> TriggerResult: if timestamp >= window.end: return TriggerResult.CONTINUE else: print("fire with process_time:", timestamp) count_state = cast(ReducingState, ctx.get_partitioned_state(self._count_state_descriptor)) count_state.clear() return TriggerResult.FIRE_AND_PURGE def on_event_time(self, timestamp: int, window: TimeWindow, ctx: 'Trigger.TriggerContext') -> TriggerResult: return TriggerResult.CONTINUE def clear(self, window: TimeWindow, ctx: 'Trigger.TriggerContext') -> None: count_state = ctx.get_partitioned_state(self._count_state_descriptor) count_state.clear() def to_dict_map(v): time.sleep(1) dict_value = json.loads(v) return dict_value def get_group_key(value, keys): group_key_values = [] for key in keys: one_key_value = 'null' if key in value: list_value = value[key] if list_value: one_key_value = str(list_value[0]) group_key_values.append(one_key_value) group_key = '_'.join(group_key_values) # print("group_key=", group_key) return group_key class CountWindowProcessFunction(ProcessWindowFunction[dict, dict, str, TimeWindow]): def __init__(self, uf): self._user_function = uf def process(self, key: str, context: ProcessWindowFunction.Context[TimeWindow], elements: Iterable[dict]) -> Iterable[dict]: result_list = self._user_function.process_after_group_by_function(elements) return result_list if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument( '--output', dest='output', 
required=False, help='Output file to write results to.') argv = sys.argv[1:] known_args, _ = parser.parse_known_args(argv) output_path = known_args.output env = StreamExecutionEnvironment.get_execution_environment() # write all the data to one file env.set_parallelism(1) # process time env.get_config().set_auto_watermark_interval(0) state_backend = EmbeddedRocksDBStateBackend(True) state_backend.set_predefined_options(PredefinedOptions.FLASH_SSD_OP
[jira] [Created] (FLINK-31172) Support coGroup in Python DataStream API
Dian Fu created FLINK-31172: --- Summary: Support coGroup in Python DataStream API Key: FLINK-31172 URL: https://issues.apache.org/jira/browse/FLINK-31172 Project: Flink Issue Type: New Feature Components: API / Python Reporter: Dian Fu The aim of this ticket is to support [coGroup|https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/operators/overview/#window-cogroup] in the Python DataStream API. -- This message was sent by Atlassian Jira (v8.20.10#820010)
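A sketch of the windowed co-group shape the Python API would mirror from Java; the co_group/where/equal_to chain below is only the proposed form, not a final signature, and the sample data is illustrative.
{code}
from pyflink.common.time import Time
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.window import TumblingProcessingTimeWindows

env = StreamExecutionEnvironment.get_execution_environment()
orders = env.from_collection([("user1", 10), ("user2", 20)])
payments = env.from_collection([("user1", 10), ("user3", 5)])

result = (
    orders.co_group(payments)  # proposed API
          .where(lambda order: order[0])
          .equal_to(lambda payment: payment[0])
          .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
          # Collect the co-grouped elements of each side per key and window.
          .apply(lambda left, right: (list(left), list(right)))
)
result.print()
env.execute("co_group_example")
{code}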
[jira] [Created] (FLINK-31171) Support iterate in Python DataStream API
Dian Fu created FLINK-31171: --- Summary: Support iterate in Python DataStream API Key: FLINK-31171 URL: https://issues.apache.org/jira/browse/FLINK-31171 Project: Flink Issue Type: New Feature Components: API / Python Reporter: Dian Fu The aim of this ticket is to support [iterate|https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/operators/overview/#iterate] in Python DataStream API. -- This message was sent by Atlassian Jira (v8.20.10#820010)
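A sketch of the Java iterate()/closeWith() pattern that a Python counterpart would mirror; the iterate and close_with methods below are the proposed shape, not existing PyFlink methods.
{code}
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
numbers = env.from_collection([5, 10, 3])

iteration = numbers.iterate()                         # proposed API
minus_one = iteration.map(lambda v: v - 1)
still_positive = minus_one.filter(lambda v: v > 0)
iteration.close_with(still_positive)                  # feed these back into the loop
done = minus_one.filter(lambda v: v <= 0)
done.print()
env.execute("iterate_example")
{code}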
[jira] [Created] (FLINK-31043) KeyError exception is thrown in CachedMapState
Dian Fu created FLINK-31043: --- Summary: KeyError exception is thrown in CachedMapState Key: FLINK-31043 URL: https://issues.apache.org/jira/browse/FLINK-31043 Project: Flink Issue Type: Improvement Components: API / Python Affects Versions: 1.15.3 Reporter: Dian Fu Have seen the following exception in a PyFlink job which runs in Flink 1.15. It happens occasionally and may indicate a bug of the state cache of MapState: {code:java} org.apache.flink.runtime.taskmanager.AsynchronousException: Caught exception while processing timer. at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1875) at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1846) at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:2010) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$21(StreamTask.java:1999) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:1079) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:1028) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:954) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:933) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568) at java.lang.Thread.run(Thread.java:834) Caused by: TimerException{java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner flush} ... 14 more Caused by: java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner flush at org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:106) at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:299) at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:115) at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:2008) ... 
13 more Caused by: java.lang.RuntimeException: Failed to close remote bundle at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:387) at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:371) at org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator.lambda$invokeFinishBundle$0(AbstractExternalPythonFunctionOperator.java:85) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622) ... 1 more Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 131: Traceback (most recent call last): File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute response = task() File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in lambda: self.create_worker().do_instruction(request), request) File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction getattr(request, request_type), request.instruction_id) File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle bundle_processor.process_bundle(instruction_id)) File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1000, in process_bundle ele
[jira] [Created] (FLINK-31031) Disable the output buffer of Python process to make it more convenient for interactive users
Dian Fu created FLINK-31031: --- Summary: Disable the output buffer of Python process to make it more convenient for interactive users Key: FLINK-31031 URL: https://issues.apache.org/jira/browse/FLINK-31031 Project: Flink Issue Type: Improvement Components: API / Python Reporter: Dian Fu Assignee: Dian Fu -- This message was sent by Atlassian Jira (v8.20.10#820010)
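Not the Flink implementation, just an illustration of the mechanism involved: a child Python process stops buffering its stdout/stderr when launched with -u or with PYTHONUNBUFFERED set, so print output reaches interactive users immediately.
{code}
import os
import subprocess
import sys

# worker.py is a placeholder for the Python process being launched.
subprocess.Popen(
    [sys.executable, "-u", "worker.py"],          # -u disables stdout/stderr buffering
    env={**os.environ, "PYTHONUNBUFFERED": "1"},  # equivalent environment-variable form
)
{code}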
[jira] [Created] (FLINK-30364) Job failed without exception stack
Dian Fu created FLINK-30364: --- Summary: Job failed without exception stack Key: FLINK-30364 URL: https://issues.apache.org/jira/browse/FLINK-30364 Project: Flink Issue Type: Bug Components: Runtime / Task Affects Versions: 1.16.0, 1.15.0, 1.14.0, 1.13.0 Reporter: Dian Fu {code} 2022-12-12 08:01:19,985 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - route map function -> Filter -> Timestamps/Watermarks with job vertex id 4bf7c1955ffe56e2106d666433eaf137 (3/30) (96f1efb2ad67ab19cae4d81548ae58e9) switched from RUNNING to FAILED on job-ee140d6c-483b-48c3-961d-b02981d4ba99-taskmanager-1-1. java.lang.NullPointerException: null {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30308) ClassCastException: class java.io.ObjectStreamClass$Caches$1 cannot be cast to class java.util.Map is shown in the logs when the job shuts down
Dian Fu created FLINK-30308: --- Summary: ClassCastException: class java.io.ObjectStreamClass$Caches$1 cannot be cast to class java.util.Map is showing in the logging when the job shutdown Key: FLINK-30308 URL: https://issues.apache.org/jira/browse/FLINK-30308 Project: Flink Issue Type: Bug Components: API / Python Reporter: Dian Fu Assignee: Dian Fu Fix For: 1.17.0, 1.16.1, 1.15.4 {code:java} 2022-12-05 18:26:40,229 WARN org.apache.flink.streaming.api.operators.AbstractStreamOperator [] - Failed to clean up the leaking objects. java.lang.ClassCastException: class java.io.ObjectStreamClass$Caches$1 cannot be cast to class java.util.Map (java.io.ObjectStreamClass$Caches$1 and java.util.Map are in module java.base of loader 'bootstrap') at org.apache.flink.streaming.api.utils.ClassLeakCleaner.clearCache(ClassLeakCleaner.java:58) ~[blob_p-a72e14b9030c3ca0d3d0a8fc6e70166c7419d431-f7f18b2164971cb6798db9ab762feabd:1.15.0] at org.apache.flink.streaming.api.utils.ClassLeakCleaner.cleanUpLeakingClasses(ClassLeakCleaner.java:39) ~[blob_p-a72e14b9030c3ca0d3d0a8fc6e70166c7419d431-f7f18b2164971cb6798db9ab762feabd:1.15.0] at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.close(AbstractPythonFunctionOperator.java:142) ~[blob_p-a72e14b9030c3ca0d3d0a8fc6e70166c7419d431-f7f18b2164971cb6798db9ab762feabd:1.15.0] at org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator.close(AbstractExternalPythonFunctionOperator.java:73) ~[blob_p-a72e14b9030c3ca0d3d0a8fc6e70166c7419d431-f7f18b2164971cb6798db9ab762feabd:1.15.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163) ~[flink-dist-1.15.2.jar:1.15.2] at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125) ~[flink-dist-1.15.2.jar:1.15.2] at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:997) ~[flink-dist-1.15.2.jar:1.15.2] at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:254) ~[flink-dist-1.15.2.jar:1.15.2] at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72) ~[flink-dist-1.15.2.jar:1.15.2] at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127) ~[flink-dist-1.15.2.jar:1.15.2] at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:916) ~[flink-dist-1.15.2.jar:1.15.2] at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:930) ~[flink-dist-1.15.2.jar:1.15.2] at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) [flink-dist-1.15.2.jar:1.15.2] at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:930) [flink-dist-1.15.2.jar:1.15.2] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) [flink-dist-1.15.2.jar:1.15.2] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) [flink-dist-1.15.2.jar:1.15.2] at java.lang.Thread.run(Unknown Source) [?:?]{code} Reported in Slack: https://apache-flink.slack.com/archives/C03G7LJTS2G/p1670265131083639?thread_ts=1670265114.640369&cid=C03G7LJTS2G -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29192) Add completeness test for built-in functions between Java and Python Table API
Dian Fu created FLINK-29192: --- Summary: Add completeness test for built-in functions between Java and Python Table API Key: FLINK-29192 URL: https://issues.apache.org/jira/browse/FLINK-29192 Project: Flink Issue Type: Improvement Components: API / Python Reporter: Dian Fu Fix For: 1.17.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28886) Support HybridSource in Python DataStream API
Dian Fu created FLINK-28886: --- Summary: Support HybridSource in Python DataStream API Key: FLINK-28886 URL: https://issues.apache.org/jira/browse/FLINK-28886 Project: Flink Issue Type: Improvement Components: API / Python Reporter: Dian Fu Assignee: Dian Fu Fix For: 1.16.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28253) LocalDateTime is not supported in PyFlink
Dian Fu created FLINK-28253: --- Summary: LocalDateTime is not supported in PyFlink Key: FLINK-28253 URL: https://issues.apache.org/jira/browse/FLINK-28253 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.15.0, 1.14.0 Reporter: Dian Fu For the following job: {code} from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment from pyflink.table import EnvironmentSettings from pyflink.table.table_environment import StreamTableEnvironment if __name__ == '__main__': env = StreamExecutionEnvironment.get_execution_environment() settings = EnvironmentSettings.new_instance() \ .in_streaming_mode() \ .build() t_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=settings) t_env.execute_sql(""" CREATE TABLE events ( `id` VARCHAR, `source` VARCHAR, `resources` VARCHAR, `time` TIMESTAMP(3), WATERMARK FOR `time` as `time` - INTERVAL '30' SECOND, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'filesystem', 'path' = 'file:///path/to/input', 'format' = 'csv' ) """) events_stream_table = t_env.from_path('events') events_stream = t_env.to_data_stream(events_stream_table) # Types.ROW([Types.STRING(), Types.STRING(), Types.STRING(), Types.SQL_TIMESTAMP()]) # now do some processing - let's filter by the type of event we get codebuild_stream = events_stream.filter( lambda event: event['source'] == 'aws.codebuild' ) codebuild_stream.print() env.execute() {code} It will reports the following exception: {code} Traceback (most recent call last): File "/Users/dianfu/code/src/workspace/pyflink-examples/tests/test_2.py", line 47, in process() File "/Users/dianfu/code/src/workspace/pyflink-examples/tests/test_2.py", line 39, in process lambda event: event['source'] == 'aws.codebuild' File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/datastream/data_stream.py", line 432, in filter self._j_data_stream.getTransformation().getOutputType()) File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/common/typeinfo.py", line 1070, in _from_java_type TypeInfoDataTypeConverter.toLegacyTypeInfo(j_type_info.getDataType( File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/common/typeinfo.py", line 1042, in _from_java_type j_row_field_types] File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/common/typeinfo.py", line 1041, in row_field_types = [_from_java_type(j_row_field_type) for j_row_field_type in File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/common/typeinfo.py", line 1072, in _from_java_type raise TypeError("The java type info: %s is not supported in PyFlink currently." % j_type_info) TypeError: The java type info: LocalDateTime is not supported in PyFlink currently. {code} -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-28140) Improve the documentation by adding Python examples
Dian Fu created FLINK-28140: --- Summary: Improve the documentation by adding Python examples Key: FLINK-28140 URL: https://issues.apache.org/jira/browse/FLINK-28140 Project: Flink Issue Type: Improvement Components: API / Python, Documentation Reporter: Dian Fu There are still quite a few documentation pages that only have Java/Scala examples. The aim of this JIRA is to improve these pages by adding Python examples. Here is a list of the documentation pages that need to be improved:
* https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/common/
* https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/data_stream_api/
* https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/time_attributes/
* https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/temporal_table_function/
* https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/timezone/
* https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tableapi/
* https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/modules/
* https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/
* https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/
* https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/application_parameters/
* https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/execution_configuration/
* https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/
* https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/
* https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/task_failure_recovery/
* https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/
* https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/formats/
-- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-28114) The path of the Python client interpreter could not point to an archive file in distributed file system
Dian Fu created FLINK-28114: --- Summary: The path of the Python client interpreter could not point to an archive file in distributed file system Key: FLINK-28114 URL: https://issues.apache.org/jira/browse/FLINK-28114 Project: Flink Issue Type: Bug Components: API / Python Reporter: Dian Fu Fix For: 1.16.0, 1.15.1 See https://github.com/apache/flink/blob/master/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java#L178 for more details about this limitation. Users could execute PyFlink jobs in YARN application mode as follows:
{code}
./bin/flink run-application -t yarn-application \
  -Djobmanager.memory.process.size=1024m \
  -Dtaskmanager.memory.process.size=1024m \
  -Dyarn.application.name= \
  -Dyarn.ship-files=/path/to/shipfiles \
  -pyarch shipfiles/venv.zip \
  -pyclientexec venv.zip/venv/bin/python3 \
  -pyexec venv.zip/venv/bin/python3 \
  -py shipfiles/word_count.py
{code}
In the above case, venv.zip will be distributed to the TMs via the Flink blob server. However, the blob server doesn't support files larger than 2 GB. See https://github.com/apache/flink/blob/ea52732dc48a4f1c5be0925890cd8aa1ea2a11ed/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java#L223 for more details. This is a very serious problem, as Python users tend to install a lot of Python libraries inside venv.zip and some Python libraries are very large. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-28071) Support missing built-in functions in Table API
Dian Fu created FLINK-28071: --- Summary: Support missing built-in functions in Table API Key: FLINK-28071 URL: https://issues.apache.org/jira/browse/FLINK-28071 Project: Flink Issue Type: Sub-task Components: Table SQL / API Reporter: Dian Fu Fix For: 1.16.0 There are many built-in functions that are not yet supported in the Table API. See https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/ for more details. There are two columns for each built-in function: *SQL Function* and *Table Function*; if a function is not supported in the *Table API*, the *Table Function* column is documented as *N/A*. We need to evaluate each of these functions to ensure that they can be used in both SQL and the Table API. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-28015) FROM_UNIXTIME could not be used in Table API
Dian Fu created FLINK-28015: --- Summary: FROM_UNIXTIME could not be used in Table API Key: FLINK-28015 URL: https://issues.apache.org/jira/browse/FLINK-28015 Project: Flink Issue Type: Bug Components: Table SQL / API Reporter: Dian Fu This issue is reported in [slack|https://apache-flink.slack.com/archives/C03G7LJTS2G/p1655083223954149]. For the following code: {code} input_table = table_env.from_path(input_table_name) sliding_window_table = ( input_table.window( Slide.over(sliding_window_over) .every(sliding_window_every) .on(sliding_window_on) .alias(sliding_window_alias) ) .group_by('ticker, {}'.format(sliding_window_alias)) .select('FROM_UNIXTIME(28*60 * (UNIX_TIMESTAMP({0}.end) / (28*60))), ticker, MIN(price) as min_price, MAX(price) as max_price, {0}.start as utc_start, {0}.end as utc_end'.format( sliding_window_alias )) ) {code} The following exception will be thrown: {code} py4j.protocol.Py4JJavaError: An error occurred while calling o75.select. : org.apache.flink.table.api.ValidationException: Undefined function: FROM_UNIXTIME at org.apache.flink.table.expressions.resolver.LookupCallResolver.lambda$visit$0(LookupCallResolver.java:53) at java.base/java.util.Optional.orElseThrow(Optional.java:408) at org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:49) at org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:36) at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:35) at org.apache.flink.table.expressions.LookupCallExpression.accept(LookupCallExpression.java:66) at org.apache.flink.table.api.internal.TableImpl.lambda$preprocessExpressions$0(TableImpl.java:605) at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948) at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913) at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) at org.apache.flink.table.api.internal.TableImpl.preprocessExpressions(TableImpl.java:606) at org.apache.flink.table.api.internal.TableImpl.access$300(TableImpl.java:66) at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:775) at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:770) {code} -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27760) NPE is thrown when executing PyFlink jobs in batch mode
Dian Fu created FLINK-27760: --- Summary: NPE is thrown when executing PyFlink jobs in batch mode Key: FLINK-27760 URL: https://issues.apache.org/jira/browse/FLINK-27760 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.13.0 Reporter: Dian Fu Assignee: Dian Fu This is the exception stack reported by one user: {code} 022-05-25 11:39:32,792 [MultipleInput(readOrder=[2,1,0], members=[\nHashJoin(joinType=[I... -> PythonCal ... with job vertex id 71d9b8e1b249eaa7e67ef93fb483177f (63/100)#123] WARN org.apache.flink.runtime.taskmanager.Task [] - MultipleInput(readOrder=[2,1,0], members=[\nHashJoin(joinType=[I... -> PythonCal ... with job vertex id 71d9b8e1b249eaa7e67ef93fb483177f (63/100)#123 (6fa78755ff19ac9d0d57aba21840e834) switched from INITIALIZING to FAILED with failure cause: java.lang.NullPointerException at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getKeyedStateBackend(AbstractStreamOperator.java:466) at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.processElementsOfCurrentKeyIfNeeded(AbstractPythonFunctionOperator.java:238) at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.processWatermark(AbstractPythonFunctionOperator.java:208) at org.apache.flink.table.runtime.operators.multipleinput.output.OneInputStreamOperatorOutput.emitWatermark(OneInputStreamOperatorOutput.java:45) at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:39) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:632) at org.apache.flink.table.runtime.operators.TableStreamOperator.processWatermark(TableStreamOperator.java:74) at org.apache.flink.table.runtime.operators.multipleinput.output.OneInputStreamOperatorOutput.emitWatermark(OneInputStreamOperatorOutput.java:45) at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:39) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:632) at org.apache.flink.table.runtime.operators.TableStreamOperator.processWatermark(TableStreamOperator.java:74) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark2(AbstractStreamOperator.java:649) at org.apache.flink.table.runtime.operators.multipleinput.input.SecondInputOfTwoInput.processWatermark(SecondInputOfTwoInput.java:44) at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessorFactory$StreamTaskNetworkOutput.emitWatermark(StreamMultipleInputProcessorFactory.java:318) at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196) at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:137) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:106) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:87) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:424) at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:569) at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:651) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:541) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:586) at java.lang.Thread.run(Thread.java:877) {code} -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27729) Support constructing StartCursor and StopCursor from MessageId
Dian Fu created FLINK-27729: --- Summary: Support constructing StartCursor and StopCursor from MessageId Key: FLINK-27729 URL: https://issues.apache.org/jira/browse/FLINK-27729 Project: Flink Issue Type: Improvement Components: API / Python, Connectors / Pulsar Reporter: Dian Fu Fix For: 1.16.0 Currently, StartCursor.fromMessageId and StopCursor.fromMessageId are still not supported in Python pulsar connectors. I think we could leverage the [pulsar Python library|https://pulsar.apache.org/api/python/#pulsar.MessageId] to implement these methods. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27598) Improve the exception message when mixing Python UDFs and Pandas UDFs
Dian Fu created FLINK-27598: --- Summary: Improve the exception message when mixing use Python UDF and Pandas UDF Key: FLINK-27598 URL: https://issues.apache.org/jira/browse/FLINK-27598 Project: Flink Issue Type: Bug Components: API / Python Reporter: Dian Fu For the following job: {code} import argparse from decimal import Decimal from pyflink.common import Row from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment, DataTypes from pyflink.table.udf import AggregateFunction, udaf class DeduplicatedSum(AggregateFunction): def create_accumulator(self): return \{int(0), float(0)} def get_value(self, accumulator) -> float: sum(accumulator.values()) def accumulate(self, accumulator, k: int, v: float): if k not in accumulator: accumulator[k] = v def retract(self, accumulator, k: int, v: float): if k in accumulator: del accumulator[k] deduplicated_sum = udaf(f=DeduplicatedSum(), func_type="pandas", result_type=DataTypes.DOUBLE(), input_types=[DataTypes.BIGINT(), DataTypes.DOUBLE()]) class FirstValue(AggregateFunction): def create_accumulator(self): return [int(-1), float(0)] def get_value(self, accumulator) -> float: return accumulator[1] def accumulate(self, accumulator, k: int, v: float): ck = accumulator[0] if ck > k: accumulator[0] = k accumulator[1] = v first_value = udaf(f=FirstValue(), result_type=DataTypes.DOUBLE(), func_type="pandas", input_types=[DataTypes.BIGINT(), DataTypes.DOUBLE()]) class LastValue(AggregateFunction): def create_accumulator(self): return [int(-1), float(0)] def get_value(self, accumulator: Row) -> float: return accumulator[1] def accumulate(self, accumulator: Row, k: int, v: float): ck = accumulator[0] if ck < k: accumulator[0] = k accumulator[1] = v last_value = udaf(f=LastValue(), func_type="pandas", result_type=DataTypes.DOUBLE(), input_types=[DataTypes.BIGINT(), DataTypes.DOUBLE()]) def create_source_table_trades(table_env): source = f""" CREATE TABLE src_trade ( `id` VARCHAR ,`timestamp` BIGINT ,`side` VARCHAR ,`price` DOUBLE ,`size` DOUBLE ,`uniqueId` BIGINT ,ts_micro AS `timestamp` ,ts_milli AS `timestamp` / 1000 ,ts AS TO_TIMESTAMP_LTZ(`timestamp` / 1000, 3) ,WATERMARK FOR ts AS ts - INTERVAL '0.001' SECOND ) WITH ( 'connector' = 'datagen') """ table_env.execute_sql(source) def create_sink_table(table_env): sink = f""" CREATE TABLE dst_kline ( wst TIMESTAMP_LTZ(3) ,wet TIMESTAMP_LTZ(3) ,otm BIGINT ,ot TIMESTAMP_LTZ(3) ,ctm BIGINT ,ct TIMESTAMP_LTZ(3) ,ptm BIGINT ,pt TIMESTAMP_LTZ(3) ,`open` DOUBLE ,`close` DOUBLE ,`high` DOUBLE ,`low` DOUBLE ,`vol` DOUBLE -- total trade volume ,`to` DOUBLE -- total turnover value ,`rev` INT -- revision, something we might use for versioning ,`gap` INT -- if this value is reliable ,PRIMARY KEY(wst) NOT ENFORCED ) WITH ( 'connector' = 'print' ) """ table_env.execute_sql(sink) def kafka_src_topic(value): if not len(value.split('-')) == 5: raise argparse.ArgumentTypeError("{} is not a valid kafka topic".format(value)) return value def interval(value): i = [] prev_num = [] for character in value: if character.isalpha(): if prev_num: num = Decimal(''.join(prev_num)) if character == 'd': i.append(f"'\{num}' DAYS") elif character == 'h': i.append(f"'\{num}' HOURS") elif character == 'm': i.append(f"'\{num}' MINUTES") elif character == 's': i.append(f"'\{num}' SECONDS") prev_num = [] elif character.isnumeric() or character == '.': prev_num.append(character) return " ".join(i) def fetch_arguments_flink_kline(): import argparse parser = argparse.ArgumentParser() 
parser.add_argument('--bootstrap-servers', type=str, required=True) parser.add_argument('--src-topic', type=kafka_src_topic) parser.add_argument('--consume-mode', type=str, default='group-offsets', choices=['group-offsets', 'latest-offset'], help='scan.startup.mode for kafka') parser.add_argument('--interval', type=str, default='20s', help='output interval e.g. 5d4h3m1s, default to 20s') parser.add_argument('--force-test', action='store_true') parser.add_argument('--consumer-group-hint', type=str, default='1') args = parser.parse_args() if args.force_test and args.consumer_group_hint == '1': parser.error("With --force-test, should not use default '1' for --consumer-group-hint") return args def main(): # args = fetch_arguments_flink_kline() # parts = args.src_topic.split('-') # _, e, p, s, _ = parts # dst_topic = f'\{e}-\{p}-\{s}-Kline\{args.interval}' env = StreamExecutionEnvironment.get_execution_environment() table_env = StreamTableEnvironment.create(env) # table_env.get_config().get_configuration().set_boolean("table.exec.emit.early-fire.enabled", True) # table_env.get_config().get_configuration().set_string("table.exec.emit.early-fire.delay", "0 s") table_env.get_config().get_configuration().set_string("table.exec.emit.allow-lateness", "1 h") # table_env.get_config().get_config
[jira] [Created] (FLINK-27584) Support broadcast state in PyFlink DataStream API
Dian Fu created FLINK-27584: --- Summary: Support broadcast state in PyFlink DataStream API Key: FLINK-27584 URL: https://issues.apache.org/jira/browse/FLINK-27584 Project: Flink Issue Type: New Feature Components: API / Python Reporter: Dian Fu Assignee: Juntao Hu Fix For: 1.16.0 -- This message was sent by Atlassian Jira (v8.20.7#820007)
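For illustration, a rough sketch of how broadcast state might look in the Python DataStream API once supported, assuming the Java broadcast state pattern (MapStateDescriptor + BroadcastProcessFunction) is mirrored; main_stream and rule_stream are assumed to be existing DataStreams and all names below are illustrative:
{code}
from pyflink.common.typeinfo import Types
from pyflink.datastream.functions import BroadcastProcessFunction
from pyflink.datastream.state import MapStateDescriptor

# descriptor describing the broadcast (rule) state
rules_desc = MapStateDescriptor("rules", Types.STRING(), Types.STRING())


class ApplyRules(BroadcastProcessFunction):

    def process_element(self, value, ctx):
        # read-only access to the broadcast state on the non-broadcast side
        rule = ctx.get_broadcast_state(rules_desc).get(value[0])
        yield value[0], value[1], rule

    def process_broadcast_element(self, value, ctx):
        # update the broadcast state when a new rule arrives
        ctx.get_broadcast_state(rules_desc).put(value[0], value[1])


result = main_stream.connect(rule_stream.broadcast(rules_desc)).process(ApplyRules())
{code}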
[jira] [Created] (FLINK-27564) Split PyFlink DataStream connectors into separate files
Dian Fu created FLINK-27564: --- Summary: Split PyFlink DataStream connectors into separate files Key: FLINK-27564 URL: https://issues.apache.org/jira/browse/FLINK-27564 Project: Flink Issue Type: Technical Debt Components: API / Python Reporter: Dian Fu Fix For: 1.16.0 Currently all the connectors are located in [connectors.py|https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/connectors.py]. As more and more connectors are added, this file keeps growing. Besides, it's confusing for users as it's not clear which classes belong to which connector. It would be great to split the different connectors into separate files, e.g. pyflink/datastream/connectors/jdbc.py, pyflink/datastream/connectors/file_system.py, etc. However, backward compatibility should be taken into account when performing this refactoring. -- This message was sent by Atlassian Jira (v8.20.7#820007)
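One way to keep backward compatibility during such a split is to turn connectors.py into a package whose __init__ re-exports the moved classes, so existing `from pyflink.datastream.connectors import ...` imports keep working. A minimal sketch (module and class names are only illustrative):
{code}
# pyflink/datastream/connectors/__init__.py (sketch)
from pyflink.datastream.connectors.file_system import FileSink, FileSource
from pyflink.datastream.connectors.jdbc import JdbcSink

__all__ = ['FileSink', 'FileSource', 'JdbcSink']
{code}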
[jira] [Created] (FLINK-27545) Update examples in PyFlink shell
Dian Fu created FLINK-27545: --- Summary: Update examples in PyFlink shell Key: FLINK-27545 URL: https://issues.apache.org/jira/browse/FLINK-27545 Project: Flink Issue Type: Bug Components: API / Python, Examples Affects Versions: 1.15.0, 1.14.0 Reporter: Dian Fu Assignee: Dian Fu The examples in pyflink.shell are outdated and we should update them. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27421) Bundle test utility classes into the PyFlink package to make it easy for users to write test cases
Dian Fu created FLINK-27421: --- Summary: Bundle test utility classes into the PyFlink package to make it easy for users to write test cases Key: FLINK-27421 URL: https://issues.apache.org/jira/browse/FLINK-27421 Project: Flink Issue Type: Improvement Components: API / Python Reporter: Dian Fu Fix For: 1.16.0 Currently, the test utility classes are not bundled in the PyFlink package. This doesn't affect the functionalities. However, when users are trying out PyFlink and developing jobs with PyFlink, they may want to write some unit tests to ensure the functionalities work as expected. There are already some test utility classes in PyFlink; bundling them in the PyFlink package could help users a lot and allow them to write unit tests more easily. See https://lists.apache.org/thread/9z468o1hmg4bm7b2vz2o3lkmoqhxnxg1 for more details. -- This message was sent by Atlassian Jira (v8.20.7#820007)
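For context, this is roughly the boilerplate every user project has to hand-roll today with plain unittest; bundled test utilities could offer a ready-made base class for it. A minimal sketch, not an actual PyFlink utility:
{code}
import unittest

from pyflink.table import EnvironmentSettings, TableEnvironment


class MyJobTest(unittest.TestCase):

    def setUp(self):
        # boilerplate that a bundled test base class could take care of
        self.t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

    def test_job(self):
        table = self.t_env.from_elements([(1, 'hi'), (2, 'hello')], ['id', 'data'])
        result = list(table.execute().collect())
        self.assertEqual(len(result), 2)
{code}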
[jira] [Created] (FLINK-27373) Disable installing PyFlink using `python setup.py install`
Dian Fu created FLINK-27373: --- Summary: Disable installing PyFlink using `python setup.py install` Key: FLINK-27373 URL: https://issues.apache.org/jira/browse/FLINK-27373 Project: Flink Issue Type: Improvement Components: API / Python Reporter: Dian Fu Assignee: Dian Fu When users install PyFlink using `pip install .` or `pip install apache-flink`, it will be installed into site-packages with directories as following: {code} apache_flink-1.14.4.dist-info apache_flink_libraries-1.14.4.dist-info pyflink {code} However, if installed using `python setup.py install`, the result directories will be as following: {code} apache_flink-1.14.4-py3.8-macosx-10.9-x86_64.egg apache_flink_libraries-1.14.4-py3.8.egg {code} The consequence of the latter case is that the installed package is problematic. The reason is that it contains jar packages in apache_flink_libraries and currently these jar packages need to be installed together with apache_flink to make them work(under directory pyflink/lib and pyflink/opt). Users will get the following error when installing PyFlink via "python setup.py install": {code} Error: Could not find or load main class org.apache.flink.client.python.PythonGatewayServer Caused by: java.lang.ClassNotFoundException: org.apache.flink.client.python.PythonGatewayServer E/Users/dianfu/miniconda3/lib/python3.8/unittest/case.py:704: ResourceWarning: unclosed file <_io.BufferedWriter name=5> outcome.errors.clear() ResourceWarning: Enable tracemalloc to get the object allocation traceback == ERROR: test_scalar_function (test_table_api.TableTests) -- Traceback (most recent call last): File "/Users/dianfu/code/src/github/pyflink-faq/testing/test_utils.py", line 122, in setUp self.t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode()) File "/Users/dianfu/code/src/github/pyflink-faq/testing/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-10.9-x86_64.egg/pyflink/table/environment_settings.py", line 267, in in_streaming_mode get_gateway().jvm.EnvironmentSettings.inStreamingMode()) File "/Users/dianfu/code/src/github/pyflink-faq/testing/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-10.9-x86_64.egg/pyflink/java_gateway.py", line 62, in get_gateway _gateway = launch_gateway() File "/Users/dianfu/code/src/github/pyflink-faq/testing/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-10.9-x86_64.egg/pyflink/java_gateway.py", line 112, in launch_gateway raise Exception("Java gateway process exited before sending its port number") Exception: Java gateway process exited before sending its port number {code} `python setup.py install` is already deprecated in Python community and it's suggesting to use `pip install`. We have two choices: - Support installing PyFlink using `python setup.py install` - Disable installing PyFlink using `python setup.py install` and throw a meaningful exception if users install it with `python setup.py install` to make it more explicit -- This message was sent by Atlassian Jira (v8.20.7#820007)
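A minimal sketch of the second option, i.e. failing fast with a meaningful message when setup.py is invoked with the install command (the exact wording and placement are assumptions, not the actual fix):
{code}
# at the top of setup.py (sketch)
import sys

if 'install' in sys.argv[1:]:
    print("Installing apache-flink via `python setup.py install` is not supported, "
          "as the jar packages shipped in apache-flink-libraries would not be laid "
          "out correctly. Please use `pip install .` or `pip install apache-flink`.",
          file=sys.stderr)
    sys.exit(1)
{code}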
[jira] [Created] (FLINK-27223) State access doesn't work as expected when cache size is set to 0
Dian Fu created FLINK-27223: --- Summary: State access doesn't work as expected when cache size is set to 0 Key: FLINK-27223 URL: https://issues.apache.org/jira/browse/FLINK-27223 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.15.0 Reporter: Dian Fu Assignee: Dian Fu Fix For: 1.15.1 For the following job: {code} import json import logging import sys from pyflink.common import Types, Configuration from pyflink.datastream import StreamExecutionEnvironment from pyflink.util.java_utils import get_j_env_configuration if __name__ == '__main__': logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") env = StreamExecutionEnvironment.get_execution_environment() config = Configuration( j_configuration=get_j_env_configuration(env._j_stream_execution_environment)) config.set_integer("python.state.cache-size", 0) env.set_parallelism(1) # define the source ds = env.from_collection( collection=[ (1, '{"name": "Flink", "tel": 123, "addr": {"country": "Germany", "city": "Berlin"}}'), (2, '{"name": "hello", "tel": 135, "addr": {"country": "China", "city": "Shanghai"}}'), (3, '{"name": "world", "tel": 124, "addr": {"country": "USA", "city": "NewYork"}}'), (4, '{"name": "PyFlink", "tel": 32, "addr": {"country": "China", "city": "Hangzhou"}}') ], type_info=Types.ROW_NAMED(["id", "info"], [Types.INT(), Types.STRING()]) ) # key by ds = ds.map(lambda data: (json.loads(data.info)['addr']['country'], json.loads(data.info)['tel'])) \ .key_by(lambda data: data[0]).sum(1) ds.print() env.execute() {code} The expected result should be: {code} ('Germany', 123) ('China', 135) ('USA', 124) ('China', 167) {code} However, the actual result is: {code} ('Germany', 123) ('China', 135) ('USA', 124) ('China', 32) {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-27126) Respect the state cache size configurations in Python DataStream API operators
Dian Fu created FLINK-27126: --- Summary: Respect the state cache size configurations in Python DataStream API operators Key: FLINK-27126 URL: https://issues.apache.org/jira/browse/FLINK-27126 Project: Flink Issue Type: Improvement Components: API / Python Reporter: Dian Fu Assignee: Dian Fu Currently, the state cache size configurations are not handled in Python DataStream API operators. See https://github.com/apache/flink/blob/master/flink-python/pyflink/fn_execution/beam/beam_operations.py#L188 for more details. -- This message was sent by Atlassian Jira (v8.20.1#820001)
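For reference, the option in question is set from the Python side as in the FLINK-27223 reproduction above; the point of this ticket is that the Python DataStream API operators should actually honour it. A minimal sketch:
{code}
from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.util.java_utils import get_j_env_configuration

env = StreamExecutionEnvironment.get_execution_environment()
config = Configuration(
    j_configuration=get_j_env_configuration(env._j_stream_execution_environment))
# the state cache size that the Python DataStream API operators should respect
config.set_integer("python.state.cache-size", 1000)
{code}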
[jira] [Created] (FLINK-27108) State cache clean up doesn't work as expected
Dian Fu created FLINK-27108: --- Summary: State cache clean up doesn't work as expected Key: FLINK-27108 URL: https://issues.apache.org/jira/browse/FLINK-27108 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.14.0, 1.13.0, 1.15.0 Reporter: Dian Fu Assignee: Dian Fu Fix For: 1.15.0, 1.13.7, 1.14.5 The test case test_session_window_late_merge failed when working on FLINK-26190. After digging into this problem, the reason should be that the logic to determine [whether a key & namespace exists in the state cache|https://github.com/apache/flink/blob/master/flink-python/pyflink/fn_execution/state_impl.py#L1183] is wrong. As a result, the state cache isn't cleaned up when it becomes invalid. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-27067) Prevent usage of deprecated APIs in PyFlink examples
Dian Fu created FLINK-27067: --- Summary: Prevent usage of deprecated APIs in PyFlink examples Key: FLINK-27067 URL: https://issues.apache.org/jira/browse/FLINK-27067 Project: Flink Issue Type: Improvement Components: API / Python, Examples Reporter: Dian Fu FLINK-24833 prevented the usage of deprecated APIs in the Java/Scala examples. It would be great to also apply this enforcement to the Python examples. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26986) Remove deprecated string expressions in Python Table API
Dian Fu created FLINK-26986: --- Summary: Remove deprecated string expressions in Python Table API Key: FLINK-26986 URL: https://issues.apache.org/jira/browse/FLINK-26986 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.15.0 Reporter: Dian Fu Assignee: Dian Fu Fix For: 1.15.0 FLINK-26704 removed the string expressions from the Table API. However, some APIs in the Python Table API still accept string expressions, even though they should no longer work as the string expressions have already been removed from the Java Table API. -- This message was sent by Atlassian Jira (v8.20.1#820001)
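For illustration, a sketch of the difference, assuming tab is an existing Table; the string-based variants were deprecated and have been removed on the Java side, so only the expression DSL should remain:
{code}
from pyflink.table.expressions import col

# deprecated string expressions, no longer working after FLINK-26704:
# tab.select("a, b").where("a > 10")

# expression DSL that should be used instead:
tab.select(col('a'), col('b')).where(col('a') > 10)
{code}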
[jira] [Created] (FLINK-26920) It reported "The configured managed memory fraction for Python worker process must be within (0, 1], was: %s."
Dian Fu created FLINK-26920: --- Summary: It reported "The configured managed memory fraction for Python worker process must be within (0, 1], was: %s." Key: FLINK-26920 URL: https://issues.apache.org/jira/browse/FLINK-26920 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.14.0, 1.13.0, 1.12.0, 1.15.0 Reporter: Dian Fu For the following code: {code} import numpy as np from pyflink.common import Row from pyflink.common.typeinfo import Types from pyflink.datastream import StreamExecutionEnvironment from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext from pyflink.table import StreamTableEnvironment from sklearn import svm, datasets env = StreamExecutionEnvironment.get_execution_environment() t_env = StreamTableEnvironment.create(stream_execution_environment=env) # Table Source t_env.execute_sql(""" CREATE TABLE my_source ( a FLOAT, key STRING ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1', 'fields.a.min' = '4.3', 'fields.a.max' = '7.9', 'fields.key.length' = '10' ) """) def process_type(): return Types.ROW_NAMED( ["a", "key"], [Types.FLOAT(), Types.STRING()] ) # append only datastream ds = t_env.to_append_stream( t_env.from_path('my_source'), process_type()) class MyKeyedProcessFunction(KeyedProcessFunction): def open(self, runtime_context: RuntimeContext): clf = svm.SVC() X, y= datasets.load_iris(return_X_y=True) clf.fit(X, y) self.model = clf def process_element(self, value: Row, ctx: 'KeyedProcessFunction.Context'): # query the round settlement log from Redis based on role_id + space features = np.array([value['a'], 3.5, 1.4, 0.2]).reshape(1, -1) predict = int(self.model.predict(features)[0]) yield Row(predict=predict, role_id=value['key']) ds = ds.key_by(lambda a: a['key'], key_type=Types.STRING()) \ .process( MyKeyedProcessFunction(), output_type=Types.ROW_NAMED( ["hit", "role_id"], [Types.INT(), Types.STRING()] )) # use a table sink t_env.execute_sql(""" CREATE TABLE my_sink ( hit INT, role_id STRING ) WITH ( 'connector' = 'print' ) """) t_env.create_temporary_view("predict", ds) t_env.execute_sql(""" INSERT INTO my_sink SELECT * FROM predict """).wait() {code} It reported the following exception: {code} Caused by: java.lang.IllegalArgumentException: The configured managed memory fraction for Python worker process must be within (0, 1], was: %s.
It may be because the consumer type "Python" was missing or set to 0 for the config option "taskmanager.memory.managed.consumer-weights".0.0 at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:233) at org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator.open(AbstractExternalPythonFunctionOperator.java:56) at org.apache.flink.streaming.api.operators.python.AbstractOneInputPythonFunctionOperator.open(AbstractOneInputPythonFunctionOperator.java:116) at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.open(PythonKeyedProcessOperator.java:121) at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:712) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:688) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:655) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) at java.lang.Thread.run(Thread.java:748) {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
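A possible workaround sketch, assuming the PYTHON weight was missing or zero in taskmanager.memory.managed.consumer-weights; the weights below are just the documented defaults, not values taken from this report (t_env is the table environment from the job above):
{code}
# make sure managed memory is actually assigned to the PYTHON consumer type
t_env.get_config().get_configuration().set_string(
    "taskmanager.memory.managed.consumer-weights",
    "OPERATOR:70,STATE_BACKEND:70,PYTHON:30")
{code}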
[jira] [Created] (FLINK-26919) Table API program compiles field with "Cannot resolve field [a], input field list:[EXPR$0]"
Dian Fu created FLINK-26919: --- Summary: Table API program compiles field with "Cannot resolve field [a], input field list:[EXPR$0]" Key: FLINK-26919 URL: https://issues.apache.org/jira/browse/FLINK-26919 Project: Flink Issue Type: Bug Components: API / Python, Table SQL / Planner Affects Versions: 1.14.0 Reporter: Dian Fu For the following job: {code} import datetime as dt from pyflink.table import * from pyflink.table.window import Tumble from pyflink.table.expressions import col, lit, UNBOUNDED_RANGE, CURRENT_RANGE def make_table(): env_settings = EnvironmentSettings.in_batch_mode() table_env = TableEnvironment.create(env_settings) source_data_path = '///path/to/source/directory/test.csv' sink_data_path = '///path/to/sink/directory/' source_ddl = f""" create table Orders( a VARCHAR, b BIGINT, c BIGINT, rowtime TIMESTAMP(3), WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND ) with ( 'connector' = 'filesystem', 'format' = 'csv', 'path' = '{source_data_path}' ) """ table_env.execute_sql(source_ddl) sink_ddl = f""" create table `Result`( a VARCHAR, cnt BIGINT ) with ( 'connector' = 'filesystem', 'format' = 'csv', 'path' = '{sink_data_path}' ) """ table_env.execute_sql(sink_ddl) orders = table_env.from_path('Orders') orders.order_by(orders.a).select(orders.a, orders.b.count).execute_insert("Result").wait() if __name__ == '__main__': make_table() {code} It compiles failed with the following exception: {code} org.apache.flink.table.api.ValidationException: Cannot resolve field [a], input field list:[EXPR$0]. at org.apache.flink.table.expressions.resolver.rules.ReferenceResolverRule$ExpressionResolverVisitor.failForField(ReferenceResolverRule.java:93) at org.apache.flink.table.expressions.resolver.rules.ReferenceResolverRule$ExpressionResolverVisitor.lambda$null$3(ReferenceResolverRule.java:87) at java.util.Optional.orElseThrow(Optional.java:290) at org.apache.flink.table.expressions.resolver.rules.ReferenceResolverRule$ExpressionResolverVisitor.lambda$null$4(ReferenceResolverRule.java:85) at java.util.Optional.orElseGet(Optional.java:267) at org.apache.flink.table.expressions.resolver.rules.ReferenceResolverRule$ExpressionResolverVisitor.lambda$visit$5(ReferenceResolverRule.java:79) at java.util.Optional.orElseGet(Optional.java:267) at org.apache.flink.table.expressions.resolver.rules.ReferenceResolverRule$ExpressionResolverVisitor.visit(ReferenceResolverRule.java:73) at org.apache.flink.table.expressions.resolver.rules.ReferenceResolverRule$ExpressionResolverVisitor.visit(ReferenceResolverRule.java:51) at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:29) at org.apache.flink.table.expressions.UnresolvedReferenceExpression.accept(UnresolvedReferenceExpression.java:59) at org.apache.flink.table.expressions.resolver.rules.ReferenceResolverRule.lambda$apply$0(ReferenceResolverRule.java:47) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) at org.apache.flink.table.expressions.resolver.rules.ReferenceResolverRule.apply(ReferenceResolverRule.java:48) at 
org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$2(ExpressionResolver.java:241) at java.util.function.Function.lambda$andThen$1(Function.java:88) at java.util.function.Function.lambda$andThen$1(Function.java:88) at java.util.function.Function.lambda$andThen$1(Function.java:88) at java.util.function.Function.lambda$andThen$1(Function.java:88) at org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:204) at org.apache.flink.table.operations.utils.OperationTreeBuilder.projectInternal(OperationTreeBuilder.java:194) at org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:169) at org.apache.flink.table.api.internal.TableImpl.select(TableImpl.java:138) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[jira] [Created] (FLINK-26855) ImportError: cannot import name 'environmentfilter' from 'jinja2'
Dian Fu created FLINK-26855: --- Summary: ImportError: cannot import name 'environmentfilter' from 'jinja2' Key: FLINK-26855 URL: https://issues.apache.org/jira/browse/FLINK-26855 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.15.0, 1.16.0 Reporter: Dian Fu {code} ar 24 17:38:39 ===mypy checks... [SUCCESS]=== Mar 24 17:38:39 rm -rf _build/* Mar 24 17:38:39 /__w/2/s/flink-python/dev/.conda/bin/sphinx-build -b html -d _build/doctrees -a -W . _build/html Mar 24 17:38:40 Traceback (most recent call last): Mar 24 17:38:40 File "/__w/2/s/flink-python/dev/.conda/bin/sphinx-build", line 6, in Mar 24 17:38:40 from sphinx.cmd.build import main Mar 24 17:38:40 File "/__w/2/s/flink-python/dev/.conda/lib/python3.7/site-packages/sphinx/cmd/build.py", line 23, in Mar 24 17:38:40 from sphinx.application import Sphinx Mar 24 17:38:40 File "/__w/2/s/flink-python/dev/.conda/lib/python3.7/site-packages/sphinx/application.py", line 42, in Mar 24 17:38:40 from sphinx.highlighting import lexer_classes, lexers Mar 24 17:38:40 File "/__w/2/s/flink-python/dev/.conda/lib/python3.7/site-packages/sphinx/highlighting.py", line 30, in Mar 24 17:38:40 from sphinx.ext import doctest Mar 24 17:38:40 File "/__w/2/s/flink-python/dev/.conda/lib/python3.7/site-packages/sphinx/ext/doctest.py", line 28, in Mar 24 17:38:40 from sphinx.builders import Builder Mar 24 17:38:40 File "/__w/2/s/flink-python/dev/.conda/lib/python3.7/site-packages/sphinx/builders/__init__.py", line 24, in Mar 24 17:38:40 from sphinx.io import read_doc Mar 24 17:38:40 File "/__w/2/s/flink-python/dev/.conda/lib/python3.7/site-packages/sphinx/io.py", line 42, in Mar 24 17:38:40 from sphinx.util.rst import append_epilog, docinfo_re, prepend_prolog Mar 24 17:38:40 File "/__w/2/s/flink-python/dev/.conda/lib/python3.7/site-packages/sphinx/util/rst.py", line 22, in Mar 24 17:38:40 from jinja2 import environmentfilter Mar 24 17:38:40 ImportError: cannot import name 'environmentfilter' from 'jinja2' (/__w/2/s/flink-python/dev/.conda/lib/python3.7/site-packages/jinja2/__init__.py) Mar 24 17:38:40 Makefile:76: recipe for target 'html' failed Mar 24 17:38:40 make: *** [html] Error 1 Mar 24 17:38:40 ==sphinx checks... [FAILED]=== {code} https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=33717&view=logs&j=9cada3cb-c1d3-5621-16da-0f718fb86602&t=c67e71ed-6451-5d26-8920-5a8cf9651901&l=23450 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26847) Unify the logic for the command line option -py and -pym
Dian Fu created FLINK-26847: --- Summary: Unify the logic for the command line option -py and -pym Key: FLINK-26847 URL: https://issues.apache.org/jira/browse/FLINK-26847 Project: Flink Issue Type: Improvement Components: API / Python Reporter: Dian Fu Assignee: Dian Fu -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26846) Gauge metrics don't work in PyFlink
Dian Fu created FLINK-26846: --- Summary: Gauge metrics don't work in PyFlink Key: FLINK-26846 URL: https://issues.apache.org/jira/browse/FLINK-26846 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.12.0 Reporter: Dian Fu Assignee: Dian Fu See https://lists.apache.org/thread/w7jkwgpon6qy4p6k1nhhw5k4m81r8c8p for more details. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26506) Support StreamExecutionEnvironment.registerCachedFile in Python DataStream API
Dian Fu created FLINK-26506: --- Summary: Support StreamExecutionEnvironment.registerCachedFile in Python DataStream API Key: FLINK-26506 URL: https://issues.apache.org/jira/browse/FLINK-26506 Project: Flink Issue Type: Improvement Components: API / Python Reporter: Dian Fu Fix For: 1.16.0 This API is missing in the Python DataStream API. -- This message was sent by Atlassian Jira (v8.20.1#820001)
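For illustration, the Python counterpart would presumably mirror the Java StreamExecutionEnvironment#registerCachedFile signature, roughly as below; the method name and arguments are assumptions based on the Java API, not an existing PyFlink API at the time of this ticket:
{code}
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# register a file in the distributed cache under the name "dict_file"
env.register_cached_file("hdfs:///path/to/dictionary.txt", "dict_file")
{code}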
[jira] [Created] (FLINK-26483) Support WindowedStream.aggregate in Python DataStream API
Dian Fu created FLINK-26483: --- Summary: Support WindowedStream.aggregate in Python DataStream API Key: FLINK-26483 URL: https://issues.apache.org/jira/browse/FLINK-26483 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Dian Fu Fix For: 1.16.0 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26482) Support WindowedStream.reduce in Python DataStream API
Dian Fu created FLINK-26482: --- Summary: Support WindowedStream.reduce in Python DataStream API Key: FLINK-26482 URL: https://issues.apache.org/jira/browse/FLINK-26482 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Dian Fu Fix For: 1.16.0 -- This message was sent by Atlassian Jira (v8.20.1#820001)
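For illustration, the expected usage of FLINK-26482/26483 would roughly follow the Java API, e.g. a keyed tumbling window combined with a reduce function; a sketch assuming the Java windowing API is mirrored in Python and ds is an existing DataStream of (key, value) tuples:
{code}
from pyflink.common.time import Time
from pyflink.datastream.window import TumblingProcessingTimeWindows

ds.key_by(lambda x: x[0]) \
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) \
    .reduce(lambda a, b: (a[0], a[1] + b[1]))
{code}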
[jira] [Created] (FLINK-26481) Support side output for windowing in Python DataStream API
Dian Fu created FLINK-26481: --- Summary: Support side output for windowing in Python DataStream API Key: FLINK-26481 URL: https://issues.apache.org/jira/browse/FLINK-26481 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Dian Fu Fix For: 1.16.0 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26480) Support window_all in Python DataStream API
Dian Fu created FLINK-26480: --- Summary: Support window_all in Python DataStream API Key: FLINK-26480 URL: https://issues.apache.org/jira/browse/FLINK-26480 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Dian Fu Fix For: 1.16.0 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26479) Add time_window and count_window in KeyedStream in Python DataStream API
Dian Fu created FLINK-26479: --- Summary: Add time_window and count_window in KeyedStream in Python DataStream API Key: FLINK-26479 URL: https://issues.apache.org/jira/browse/FLINK-26479 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Dian Fu Fix For: 1.16.0 This is to align with the Java API and provide a few handy methods to use. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26478) Support evictor in Python DataStream API
Dian Fu created FLINK-26478: --- Summary: Support evictor in Python DataStream API Key: FLINK-26478 URL: https://issues.apache.org/jira/browse/FLINK-26478 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Dian Fu Currently, evictor is still not supported in Python DataStream API. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26477) Finalize the window support in Python DataStream API
Dian Fu created FLINK-26477: --- Summary: Finalize the window support in Python DataStream API Key: FLINK-26477 URL: https://issues.apache.org/jira/browse/FLINK-26477 Project: Flink Issue Type: Improvement Components: API / Python Reporter: Dian Fu Fix For: 1.16.0 Window support in the Python DataStream API was provided in FLINK-21842. However, there are still a few functionalities missing, e.g. evictor support. This is an umbrella JIRA to collect all the missing parts of that work. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25676) Add set_description in Python API
Dian Fu created FLINK-25676: --- Summary: Add set_description in Python API Key: FLINK-25676 URL: https://issues.apache.org/jira/browse/FLINK-25676 Project: Flink Issue Type: Improvement Components: API / Python Reporter: Dian Fu Assignee: Dian Fu Fix For: 1.15.0 This is to align with the setDescription method introduced in FLINK-25072. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25231) Update PyFlink to use the new type system
Dian Fu created FLINK-25231: --- Summary: Update PyFlink to use the new type system Key: FLINK-25231 URL: https://issues.apache.org/jira/browse/FLINK-25231 Project: Flink Issue Type: Improvement Components: API / Python Reporter: Dian Fu Fix For: 1.15.0 Currently, there are a lot of places in the PyFlink Table API still using the legacy type system. We need to revisit this and migrate them to the new type system (DataType/LogicalType). -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25216) test_sliding_group_window_over_proctime failed with "IndexError: list index out of range"
Dian Fu created FLINK-25216: --- Summary: test_sliding_group_window_over_proctime failed with "IndexError: list index out of range" Key: FLINK-25216 URL: https://issues.apache.org/jira/browse/FLINK-25216 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.15.0 Reporter: Dian Fu {code} self = Dec 06 03:10:43 Dec 06 03:10:43 def test_sliding_group_window_over_proctime(self): Dec 06 03:10:43 self.t_env.get_config().get_configuration().set_string("parallelism.default", "1") Dec 06 03:10:43 from pyflink.table.window import Slide Dec 06 03:10:43 self.t_env.register_function("mean_udaf", mean_udaf) Dec 06 03:10:43 Dec 06 03:10:43 source_table = """ Dec 06 03:10:43 create table source_table( Dec 06 03:10:43 a INT, Dec 06 03:10:43 proctime as PROCTIME() Dec 06 03:10:43 ) with( Dec 06 03:10:43 'connector' = 'datagen', Dec 06 03:10:43 'rows-per-second' = '1', Dec 06 03:10:43 'fields.a.kind' = 'sequence', Dec 06 03:10:43 'fields.a.start' = '1', Dec 06 03:10:43 'fields.a.end' = '10' Dec 06 03:10:43 ) Dec 06 03:10:43 """ Dec 06 03:10:43 self.t_env.execute_sql(source_table) Dec 06 03:10:43 t = self.t_env.from_path("source_table") Dec 06 03:10:43 iterator = t.select("a, proctime") \ Dec 06 03:10:43 .window(Slide.over("1.seconds").every("1.seconds").on("proctime").alias("w")) \ Dec 06 03:10:43 .group_by("a, w") \ Dec 06 03:10:43 .select("mean_udaf(a) as b, w.start").execute().collect() Dec 06 03:10:43 result = [i for i in iterator] Dec 06 03:10:43 # if the WindowAssigner.isEventTime() does not return false, Dec 06 03:10:43 # the w.start would be 1970-01-01 Dec 06 03:10:43 # TODO: After fixing the TimeZone problem of window with processing time (will be fixed in Dec 06 03:10:43 # FLIP-162), we should replace it with a more accurate assertion. Dec 06 03:10:43 > self.assertTrue(result[0][1].year > 1970) Dec 06 03:10:43 E IndexError: list index out of range {code} https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=27569&view=logs&j=3e4dd1a2-fe2f-5e5d-a581-48087e718d53&t=b4612f28-e3b5-5853-8a8b-610ae894217a&l=21780 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-24962) The root cause of a failed job was hidden in certain cases
Dian Fu created FLINK-24962: --- Summary: The root cause of a failed job was hidden in certain cases Key: FLINK-24962 URL: https://issues.apache.org/jira/browse/FLINK-24962 Project: Flink Issue Type: Improvement Components: API / Python Affects Versions: 1.14.0 Reporter: Dian Fu Fix For: 1.14.1 For the following job: {code} from pyflink.common.typeinfo import Types from pyflink.datastream import StreamExecutionEnvironment, DataStream from pyflink.table import StreamTableEnvironment def state_access_demo(): env = StreamExecutionEnvironment.get_execution_environment() t_env = StreamTableEnvironment.create(stream_execution_environment=env) sql = """CREATE TABLE kafka_source ( id int, name string ) WITH ( 'connector' = 'kafka', 'topic' = 'test', 'properties.bootstrap.servers' = '***', 'properties.group.id' = '***', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true' )""" t_env.execute_sql(sql) table = t_env.from_path("kafka_source") ds: DataStream = t_env.to_append_stream(table, type_info=Types.ROW([Types.INT(), Types.STRING()])) ds.map(lambda a: print(a)) env.execute('state_access_demo') if __name__ == '__main__': state_access_demo() {code} This job failed with the following exception which doesn't contain any useful information (the root cause is that it should return a value in the map function): {code} Caused by: java.lang.RuntimeException: Failed to create stage bundle factory! INFO:root:Initializing Python harness: D:\soft\anaconda\lib\site-packages\pyflink\fn_execution\beam\beam_boot.py --id=2-1 --provision_endpoint=localhost:57201 INFO:root:Starting up Python harness in loopback mode. at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createStageBundleFactory(BeamPythonFunctionRunner.java:566) at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:255) at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:131) at org.apache.flink.streaming.api.operators.python.AbstractOneInputPythonFunctionOperator.open(AbstractOneInputPythonFunctionOperator.java:116) at org.apache.flink.streaming.api.operators.python.PythonProcessOperator.open(PythonProcessOperator.java:59) at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Process died with exit code 0 at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050) at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964) at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:451) at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:436) at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303) at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createStageBundleFactory(BeamPythonFunctionRunner.java:564) ... 14 more Caused by: java.lang.IllegalStateException: Process died with exit code 0 at org.apache.beam.runners.fnexecution.environment.ProcessManager$Ru
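For reference, the root cause mentioned above is that ds.map(lambda a: print(a)) returns None for every record; the ticket is about surfacing that root cause instead of hiding it. A minimal sketch of the fixed job tail (reusing ds, Types and env from the job above):
{code}
# return the record from the map function and print it via the stream instead
ds.map(lambda a: a, output_type=Types.ROW([Types.INT(), Types.STRING()])).print()
env.execute('state_access_demo')
{code}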
[jira] [Created] (FLINK-24662) PyFlink sphinx check failed with "node class 'meta' is already registered, its visitors will be overridden"
Dian Fu created FLINK-24662: --- Summary: PyFlink sphinx check failed with "node class 'meta' is already registered, its visitors will be overridden" Key: FLINK-24662 URL: https://issues.apache.org/jira/browse/FLINK-24662 Project: Flink Issue Type: Bug Components: API / Python, Tests Affects Versions: 1.14.0, 1.13.0, 1.15.0 Reporter: Dian Fu Assignee: Dian Fu [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=25481&view=logs&j=9cada3cb-c1d3-5621-16da-0f718fb86602&t=8d78fe4f-d658-5c70-12f8-4921589024c3] {code} ==mypy checks... [SUCCESS]=== Oct 26 22:08:34 rm -rf _build/* Oct 26 22:08:34 /__w/1/s/flink-python/dev/.conda/bin/sphinx-build -b html -d _build/doctrees -a -W . _build/html Oct 26 22:08:34 Running Sphinx v2.4.4 Oct 26 22:08:34 Oct 26 22:08:34 Warning, treated as error: Oct 26 22:08:34 node class 'meta' is already registered, its visitors will be overridden Oct 26 22:08:34 Makefile:76: recipe for target 'html' failed {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24287) Bump virtualenv to the latest version
Dian Fu created FLINK-24287: --- Summary: Bump virtualenv to the latest version Key: FLINK-24287 URL: https://issues.apache.org/jira/browse/FLINK-24287 Project: Flink Issue Type: Improvement Components: API / Python, Tests Reporter: Dian Fu Assignee: Dian Fu Fix For: 1.15.0 Currently, the virtualenv version (16.0.0) used in PyFlink tests is a little outdated (the latest version is ). The pip bundled in the old virtualenv is also old; as a consequence, it compiles the grpcio library from source instead of using the wheel package when installing grpcio. Compiling grpcio takes several minutes, and this time could be avoided after bumping virtualenv. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24244) Add logging about whether it's executed in loopback mode
Dian Fu created FLINK-24244: --- Summary: Add logging about whether it's executed in loopback mode Key: FLINK-24244 URL: https://issues.apache.org/jira/browse/FLINK-24244 Project: Flink Issue Type: Improvement Components: API / Python Reporter: Dian Fu Assignee: Dian Fu Fix For: 1.14.0 Currently, it's unclear whether a job is running in loopback mode or process mode; it would be great to add some logging to make this clear. This would be helpful for debugging, e.g. it makes it clear whether a failed test was running in loopback mode or process mode. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24243) Clean up the code and avoid warning messages introduced by deprecated API
Dian Fu created FLINK-24243: --- Summary: Clean up the code and avoid warning messages introduced by deprecated API Key: FLINK-24243 URL: https://issues.apache.org/jira/browse/FLINK-24243 Project: Flink Issue Type: Improvement Components: API / Python Reporter: Dian Fu Assignee: Dian Fu Currently, there are quite a few warning messages when executing PyFlink jobs, e.g. {code} Process finished with exit code 0 /usr/local/Cellar/python@3.7/3.7.10_2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/subprocess.py:883: ResourceWarning: subprocess 75115 is still running ResourceWarning, source=self) ResourceWarning: Enable tracemalloc to get the object allocation traceback /Users/dianfu/code/src/apache/flink/flink-python/pyflink/table/udf.py:326: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated since Python 3.3,and in 3.9 it will stop working if not isinstance(input_types, collections.Iterable) \ /Users/dianfu/code/src/apache/flink/flink-python/pyflink/table/table_environment.py:537: DeprecationWarning: Deprecated in 1.10. Use create_table instead. warnings.warn("Deprecated in 1.10. Use create_table instead.", DeprecationWarning) /Users/dianfu/venv/examples-37/lib/python3.7/site-packages/future/standard_library/__init__.py:65: DeprecationWarning: the imp module is deprecated in favour of importlib; see the module's documentation for alternative uses import imp 2021-09-10 15:03:47,335 - apache_beam.typehints.native_type_compatibility - INFO - Using Any for unsupported type: typing.Sequence[~T] /Users/dianfu/code/src/apache/flink/flink-python/pyflink/fn_execution/state_impl.py:677: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated since Python 3.3,and in 3.9 it will stop working class RemovableConcatIterator(collections.Iterator): /Users/dianfu/code/src/apache/flink/flink-python/pyflink/fn_execution/utils/operation_utils.py:19: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated since Python 3.3,and in 3.9 it will stop working from collections import Generator {code} We should clean up the code and avoid warning messages introduced by deprecated API by using latest API. -- This message was sent by Atlassian Jira (v8.3.4#803005)
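Most of the warnings above have mechanical fixes; for example, the collections ABC deprecation goes away by importing from collections.abc, roughly:
{code}
# before (warns since Python 3.3 and breaks on Python 3.10):
#   import collections
#   isinstance(value, collections.Iterable)

# after:
from collections.abc import Iterable


def is_iterable(value) -> bool:
    return isinstance(value, Iterable)
{code}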
[jira] [Created] (FLINK-24101) Support to generate PyDocs without Flink distribution
Dian Fu created FLINK-24101: --- Summary: Support to generate PyDocs without Flink distribution Key: FLINK-24101 URL: https://issues.apache.org/jira/browse/FLINK-24101 Project: Flink Issue Type: Improvement Components: API / Python, Documentation Reporter: Dian Fu Assignee: Dian Fu Fix For: 1.14.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24097) Remove the streaming check in StreamTableEnvironment in PyFlink
Dian Fu created FLINK-24097: --- Summary: Remove the streaming check in StreamTableEnvironment in PyFlink Key: FLINK-24097 URL: https://issues.apache.org/jira/browse/FLINK-24097 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.14.0 Reporter: Dian Fu Assignee: Dian Fu Fix For: 1.14.0 DataStream batch mode has been supported in StreamTableEnvironment since FLINK-20897, so it should also work in PyFlink. Currently there are still a few streaming-only checks in the Python code and we should remove them. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24083) The result isn't as expected when the result type is generator of string for Python UDTF
Dian Fu created FLINK-24083: --- Summary: The result isn't as expected when the result type is generator of string for Python UDTF Key: FLINK-24083 URL: https://issues.apache.org/jira/browse/FLINK-24083 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.14.0 Reporter: Dian Fu Fix For: 1.14.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24082) Python UDTF throws exception when the result type is generator of Row
Dian Fu created FLINK-24082: --- Summary: Python UDTF throws exception when the result type is generator of Row Key: FLINK-24082 URL: https://issues.apache.org/jira/browse/FLINK-24082 Project: Flink Issue Type: Bug Affects Versions: 1.13.0 Reporter: Dian Fu Assignee: Dian Fu Fix For: 1.13.3 -- This message was sent by Atlassian Jira (v8.3.4#803005)
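No reproduction is attached to FLINK-24082/FLINK-24083, but the affected pattern is presumably a table function implemented as a generator. A minimal sketch of such a UDTF yielding Row values (names are illustrative):
{code}
from pyflink.common import Row
from pyflink.table import DataTypes
from pyflink.table.udf import udtf


@udtf(result_types=[DataTypes.STRING(), DataTypes.INT()])
def split_words(line: str):
    # a generator function: each yielded Row becomes one output row
    for word in line.split(","):
        yield Row(word, len(word))
{code}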
[jira] [Created] (FLINK-24062) Exception encountered during timer serialization in Python DataStream API
Dian Fu created FLINK-24062: --- Summary: Exception encountered during timer serialization in Python DataStream API Key: FLINK-24062 URL: https://issues.apache.org/jira/browse/FLINK-24062 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.14.0 Reporter: Dian Fu Assignee: Dian Fu Fix For: 1.14.0 For the following example: {code} # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import argparse import logging import sys from pyflink.common import WatermarkStrategy, Encoder, Types from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode from pyflink.datastream.connectors import (FileSource, StreamFormat, FileSink, OutputFileConfig, RollingPolicy) word_count_data = ["To be, or not to be,--that is the question:--", "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune", "Or to take arms against a sea of troubles,", "And by opposing end them?--To die,--to sleep,--", "No more; and by a sleep to say we end", "The heartache, and the thousand natural shocks", "That flesh is heir to,--'tis a consummation", "Devoutly to be wish'd. To die,--to sleep;--", "To sleep! perchance to dream:--ay, there's the rub;", "For in that sleep of death what dreams may come,", "When we have shuffled off this mortal coil,", "Must give us pause: there's the respect", "That makes calamity of so long life;", "For who would bear the whips and scorns of time,", "The oppressor's wrong, the proud man's contumely,", "The pangs of despis'd love, the law's delay,", "The insolence of office, and the spurns", "That patient merit of the unworthy takes,", "When he himself might his quietus make", "With a bare bodkin? 
who would these fardels bear,", "To grunt and sweat under a weary life,", "But that the dread of something after death,--", "The undiscover'd country, from whose bourn", "No traveller returns,--puzzles the will,", "And makes us rather bear those ills we have", "Than fly to others that we know not of?", "Thus conscience does make cowards of us all;", "And thus the native hue of resolution", "Is sicklied o'er with the pale cast of thought;", "And enterprises of great pith and moment,", "With this regard, their currents turn awry,", "And lose the name of action.--Soft you now!", "The fair Ophelia!--Nymph, in thy orisons", "Be all my sins remember'd."] def word_count(input_path, output_path): env = StreamExecutionEnvironment.get_execution_environment() env.set_runtime_mode(RuntimeExecutionMode.BATCH) # write all the data to one file env.set_parallelism(1) # define the source if input_path is not None: ds = env.from_source( source=FileSource.for_record_stream_format(StreamFormat.text_line_format(), input_path) .process_static_file_set().build(), watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(), source_name="file_source" ) else: print("Executing word_count example with default input data set.") print("Use --input to specify file input.") ds = env.from_collection(word_count_data) def split(line): yield from line.split()
[jira] [Created] (FLINK-24049) TupleTypeInfo doesn't correctly handle data types that need conversion
Dian Fu created FLINK-24049: --- Summary: TupleTypeInfo doesn't correctly handle data types that need conversion Key: FLINK-24049 URL: https://issues.apache.org/jira/browse/FLINK-24049 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.13.0, 1.12.0, 1.14.0 Reporter: Dian Fu Assignee: Dian Fu Fix For: 1.14.0, 1.12.6, 1.13.3 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24003) Lookback mode doesn't work when mixing use of Python Table API and Python DataStream API
Dian Fu created FLINK-24003: --- Summary: Lookback mode doesn't work when mixing use of Python Table API and Python DataStream API Key: FLINK-24003 URL: https://issues.apache.org/jira/browse/FLINK-24003 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.14.0 Reporter: Dian Fu Assignee: Huang Xingbo Fix For: 1.14.0 For the following program: {code} import logging import time from pyflink.common.typeinfo import Types from pyflink.datastream import StreamExecutionEnvironment, CoMapFunction from pyflink.table import StreamTableEnvironment, DataTypes, Schema def test_chaining(): env = StreamExecutionEnvironment.get_execution_environment() t_env = StreamTableEnvironment.create(stream_execution_environment=env) t_env.get_config().get_configuration().set_boolean("python.operator-chaining.enabled", False) # 1. create source Table t_env.execute_sql(""" CREATE TABLE datagen ( id INT, data STRING ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '100', 'fields.id.kind' = 'sequence', 'fields.id.start' = '1', 'fields.id.end' = '1000' ) """) # 2. create sink Table t_env.execute_sql(""" CREATE TABLE print ( id BIGINT, data STRING, flag STRING ) WITH ( 'connector' = 'blackhole' ) """) t_env.execute_sql(""" CREATE TABLE print_2 ( id BIGINT, data STRING, flag STRING ) WITH ( 'connector' = 'blackhole' ) """) # 3. query from source table and perform calculations # create a Table from a Table API query: source_table = t_env.from_path("datagen") ds = t_env.to_append_stream( source_table, Types.ROW([Types.INT(), Types.STRING()])) ds1 = ds.map(lambda i: (i[0] * i[0], i[1])) ds2 = ds.map(lambda i: (i[0], i[1][2:])) class MyCoMapFunction(CoMapFunction): def map1(self, value): print('hahah') return value def map2(self, value): print('hahah') return value ds3 = ds1.connect(ds2).map(MyCoMapFunction(), output_type=Types.TUPLE([Types.LONG(), Types.STRING()])) ds4 = ds3.map(lambda i: (i[0], i[1], "left"), output_type=Types.TUPLE([Types.LONG(), Types.STRING(), Types.STRING()])) ds5 = ds3.map(lambda i: (i[0], i[1], "right"))\ .map(lambda i: i, output_type=Types.TUPLE([Types.LONG(), Types.STRING(), Types.STRING()])) schema = Schema.new_builder() \ .column("f0", DataTypes.BIGINT()) \ .column("f1", DataTypes.STRING()) \ .column("f2", DataTypes.STRING()) \ .build() result_table_3 = t_env.from_data_stream(ds4, schema) statement_set = t_env.create_statement_set() statement_set.add_insert("print", result_table_3) result_table_4 = t_env.from_data_stream(ds5, schema) statement_set.add_insert("print_2", result_table_4) statement_set.execute().wait() if __name__ == "__main__": start_ts = time.time() test_chaining() end_ts = time.time() print("--- %s seconds ---" % (end_ts - start_ts)) {code} Lookback mode doesn't work. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23994) ArrayDataSerializer and MapDataSerializer don't handle Null values correctly
Dian Fu created FLINK-23994: --- Summary: ArrayDataSerializer and MapDataSerializer don't handle Null values correctly Key: FLINK-23994 URL: https://issues.apache.org/jira/browse/FLINK-23994 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.13.0, 1.12.0, 1.11.0, 1.10.0, 1.14.0 Reporter: Dian Fu Assignee: Dian Fu Fix For: 1.14.0, 1.12.6, 1.13.3 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23944) PulsarSourceITCase.testTaskManagerFailure is instable
Dian Fu created FLINK-23944: --- Summary: PulsarSourceITCase.testTaskManagerFailure is instable Key: FLINK-23944 URL: https://issues.apache.org/jira/browse/FLINK-23944 Project: Flink Issue Type: Improvement Components: Connectors / Pulsar Affects Versions: 1.14.0 Reporter: Dian Fu [https://dev.azure.com/dianfu/Flink/_build/results?buildId=430&view=logs&j=f3dc9b18-b77a-55c1-591e-264c46fe44d1&t=2d3cd81e-1c37-5c31-0ee4-f5d5cdb9324d] It's from my personal azure pipeline, however, I'm pretty sure that I have not touched any code related to this. {code:java} Aug 24 10:44:13 [ERROR] testTaskManagerFailure{TestEnvironment, ExternalContext, ClusterControllable}[1] Time elapsed: 258.397 s <<< FAILURE! Aug 24 10:44:13 java.lang.AssertionError: Aug 24 10:44:13 Aug 24 10:44:13 Expected: Records consumed by Flink should be identical to test data and preserve the order in split Aug 24 10:44:13 but: Mismatched record at position 7: Expected '0W6SzacX7MNL4xLL3BZ8C3ljho4iCydbvxIl' but was 'wVi5JaJpNvgkDEOBRC775qHgw0LyRW2HBxwLmfONeEmr' Aug 24 10:44:13 at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) Aug 24 10:44:13 at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8) Aug 24 10:44:13 at org.apache.flink.connectors.test.common.testsuites.SourceTestSuiteBase.testTaskManagerFailure(SourceTestSuiteBase.java:271) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23940) Improve the documentation about how to use logging in PyFlink jobs
Dian Fu created FLINK-23940: --- Summary: Improve the documentation about how to use logging in PyFlink jobs Key: FLINK-23940 URL: https://issues.apache.org/jira/browse/FLINK-23940 Project: Flink Issue Type: Improvement Components: API / Python, Documentation Reporter: Dian Fu Assignee: Dian Fu Fix For: 1.14.0, 1.12.6, 1.13.3 -- This message was sent by Atlassian Jira (v8.3.4#803005)
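For context, the pattern the documentation should spell out is simply using the standard logging module inside Python UDFs; those statements end up in the Python worker / TaskManager logs rather than on the client. A minimal sketch:
{code}
import logging

from pyflink.table import DataTypes
from pyflink.table.udf import udf


@udf(result_type=DataTypes.BIGINT())
def add_one(i):
    # written to the Python worker log (collected together with the TaskManager logs)
    logging.info("processing value: %s", i)
    return i + 1
{code}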
[jira] [Created] (FLINK-23936) Python UDFs instances are released and reinitialized if there is no input for more than 1 minute
Dian Fu created FLINK-23936: --- Summary: Python UDFs instances are released and reinitialized if there is no input for more than 1 minute Key: FLINK-23936 URL: https://issues.apache.org/jira/browse/FLINK-23936 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.13.0, 1.12.0, 1.11.0, 1.10.0 Reporter: Dian Fu Assignee: Dian Fu Fix For: 1.14.0, 1.12.6, 1.13.3 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23931) Format the exception message shown in PythonDriver
Dian Fu created FLINK-23931: --- Summary: Format the exception message shown in PythonDriver Key: FLINK-23931 URL: https://issues.apache.org/jira/browse/FLINK-23931 Project: Flink Issue Type: Improvement Components: API / Python Reporter: Dian Fu Assignee: Dian Fu Fix For: 1.14.0 It will show messages like the following if the job could not compile in PythonDriver: {code:java} 2021-08-24 13:33:45,014 INFO org.apache.flink.client.python.PythonDriver [] - --- Python Process Started -- 2021-08-24 13:33:47,709 INFO org.apache.flink.client.python.PythonDriver [] - Traceback (most recent call last): 2021-08-24 13:33:47,709 INFO org.apache.flink.client.python.PythonDriver [] - File "/Users/dianfu/code/src/workspace/pyflink-examples/datastream/test_chaining.py", line 147, in 2021-08-24 13:33:47,709 INFO org.apache.flink.client.python.PythonDriver [] - test_chaining_2() 2021-08-24 13:33:47,710 INFO org.apache.flink.client.python.PythonDriver [] - File "/Users/dianfu/code/src/workspace/pyflink-examples/datastream/test_chaining.py", line 141, in test_chaining_2 2021-08-24 13:33:47,710 INFO org.apache.flink.client.python.PythonDriver [] - statement_set.execute().wait() 2021-08-24 13:33:47,710 INFO org.apache.flink.client.python.PythonDriver [] - File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/table/statement_set.py", line 147, in execute 2021-08-24 13:33:47,710 INFO org.apache.flink.client.python.PythonDriver [] - return TableResult(self._j_statement_set.execute()) 2021-08-24 13:33:47,710 INFO org.apache.flink.client.python.PythonDriver [] - File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__ 2021-08-24 13:33:47,710 INFO org.apache.flink.client.python.PythonDriver [] - answer, self.gateway_client, self.target_id, self.name) 2021-08-24 13:33:47,710 INFO org.apache.flink.client.python.PythonDriver [] - File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 158, in deco 2021-08-24 13:33:47,710 INFO org.apache.flink.client.python.PythonDriver [] - raise java_exception 2021-08-24 13:33:47,710 INFO org.apache.flink.client.python.PythonDriver [] - pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Sink `default_catalog`.`default_database`.`print_3` does not exists 2021-08-24 13:33:47,710 INFO org.apache.flink.client.python.PythonDriver [] - at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:254) 2021-08-24 13:33:47,711 INFO org.apache.flink.client.python.PythonDriver [] - at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:182) 2021-08-24 13:33:47,711 INFO org.apache.flink.client.python.PythonDriver [] - at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:182) 2021-08-24 13:33:47,711 INFO org.apache.flink.client.python.PythonDriver [] - at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 2021-08-24 13:33:47,711 INFO org.apache.flink.client.python.PythonDriver [] - at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 2021-08-24 13:33:47,711 INFO org.apache.flink.client.python.PythonDriver [] - at scala.collection.Iterator$class.foreach(Iterator.scala:891) 2021-08-24 13:33:47,711 INFO org.apache.flink.client.python.PythonDriver [] - at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) 2021-08-24 13:33:47,711 INFO org.apache.flink.client.python.PythonDriver [] - 
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 2021-08-24 13:33:47,711 INFO org.apache.flink.client.python.PythonDriver [] - at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 2021-08-24 13:33:47,711 INFO org.apache.flink.client.python.PythonDriver [] - at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 2021-08-24 13:33:47,711 INFO org.apache.flink.client.python.PythonDriver [] - at scala.collection.AbstractTraversable.map(Traversable.scala:104) {code} The format could be improved to make it more readable by removing the prefix **INFO org.apache.flink.client.python.PythonDriver** in each line. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23929) Chaining optimization doesn't properly handle transformations with multiple inputs
Dian Fu created FLINK-23929: --- Summary: Chaining optimization doesn't properly handle transformations with multiple inputs Key: FLINK-23929 URL: https://issues.apache.org/jira/browse/FLINK-23929 Project: Flink Issue Type: Bug Components: API / Python Reporter: Dian Fu Assignee: Dian Fu Fix For: 1.14.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23819) Testing tgz file for python archives
Dian Fu created FLINK-23819: --- Summary: Testing tgz file for python archives Key: FLINK-23819 URL: https://issues.apache.org/jira/browse/FLINK-23819 Project: Flink Issue Type: Improvement Components: API / Python Reporter: Dian Fu Assignee: Dian Fu Fix For: 1.14.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23818) Add documentation about tgz file support for python archives
Dian Fu created FLINK-23818: --- Summary: Add documentation about tgz file support for python archives Key: FLINK-23818 URL: https://issues.apache.org/jira/browse/FLINK-23818 Project: Flink Issue Type: Improvement Components: API / Python, Documentation Reporter: Dian Fu Assignee: Dian Fu Fix For: 1.14.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23787) Providing utility class for initializing various process function test harnesses
Dian Fu created FLINK-23787: --- Summary: Providing utility class for initializing various process function test harnesses Key: FLINK-23787 URL: https://issues.apache.org/jira/browse/FLINK-23787 Project: Flink Issue Type: Improvement Components: API / Python Reporter: Dian Fu https://lists.apache.org/thread.html/r9b058976ebaf8fd25f35c0116f6e3ae564cea7d01eea150a686e65f5%40%3Cuser.flink.apache.org%3E -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23754) Testing Python DataStream API chaining functionality
Dian Fu created FLINK-23754: --- Summary: Testing Python DataStream API chaining functionality Key: FLINK-23754 URL: https://issues.apache.org/jira/browse/FLINK-23754 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Dian Fu Assignee: Dian Fu Fix For: 1.14.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23753) Add documentation about Python DataStream API chaining optimization
Dian Fu created FLINK-23753: --- Summary: Add documentation about Python DataStream API chaining optimization Key: FLINK-23753 URL: https://issues.apache.org/jira/browse/FLINK-23753 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Dian Fu Assignee: Dian Fu Fix For: 1.14.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23619) Remove PythonTimestampsAndWatermarksOperator
Dian Fu created FLINK-23619: --- Summary: Remove PythonTimestampsAndWatermarksOperator Key: FLINK-23619 URL: https://issues.apache.org/jira/browse/FLINK-23619 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Dian Fu Assignee: Dian Fu Fix For: 1.14.0 The functionality of this operator could be implemented using the existing operators. So we could remove it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23616) Support chaining the Python operators as much as possible
Dian Fu created FLINK-23616: --- Summary: Support chaining the Python operators as much as possible Key: FLINK-23616 URL: https://issues.apache.org/jira/browse/FLINK-23616 Project: Flink Issue Type: Sub-task Components: API / Python Affects Versions: 1.14.0 Reporter: Dian Fu Assignee: Dian Fu -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23584) Introduce PythonCoProcessOperator and remove PythonCoFlatMapOperator & PythonCoMapOperator
Dian Fu created FLINK-23584: --- Summary: Introduce PythonCoProcessOperator and remove PythonCoFlatMapOperator & PythonCoMapOperator Key: FLINK-23584 URL: https://issues.apache.org/jira/browse/FLINK-23584 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Dian Fu Assignee: Dian Fu Fix For: 1.14.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23578) Remove Python operators PythonFlatMapOperator/PythonMapOperator/PythonPartitionCustomOperator and use PythonProcessOperator instead
Dian Fu created FLINK-23578: --- Summary: Remove Python operators PythonFlatMapOperator/PythonMapOperator/PythonPartitionCustomOperator and use PythonProcessOperator instead Key: FLINK-23578 URL: https://issues.apache.org/jira/browse/FLINK-23578 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Dian Fu Assignee: Dian Fu Fix For: 1.14.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23445) Separate data and timer connection into different channels for Python Table API operators
Dian Fu created FLINK-23445: --- Summary: Separate data and timer connection into different channels for Python Table API operators Key: FLINK-23445 URL: https://issues.apache.org/jira/browse/FLINK-23445 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Dian Fu Assignee: Dian Fu Fix For: 1.14.0 This is a follow-up of FLINK-23401 where we split the data and timer into different channels for Python DataStream operators. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23421) Unify the space handling in RowData CSV parser
Dian Fu created FLINK-23421: --- Summary: Unify the space handling in RowData CSV parser Key: FLINK-23421 URL: https://issues.apache.org/jira/browse/FLINK-23421 Project: Flink Issue Type: Bug Reporter: Dian Fu Currently, it calls `trim()` for types such as LocalDateTime, Boolean and [Int|https://github.com/apache/flink/blob/41ce9ccbf42537a854087b6ba33a61092a04538f/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java#L196], but doesn't call `trim()` for other types such as [Decimal|https://github.com/apache/flink/blob/41ce9ccbf42537a854087b6ba33a61092a04538f/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java#L285]. We should unify the behavior. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23401) Separate data and timer connection into different channel
Dian Fu created FLINK-23401: --- Summary: Separate data and timer connection into different channel Key: FLINK-23401 URL: https://issues.apache.org/jira/browse/FLINK-23401 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Dian Fu Fix For: 1.14.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23400) Support to decode a single record for the Python coder
Dian Fu created FLINK-23400: --- Summary: Support to decode a single record for the Python coder Key: FLINK-23400 URL: https://issues.apache.org/jira/browse/FLINK-23400 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Dian Fu Fix For: 1.14.0 Currently, the Python coder always outputs a Python generator. This makes it impossible to separate the timers and the data into different channels. -- This message was sent by Atlassian Jira (v8.3.4#803005)
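As a hedged sketch of the idea (the class and method names below are hypothetical, not the actual PyFlink coder API): instead of only exposing a generator over the whole input stream, the coder would also allow decoding exactly one record per call, so the caller can interleave timer handling and data handling:

{code:python}
class SingleRecordCoder:
    """Hypothetical sketch of a coder that can decode one record at a time."""

    def __init__(self, field_coder):
        self._field_coder = field_coder

    def decode_all(self, input_stream):
        # Current style: a generator draining the whole stream, which makes it
        # hard to route timers and data through different channels.
        while input_stream.size() > 0:
            yield self._field_coder.decode(input_stream)

    def decode_one(self, input_stream):
        # Proposed style: decode exactly one record per call.
        return self._field_coder.decode(input_stream)
{code}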
[jira] [Created] (FLINK-23213) Remove ProcessFunctionOperation
Dian Fu created FLINK-23213: --- Summary: Remove ProcessFunctionOperation Key: FLINK-23213 URL: https://issues.apache.org/jira/browse/FLINK-23213 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Dian Fu Assignee: Dian Fu Fix For: 1.14.0 Currently there are three operations supporting the Python DataStream API: ProcessFunctionOperation, DataStreamKeyedStatefulOperation and DataStreamStatelessFunctionOperation. We could refactor this a bit to merge ProcessFunctionOperation and DataStreamStatelessFunctionOperation into one. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23166) ZipUtils doesn't properly handle softlinks inside the zip file
Dian Fu created FLINK-23166: --- Summary: ZipUtils doesn't properly handle softlinks inside the zip file Key: FLINK-23166 URL: https://issues.apache.org/jira/browse/FLINK-23166 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.10.0 Reporter: Dian Fu Assignee: Dian Fu Fix For: 1.11.4, 1.14.0, 1.12.5, 1.13.2 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23141) Support KafkaSerializationSchema in Kafka connector for Python DataStream API
Dian Fu created FLINK-23141: --- Summary: Support KafkaSerializationSchema in Kafka connector for Python DataStream API Key: FLINK-23141 URL: https://issues.apache.org/jira/browse/FLINK-23141 Project: Flink Issue Type: Improvement Components: API / Python Reporter: Dian Fu Assignee: Dian Fu Fix For: 1.14.0 See [https://lists.apache.org/thread.html/r8e4a51a1df7b49a2a5cbcf2432559ae16510c6224a4a5dd8f2844d39%40%3Cuser.flink.apache.org%3E] for more details -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23138) Raise an exception if types other than PickledBytesTypeInfo are specified for state descriptor
Dian Fu created FLINK-23138: --- Summary: Raise an exception if types other than PickledBytesTypeInfo are specified for state descriptor Key: FLINK-23138 URL: https://issues.apache.org/jira/browse/FLINK-23138 Project: Flink Issue Type: Improvement Components: API / Python Affects Versions: 1.13.2 Reporter: Dian Fu Assignee: Dian Fu Fix For: 1.13.0 Although users are expected to be able to use any type to define a state descriptor, only PickledBytesTypeInfo is actually supported for the time being. A meaningful exception should be thrown if types other than PickledBytesTypeInfo are used. -- This message was sent by Atlassian Jira (v8.3.4#803005)
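A minimal sketch of the kind of validation being proposed, assuming a hypothetical helper `check_state_type`; `Types.PICKLED_BYTE_ARRAY()` is the existing PyFlink type used by state descriptors today:

{code:python}
from pyflink.common.typeinfo import Types

# The only state type actually supported at the moment.
_PICKLED_TYPE = type(Types.PICKLED_BYTE_ARRAY())

def check_state_type(type_info):
    """Hypothetical validation helper: reject unsupported state types early."""
    if not isinstance(type_info, _PICKLED_TYPE):
        raise TypeError(
            "Currently only Types.PICKLED_BYTE_ARRAY() is supported as the "
            "state type in the Python DataStream API, but got: %s" % type_info)

check_state_type(Types.PICKLED_BYTE_ARRAY())  # passes
# check_state_type(Types.LONG())              # would raise a meaningful TypeError
{code}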
[jira] [Created] (FLINK-23133) The dependencies are not handled properly when mixing use of Python Table API and Python DataStream API
Dian Fu created FLINK-23133: --- Summary: The dependencies are not handled properly when mixing use of Python Table API and Python DataStream API Key: FLINK-23133 URL: https://issues.apache.org/jira/browse/FLINK-23133 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.13.0, 1.12.0 Reporter: Dian Fu Assignee: Dian Fu Fix For: 1.12.5, 1.13.2 The reason is that when converting from DataStream to Table, the dependencies should be handled and set correctly for the existing DataStream operators. -- This message was sent by Atlassian Jira (v8.3.4#803005)
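A minimal repro-style sketch of the mixed usage being described (the file path is illustrative): a Python dependency registered for the DataStream part of the job also needs to take effect for the DataStream operators that already exist when the stream is converted to a Table:

{code:python}
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# Python dependency used by the DataStream part of the job (illustrative path).
env.add_python_file("/path/to/my_util.py")

ds = env.from_collection([(1, "a"), (2, "b")])
ds = ds.map(lambda t: (t[0] + 1, t[1]))  # DataStream operator created before the conversion

# Converting to the Table API afterwards: the dependency must also be applied
# to the DataStream operators created above, which is what this issue tracks.
t_env = StreamTableEnvironment.create(env)
table = t_env.from_data_stream(ds)
table.execute().print()
{code}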
[jira] [Created] (FLINK-23120) ByteArrayWrapperSerializer.serialize should use writeInt to serialize the length
Dian Fu created FLINK-23120: --- Summary: ByteArrayWrapperSerializer.serialize should use writeInt to serialize the length Key: FLINK-23120 URL: https://issues.apache.org/jira/browse/FLINK-23120 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.13.0, 1.12.0 Reporter: Dian Fu Assignee: Dian Fu Fix For: 1.12.5, 1.13.2 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23033) Support object array in Python DataStream API
Dian Fu created FLINK-23033: --- Summary: Support object array in Python DataStream API Key: FLINK-23033 URL: https://issues.apache.org/jira/browse/FLINK-23033 Project: Flink Issue Type: Improvement Components: API / Python Reporter: Dian Fu Fix For: 1.14.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22913) Support Python UDF chaining in Python DataStream API
Dian Fu created FLINK-22913: --- Summary: Support Python UDF chaining in Python DataStream API Key: FLINK-22913 URL: https://issues.apache.org/jira/browse/FLINK-22913 Project: Flink Issue Type: Improvement Components: API / Python Reporter: Dian Fu Fix For: 1.14.0 Currently, for the following job: {code} ds = .. ds.map(map_func1).map(map_func2) {code} The Python functions `map_func1` and `map_func2` will run in separate Python workers, and the result of `map_func1` will be transferred to the JVM and then to `map_func2`, which may reside in another Python worker. This introduces redundant communication and serialization/deserialization overhead. -- This message was sent by Atlassian Jira (v8.3.4#803005)
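Conceptually (an illustrative sketch only, not the implemented optimization), chaining makes the job behave as if the two UDFs were fused so that each record is handled by both functions inside a single Python worker:

{code:python}
# Stub versions of the map_func1 / map_func2 from the example above.
def map_func1(value):
    return value + 1

def map_func2(value):
    return value * 2

# What chaining achieves conceptually: both functions applied back to back in
# one Python worker, instead of round-tripping each record through the JVM
# between two separate workers.
def fused_map(value):
    return map_func2(map_func1(value))

assert fused_map(3) == 8  # (3 + 1) * 2
{code}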
[jira] [Created] (FLINK-22912) Support state ttl in Python DataStream API
Dian Fu created FLINK-22912: --- Summary: Support state ttl in Python DataStream API Key: FLINK-22912 URL: https://issues.apache.org/jira/browse/FLINK-22912 Project: Flink Issue Type: Improvement Components: API / Python Reporter: Dian Fu Fix For: 1.14.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22911) Align FLIP-136 (Improve interoperability between DataStream and Table API) in PyFlink Table API
Dian Fu created FLINK-22911: --- Summary: Align FLIP-136 (Improve interoperability between DataStream and Table API) in PyFlink Table API Key: FLINK-22911 URL: https://issues.apache.org/jira/browse/FLINK-22911 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Dian Fu Assignee: Dian Fu Fix For: 1.14.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22733) Type mismatch thrown in KeyedStream.union in Python DataStream API
Dian Fu created FLINK-22733: --- Summary: Type mismatch thrown in KeyedStream.union in Python DataStream API Key: FLINK-22733 URL: https://issues.apache.org/jira/browse/FLINK-22733 Project: Flink Issue Type: Improvement Components: API / Python Affects Versions: 1.13.0, 1.12.0 Reporter: Dian Fu Fix For: 1.13.1, 1.12.5 See [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/PyFlink-DataStream-union-type-mismatch-td43855.html] for more details. -- This message was sent by Atlassian Jira (v8.3.4#803005)
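A minimal sketch of the kind of job described in the report (element values are illustrative; the exact reproduction is in the linked thread):

{code:python}
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

row_type = Types.TUPLE([Types.INT(), Types.STRING()])
ds1 = env.from_collection([(1, "a"), (2, "b")], type_info=row_type)
ds2 = env.from_collection([(3, "c")], type_info=row_type)

# Union of two keyed streams with identical element types: per the report,
# this raised a type mismatch instead of producing the unioned stream.
united = ds1.key_by(lambda t: t[0]).union(ds2.key_by(lambda t: t[0]))

united.print()
env.execute("keyed_union_repro")
{code}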
[jira] [Created] (FLINK-22589) There are quite a few Python classes that don't appear in the Python docs
Dian Fu created FLINK-22589: --- Summary: There are quite a few Python classes that don't appear in the Python docs Key: FLINK-22589 URL: https://issues.apache.org/jira/browse/FLINK-22589 Project: Flink Issue Type: Improvement Components: API / Python, Documentation Reporter: Dian Fu Fix For: 1.13.1 For example, SerializationSchema and DeserializationSchema don't appear in the Python docs: [https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/python/] Users have to turn to the source code to find out how to use these classes. -- This message was sent by Atlassian Jira (v8.3.4#803005)