[ https://issues.apache.org/jira/browse/FLINK-31478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dian Fu closed FLINK-31478. --------------------------- Fix Version/s: 1.16.2 1.18.0 1.17.1 Resolution: Fixed Fixed in: - master via 5e059efee864e17939a33f29272a848d00598531 - release-1.17 via ec5a09b3ce56426d1bdc8eeac4bf52cac9be015b - release-1.16 via cadf4b35fb6f20c8cba310fa54626d0b9bae1361 > TypeError: a bytes-like object is required, not 'JavaList' is thrown when > ds.execute_and_collect() is called on a KeyedStream > ----------------------------------------------------------------------------------------------------------------------------- > > Key: FLINK-31478 > URL: https://issues.apache.org/jira/browse/FLINK-31478 > Project: Flink > Issue Type: Bug > Components: API / Python > Reporter: Dian Fu > Assignee: Dian Fu > Priority: Major > Labels: pull-request-available > Fix For: 1.16.2, 1.18.0, 1.17.1 > > > {code} > ################################################################################ > # Licensed to the Apache Software Foundation (ASF) under one > # or more contributor license agreements. See the NOTICE file > # distributed with this work for additional information > # regarding copyright ownership. The ASF licenses this file > # to you under the Apache License, Version 2.0 (the > # "License"); you may not use this file except in compliance > # with the License. You may obtain a copy of the License at > # > # http://www.apache.org/licenses/LICENSE-2.0 > # > # Unless required by applicable law or agreed to in writing, software > # distributed under the License is distributed on an "AS IS" BASIS, > # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > # See the License for the specific language governing permissions and > # limitations under the License. 
> ################################################################################ > import argparse > import logging > import sys > from pyflink.common import WatermarkStrategy, Encoder, Types > from pyflink.datastream import StreamExecutionEnvironment, > RuntimeExecutionMode > from pyflink.datastream.connectors.file_system import (FileSource, > StreamFormat, FileSink, > OutputFileConfig, > RollingPolicy) > word_count_data = ["To be, or not to be,--that is the question:--", > "Whether 'tis nobler in the mind to suffer", > "The slings and arrows of outrageous fortune", > "Or to take arms against a sea of troubles,", > "And by opposing end them?--To die,--to sleep,--", > "No more; and by a sleep to say we end", > "The heartache, and the thousand natural shocks", > "That flesh is heir to,--'tis a consummation", > "Devoutly to be wish'd. To die,--to sleep;--", > "To sleep! perchance to dream:--ay, there's the rub;", > "For in that sleep of death what dreams may come,", > "When we have shuffled off this mortal coil,", > "Must give us pause: there's the respect", > "That makes calamity of so long life;", > "For who would bear the whips and scorns of time,", > "The oppressor's wrong, the proud man's contumely,", > "The pangs of despis'd love, the law's delay,", > "The insolence of office, and the spurns", > "That patient merit of the unworthy takes,", > "When he himself might his quietus make", > "With a bare bodkin? 
who would these fardels bear,", > "To grunt and sweat under a weary life,", > "But that the dread of something after death,--", > "The undiscover'd country, from whose bourn", > "No traveller returns,--puzzles the will,", > "And makes us rather bear those ills we have", > "Than fly to others that we know not of?", > "Thus conscience does make cowards of us all;", > "And thus the native hue of resolution", > "Is sicklied o'er with the pale cast of thought;", > "And enterprises of great pith and moment,", > "With this regard, their currents turn awry,", > "And lose the name of action.--Soft you now!", > "The fair Ophelia!--Nymph, in thy orisons", > "Be all my sins remember'd."] > def word_count(input_path, output_path): > env = StreamExecutionEnvironment.get_execution_environment() > env.set_runtime_mode(RuntimeExecutionMode.BATCH) > # write all the data to one file > env.set_parallelism(1) > # define the source > if input_path is not None: > ds = env.from_source( > > source=FileSource.for_record_stream_format(StreamFormat.text_line_format(), > input_path) > .process_static_file_set().build(), > watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(), > source_name="file_source" > ) > else: > print("Executing word_count example with default input data set.") > print("Use --input to specify file input.") > ds = env.from_collection(word_count_data) > def split(line): > yield from line.split() > # compute word count > ds = ds.flat_map(split) \ > .map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), > Types.INT()])) \ > .key_by(lambda i: i[0]) > # .reduce(lambda i, j: (i[0], i[1] + j[1])) > # define the sink > if output_path is not None: > ds.sink_to( > sink=FileSink.for_row_format( > base_path=output_path, > encoder=Encoder.simple_string_encoder()) > .with_output_file_config( > OutputFileConfig.builder() > .with_part_prefix("prefix") > .with_part_suffix(".ext") > .build()) > .with_rolling_policy(RollingPolicy.default_rolling_policy()) > .build() > ) > 
else: > print("Printing result to stdout. Use --output to specify output path.") > a = list(ds.execute_and_collect()) > if __name__ == '__main__': > logging.basicConfig(stream=sys.stdout, level=logging.INFO, > format="%(message)s") > parser = argparse.ArgumentParser() > parser.add_argument( > '--input', > dest='input', > required=False, > help='Input file to process.') > parser.add_argument( > '--output', > dest='output', > required=False, > help='Output file to write results to.') > argv = sys.argv[1:] > known_args, _ = parser.parse_known_args(argv) > word_count(known_args.input, known_args.output) > {code} > For the above job, the following exception will be thrown: > {code} > Traceback (most recent call last): > File "/Users/dianfu/code/src/workspace/pyflink-examples/udf/test_udf_perf.py", line 131, in <module> > word_count(known_args.input, known_args.output) > File "/Users/dianfu/code/src/workspace/pyflink-examples/udf/test_udf_perf.py", line 110, in word_count > a = list(ds.execute_and_collect()) > File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/datastream/data_stream.py", line 2920, in __next__ > return self.next() > File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/datastream/data_stream.py", line 2931, in next > return convert_to_python_obj(self._j_closeable_iterator.next(), self._type_info) > File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/datastream/utils.py", line 72, in convert_to_python_obj > fields.append(pickled_bytes_to_python_converter(data, field_type)) > File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/datastream/utils.py", line 91, in pickled_bytes_to_python_converter > data = pickle.loads(data) > TypeError: a bytes-like object is required, not 'JavaList' > {code} > See more details on https://apache-flink.slack.com/archives/C03G7LJTS2G/p1678894062180649 -- This message was sent by Atlassian Jira (v8.20.10#820010)