Hi Dian,

I am able to reproduce this issue in a much simpler setup. I will update this thread with the simpler reproducible example shortly.
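In the meantime, this is roughly the shape of the stripped-down job I have in mind (a sketch only, not the final repro; the datagen source, print sink, and trivial stand-in udf are my choices to take Kafka and the SQL files out of the picture):

```python
# Minimal sketch of a repro: the same kind of Python udf returning
# MAP<STRING, STRING>, but with a datagen source and a print sink so no
# Kafka brokers or external connector jars are involved.
# The connector choices and the dummy udf are assumptions, not the final example.
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.STRING()],
     result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
def dummy_map(s):
    # Wrap the input string in a single-entry map, mirroring the shape of
    # parse_content's result.
    return {'key': s}

exec_env = StreamExecutionEnvironment.get_execution_environment()
env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
t_env = StreamTableEnvironment.create(
    stream_execution_environment=exec_env, environment_settings=env_settings)
t_env.create_temporary_function("DummyMap", dummy_map)

t_env.execute_sql("""
    CREATE TABLE src (s STRING)
    WITH ('connector' = 'datagen', 'rows-per-second' = '1')
""")
t_env.execute_sql("""
    CREATE TABLE snk (m MAP<STRING, STRING>)
    WITH ('connector' = 'print')
""")
t_env.execute_sql("INSERT INTO snk SELECT DummyMap(s) FROM src").wait()
```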
Best,
Yik San

On Fri, Mar 19, 2021 at 11:28 AM Yik San Chan <evan.chanyik...@gmail.com> wrote:

> Hi Dian,
>
> That is a good catch. However, after switching to
> flink-sql-connector-kafka_2.11-1.12.0.jar, I still get exactly the same
> error.
>
> Best,
> Yik San
>
> On Fri, Mar 19, 2021 at 11:02 AM Dian Fu <dian0511...@gmail.com> wrote:
>
>> I noticed that you use "flink-sql-connector-kafka_2.12-1.12.0.jar". Are
>> the jar files on the cluster nodes also built with Scala 2.12? The
>> PyFlink package bundles jar files built with Scala 2.11 by default. I'm
>> still not sure whether this is related to the issue, but the
>> inconsistency is problematic. Could you make sure that they are
>> consistent?
>>
>> On Mar 19, 2021, at 10:40 AM, Yik San Chan <evan.chanyik...@gmail.com> wrote:
>>
>> Hi Dian,
>>
>> The PyFlink version is 1.12.0, and the Flink version on the cluster
>> nodes is also 1.12.0:
>>
>> $ which flink
>> /data/apache/flink/flink-1.12.0/bin/flink
>>
>> Best,
>> Yik San
>>
>> On Fri, Mar 19, 2021 at 10:26 AM Dian Fu <dian0511...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> What's the Flink version on the cluster nodes? It should match the
>>> PyFlink version.
>>>
>>> Regards,
>>> Dian
>>>
>>> On Mar 18, 2021, at 5:01 PM, Yik San Chan <evan.chanyik...@gmail.com> wrote:
>>>
>>> This question is cross-posted on StackOverflow:
>>> https://stackoverflow.com/questions/66687797/pyflink-java-io-eofexception-at-java-io-datainputstream-readint
>>>
>>> I have a PyFlink job that reads from a Kafka source, transforms the
>>> data, and writes to a Kafka sink. This is a `tree` view of my working
>>> directory:
>>>
>>> ```
>>> > tree
>>> .
>>> ├── deps
>>> │   └── flink-sql-connector-kafka_2.12-1.12.0.jar
>>> ├── flink_run.sh
>>> ├── main.py
>>> ├── pyflink1.12.0.zip
>>> └── tasks
>>>     └── user_last_n_clicks
>>>         ├── sink_ddl.sql
>>>         ├── source_ddl.sql
>>>         └── transform_dml.sql
>>> ```
>>>
>>> This is `flink_run.sh`:
>>>
>>> ```
>>> flink run \
>>> --yarnname test-pyflink \
>>> -m yarn-cluster \
>>> -yD yarn.application.queue=tech_platform \
>>> -pyarch pyflink1.12.0.zip \
>>> -pyexec /data/software/pyflink1.12.0/bin/python \
>>> -py main.py testing user_last_n_clicks
>>> ```
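>>> (As an aside, since the connector jar above is the Scala 2.12 build:
>>> the jars that the PyFlink package itself bundles can be listed with a
>>> few lines of Python. This is a quick sketch that assumes the wheel
>>> keeps its jars under pyflink/lib; the Scala suffix is visible in the
>>> file names.)
>>>
>>> ```python
>>> # List the jars bundled with the installed PyFlink wheel.
>>> # Assumption: the wheel stores them under pyflink/lib.
>>> import os
>>> import pyflink
>>>
>>> lib_dir = os.path.join(os.path.dirname(os.path.abspath(pyflink.__file__)), "lib")
>>> for jar in sorted(os.listdir(lib_dir)):
>>>     print(jar)  # e.g. flink-table-blink_2.11-1.12.0.jar
>>> ```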
>>> This is `main.py`. The key logic is in:
>>>
>>> - the `parse_content` udf, and
>>> - loading the sql files from the tasks subfolder and executing them
>>>   with `execute_sql`.
>>>
>>> ```python
>>> import os
>>> from sys import argv
>>> from pyflink.datastream import StreamExecutionEnvironment
>>> from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
>>> from pyflink.table.udf import udf
>>>
>>> def read_file_content(filepath):
>>>     with open(filepath) as f:
>>>         return f.read()
>>>
>>> @udf(input_types=[DataTypes.STRING()],
>>>      result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
>>> def parse_content(content_str):
>>>     import json
>>>     res = {}
>>>     content = json.loads(content_str)
>>>     if 'postId' in content:
>>>         res['item_id'] = content['postId']
>>>     if 'lid' in content:
>>>         res['item_id'] = content['lid']
>>>     if 'param' in content and 'tag' in content['param']:
>>>         res['tag'] = content['param']['tag']
>>>     return res
>>>
>>> CWD = os.getcwd()
>>> _, palfish_env, task = argv
>>>
>>> VALID_PALFISH_ENVS = ['development', 'testing', 'production']
>>> if palfish_env not in VALID_PALFISH_ENVS:
>>>     raise Exception(f"{palfish_env} is not a valid env, should be one of [{', '.join(VALID_PALFISH_ENVS)}].")
>>>
>>> VALID_TASKS = os.listdir(f"{CWD}/tasks")
>>> if task not in VALID_TASKS:
>>>     raise Exception(f"{task} is not a valid task, should be one of [{', '.join(VALID_TASKS)}].")
>>>
>>> config = {
>>>     "development": {
>>>         "${generation.kafka.source.servers}": "localhost:9094",
>>>         "${generation.kafka.sink.servers}": "localhost:9094"
>>>     },
>>>     "testing": {
>>>         "${generation.kafka.source.servers}": "10.111.135.233:9092,10.111.130.11:9092,10.111.130.12:9092",
>>>         "${generation.kafka.sink.servers}": "10.111.135.233:9092,10.111.130.11:9092,10.111.130.12:9092"
>>>     },
>>>     "production": {
>>>         "${generation.kafka.source.servers}": "10.111.203.9:9092,10.111.203.10:9092,10.111.203.13:9092,10.111.204.163:9092,10.111.204.164:9092,10.111.204.165:9092",
>>>         "${generation.kafka.sink.servers}": "10.111.209.219:9092,10.111.209.220:9092,10.111.209.221:9092"
>>>     }
>>> }
>>>
>>> FAT_JAR_PATH = f"{CWD}/deps/flink-sql-connector-kafka_2.12-1.12.0.jar"
>>>
>>> source_ddl = read_file_content(f'{CWD}/tasks/{task}/source_ddl.sql').replace(
>>>     '${generation.kafka.source.servers}',
>>>     config[palfish_env]['${generation.kafka.source.servers}'])
>>> sink_ddl = read_file_content(f'{CWD}/tasks/{task}/sink_ddl.sql').replace(
>>>     '${generation.kafka.sink.servers}',
>>>     config[palfish_env]['${generation.kafka.sink.servers}'])
>>> transform_dml = read_file_content(f'{CWD}/tasks/{task}/transform_dml.sql')
>>>
>>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>>> env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
>>> t_env = StreamTableEnvironment.create(
>>>     stream_execution_environment=exec_env, environment_settings=env_settings)
>>>
>>> t_env.get_config().get_configuration().set_string("pipeline.jars", f"file://{FAT_JAR_PATH}")
>>> t_env.create_temporary_function("ParseContent", parse_content)
>>>
>>> t_env.execute_sql(source_ddl)
>>> t_env.execute_sql(sink_ddl)
>>> t_env.execute_sql(transform_dml).wait()
>>> ```
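>>> (For what it's worth, the pure-Python logic of the udf looks fine when
>>> exercised outside Flink. A quick local check, using a made-up sample
>>> payload:)
>>>
>>> ```python
>>> # Sanity-check parse_content's logic without Flink involved.
>>> # The sample payload below is made up for illustration.
>>> import json
>>>
>>> def parse_content_logic(content_str):
>>>     res = {}
>>>     content = json.loads(content_str)
>>>     if 'postId' in content:
>>>         res['item_id'] = content['postId']
>>>     if 'lid' in content:
>>>         res['item_id'] = content['lid']
>>>     if 'param' in content and 'tag' in content['param']:
>>>         res['tag'] = content['param']['tag']
>>>     return res
>>>
>>> print(parse_content_logic('{"postId": 123, "param": {"tag": "点击帖子卡片"}}'))
>>> # {'item_id': 123, 'tag': '点击帖子卡片'}
>>> ```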
>>> See my sql files below. Note the udf `ParseContent` is used in
>>> `transform_dml.sql`:
>>>
>>> ```sql
>>> -- source_ddl.sql
>>> CREATE TABLE kafka_source (
>>>     `body` ROW<`log` ROW<`uid` BIGINT, serverts BIGINT, `contentstr` STRING>>
>>> ) WITH (
>>>     'connector' = 'kafka',
>>>     'topic' = 'data-report-stat-old-logtype7',
>>>     'properties.bootstrap.servers' = '${generation.kafka.source.servers}',
>>>     'properties.group.id' = 'flink-featurepipelines',
>>>     'format' = 'json'
>>> )
>>>
>>> -- transform_dml.sql
>>> INSERT INTO kafka_sink
>>> WITH t1 AS (
>>>     SELECT body['log']['uid'] user_id,
>>>            ParseContent(body['log']['contentstr']) content,
>>>            body['log']['serverts'] server_ts
>>>     FROM kafka_source
>>> ),
>>> t2 AS (
>>>     SELECT user_id, content['item_id'] item_id, content['tag'] tag, server_ts
>>>     FROM t1
>>>     WHERE content['item_id'] IS NOT NULL
>>>     AND content['tag'] = '点击帖子卡片'
>>> ),
>>> last_n AS (
>>>     SELECT user_id, item_id, server_ts
>>>     FROM (
>>>         SELECT *,
>>>                ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY server_ts DESC) AS row_num
>>>         FROM t2)
>>>     WHERE row_num <= 5
>>> )
>>> SELECT user_id,
>>>        CAST(MAX(server_ts / 1000) AS TIMESTAMP) datetime,
>>>        LISTAGG(CAST(item_id AS STRING)) last_5_clicks
>>> FROM last_n
>>> GROUP BY user_id
>>>
>>> -- sink_ddl.sql
>>> CREATE TABLE kafka_sink (
>>>     user_id BIGINT,
>>>     datetime TIMESTAMP(3),
>>>     last_5_clicks STRING,
>>>     PRIMARY KEY (user_id) NOT ENFORCED
>>> ) WITH (
>>>     'connector' = 'upsert-kafka',
>>>     'topic' = 'aiinfra.fct.userfeature.0',
>>>     'properties.bootstrap.servers' = '${generation.kafka.sink.servers}',
>>>     'key.format' = 'json',
>>>     'value.format' = 'json'
>>> )
>>> ```
>>>
>>> I get the following error when running the PyFlink program on my
>>> testing environment machine:
>>>
>>> ```
>>> Caused by: java.io.EOFException
>>>     at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[?:1.8.0_261]
>>>     at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserializeInternal(StringDataSerializer.java:91) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>>>     at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:87) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>>>     at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:36) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>>>     at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserializeInternal(MapDataSerializer.java:124) ~[flink-python_2.11-1.12.0.jar:1.12.0]
>>>     at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:107) ~[flink-python_2.11-1.12.0.jar:1.12.0]
>>>     at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:46) ~[flink-python_2.11-1.12.0.jar:1.12.0]
>>>     at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:104) ~[flink-python_2.11-1.12.0.jar:1.12.0]
>>>     at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49) ~[flink-python_2.11-1.12.0.jar:1.12.0]
>>>     at org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.emitResult(RowDataPythonScalarFunctionOperator.java:84) ~[flink-python_2.11-1.12.0.jar:1.12.0]
>>> ```
>>>
>>> For the full logs, see
>>> https://gist.github.com/YikSanChan/d3a5d25cdf2f3c1dc6b3dc93e48c4bbc.
>>>
>>> Any idea why the exception occurs? Thanks.
>>>
>>> Yik San
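P.S. Re-reading the udf while preparing the smaller example, I notice that
`content['postId']` and `content['lid']` may well be integers in the JSON,
while the declared result type is MAP(STRING, STRING). I don't know yet
whether that mismatch is what trips the serializer, but a defensive variant
of the udf would coerce every value to str before returning (this is my
speculation only, not verified):

```python
# Speculative defensive variant of parse_content (not verified to fix the
# error): coerce every value to str so the returned dict always matches the
# declared MAP(STRING, STRING) result type.
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.STRING()],
     result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
def parse_content(content_str):
    import json
    res = {}
    content = json.loads(content_str)
    if 'postId' in content:
        res['item_id'] = str(content['postId'])  # postId may be an int
    if 'lid' in content:
        res['item_id'] = str(content['lid'])
    if 'param' in content and 'tag' in content['param']:
        res['tag'] = str(content['param']['tag'])
    return res
```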