Hi Dian,

I am able to reproduce this issue in a much simpler setup. I will update this
thread with the simpler, reproducible example shortly.

Best,
Yik San

On Fri, Mar 19, 2021 at 11:28 AM Yik San Chan <evan.chanyik...@gmail.com>
wrote:

> Hi Dian,
>
> That is a good catch. However, after switching to
> flink-sql-connector-kafka_2.11-1.12.0.jar, I still get exactly the same
> error.
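>
> For reference, a minimal sketch of one way to double-check the Scala suffix
> of the jars bundled with the installed PyFlink package (this assumes the
> standard pip layout, where the bundled jars live under pyflink/lib; adjust
> the path if your install differs):
>
> ```python
> # List the jars shipped inside the installed pyflink package; names ending
> # in _2.11 or _2.12 indicate the Scala version they were built against.
> import os
> import pyflink
>
> lib_dir = os.path.join(os.path.dirname(pyflink.__file__), "lib")
> for jar in sorted(os.listdir(lib_dir)):
>     print(jar)
> ```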
>
> Best,
> Yik San
>
> On Fri, Mar 19, 2021 at 11:02 AM Dian Fu <dian0511...@gmail.com> wrote:
>
>>
>> I noticed that you use "flink-sql-connector-kafka_2.12-1.12.0.jar". Are the
>> jar files in the cluster nodes also built with Scala 2.12? The PyFlink
>> package bundles jar files built with Scala 2.11 by default. I'm still not
>> sure whether it's related to this issue; however, I think this is
>> problematic. Could you make sure that they are consistent?
>>
>>
>> On Mar 19, 2021, at 10:40 AM, Yik San Chan <evan.chanyik...@gmail.com> wrote:
>>
>> Hi Dian,
>>
>> The PyFlink version is 1.12.0, and the Flink version in the cluster nodes
>> is also 1.12.0.
>>
>> $ which flink
>> /data/apache/flink/flink-1.12.0/bin/flink
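>>
>> On the PyFlink side, a minimal sketch of how the installed version can be
>> checked (assuming PyFlink was installed from PyPI, where the package is
>> published as "apache-flink"):
>>
>> ```python
>> # Print the version of the installed apache-flink (PyFlink) distribution.
>> import pkg_resources
>>
>> print(pkg_resources.get_distribution("apache-flink").version)  # e.g. 1.12.0
>> ```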
>>
>> Best,
>> Yik San
>>
>> On Fri, Mar 19, 2021 at 10:26 AM Dian Fu <dian0511...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> What’s the Flink version in the cluster nodes? It should match the
>>> PyFlink version.
>>>
>>> Regards,
>>> Dian
>>>
>>> On Mar 18, 2021, at 5:01 PM, Yik San Chan <evan.chanyik...@gmail.com> wrote:
>>>
>>> This question is cross-posted on StackOverflow
>>> https://stackoverflow.com/questions/66687797/pyflink-java-io-eofexception-at-java-io-datainputstream-readint
>>>
>>> I have a PyFlink job that reads from a Kafka source, transforms the data,
>>> and writes to a Kafka sink. This is a `tree` view of my working directory.
>>>
>>> ```
>>> > tree
>>> .
>>> ├── deps
>>> │   └── flink-sql-connector-kafka_2.12-1.12.0.jar
>>> ├── flink_run.sh
>>> ├── main.py
>>> ├── pyflink1.12.0.zip
>>> └── tasks
>>>     └── user_last_n_clicks
>>>         ├── sink_ddl.sql
>>>         ├── source_ddl.sql
>>>         └── transform_dml.sql
>>> ```
>>>
>>> This is the `flink_run.sh`:
>>>
>>> ```
>>> flink run \
>>> --yarnname test-pyflink \
>>> -m yarn-cluster \
>>> -yD yarn.application.queue=tech_platform \
>>> -pyarch pyflink1.12.0.zip \
>>> -pyexec /data/software/pyflink1.12.0/bin/python \
>>> -py main.py testing user_last_n_clicks
>>> ```
>>>
>>> This is the `main.py`. The key logic is in:
>>> - the `parse_content` udf.
>>> - loading the SQL files from the tasks subfolder and executing them via
>>> `execute_sql`.
>>>
>>> ```python
>>> import os
>>> from sys import argv
>>> from pyflink.datastream import StreamExecutionEnvironment
>>> from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
>>> from pyflink.table.udf import udf
>>>
>>> def read_file_content(filepath):
>>>     with open(filepath) as f:
>>>         return f.read()
>>>
>>> # Extract item_id and tag from the JSON content string.
>>> @udf(input_types=[DataTypes.STRING()],
>>>      result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
>>> def parse_content(content_str):
>>>     import json
>>>     res = {}
>>>     content = json.loads(content_str)
>>>     if 'postId' in content:
>>>         res['item_id'] = content['postId']
>>>     if 'lid' in content:
>>>         res['item_id'] = content['lid']
>>>     if 'param' in content and 'tag' in content['param']:
>>>         res['tag'] = content['param']['tag']
>>>     return res
>>>
>>> CWD = os.getcwd()
>>> _, palfish_env, task = argv
>>>
>>> VALID_PALFISH_ENVS = ['development', 'testing', 'production']
>>> if palfish_env not in VALID_PALFISH_ENVS:
>>>     raise Exception(f"{palfish_env} is not a valid env, "
>>>                     f"should be one of [{', '.join(VALID_PALFISH_ENVS)}].")
>>>
>>> VALID_TASKS = os.listdir(f"{CWD}/tasks")
>>> if task not in VALID_TASKS:
>>>     raise Exception(f"{task} is not a valid task, "
>>>                     f"should be one of [{', '.join(VALID_TASKS)}].")
>>>
>>> # Kafka bootstrap servers per environment, keyed by the placeholders used
>>> # in the SQL files.
>>> config = {
>>>     "development": {
>>>         "${generation.kafka.source.servers}": "localhost:9094",
>>>         "${generation.kafka.sink.servers}": "localhost:9094"
>>>     },
>>>     "testing": {
>>>         "${generation.kafka.source.servers}":
>>>             "10.111.135.233:9092,10.111.130.11:9092,10.111.130.12:9092",
>>>         "${generation.kafka.sink.servers}":
>>>             "10.111.135.233:9092,10.111.130.11:9092,10.111.130.12:9092"
>>>     },
>>>     "production": {
>>>         "${generation.kafka.source.servers}":
>>>             "10.111.203.9:9092,10.111.203.10:9092,10.111.203.13:9092,"
>>>             "10.111.204.163:9092,10.111.204.164:9092,10.111.204.165:9092",
>>>         "${generation.kafka.sink.servers}":
>>>             "10.111.209.219:9092,10.111.209.220:9092,10.111.209.221:9092"
>>>     }
>>> }
>>>
>>> FAT_JAR_PATH = f"{CWD}/deps/flink-sql-connector-kafka_2.12-1.12.0.jar"
>>>
>>> # Load the SQL files for the task and substitute the bootstrap-server placeholders.
>>> source_ddl = read_file_content(f'{CWD}/tasks/{task}/source_ddl.sql').replace(
>>>     '${generation.kafka.source.servers}',
>>>     config[palfish_env]['${generation.kafka.source.servers}'])
>>> sink_ddl = read_file_content(f'{CWD}/tasks/{task}/sink_ddl.sql').replace(
>>>     '${generation.kafka.sink.servers}',
>>>     config[palfish_env]['${generation.kafka.sink.servers}'])
>>> transform_dml = read_file_content(f'{CWD}/tasks/{task}/transform_dml.sql')
>>>
>>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>>> env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
>>> t_env = StreamTableEnvironment.create(
>>>     stream_execution_environment=exec_env,
>>>     environment_settings=env_settings)
>>>
>>> # Ship the Kafka connector jar with the job and register the udf.
>>> t_env.get_config().get_configuration().set_string(
>>>     "pipeline.jars", f"file://{FAT_JAR_PATH}")
>>> t_env.create_temporary_function("ParseContent", parse_content)
>>>
>>> t_env.execute_sql(source_ddl)
>>> t_env.execute_sql(sink_ddl)
>>> t_env.execute_sql(transform_dml).wait()
>>> ```
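>>>
>>> For context, a quick local sanity check of what `parse_content` returns
>>> (the payloads and the `parse_content_plain` helper below are made up for
>>> illustration; real messages come from the Kafka topic):
>>>
>>> ```python
>>> # Duplicate the UDF body as a plain function so it can be run outside Flink.
>>> import json
>>>
>>> def parse_content_plain(content_str):
>>>     res = {}
>>>     content = json.loads(content_str)
>>>     if 'postId' in content:
>>>         res['item_id'] = content['postId']
>>>     if 'lid' in content:
>>>         res['item_id'] = content['lid']
>>>     if 'param' in content and 'tag' in content['param']:
>>>         res['tag'] = content['param']['tag']
>>>     return res
>>>
>>> print(parse_content_plain(json.dumps({"postId": "1", "param": {"tag": "点击帖子卡片"}})))
>>> # {'item_id': '1', 'tag': '点击帖子卡片'}
>>> print(parse_content_plain(json.dumps({"other": 1})))
>>> # {} -> such rows are filtered out later by content['item_id'] IS NOT NULL
>>> ```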
>>>
>>> See my SQL files below. Note the udf `ParseContent` is used in `transform_dml.sql`.
>>>
>>> ```sql
>>> -- source_ddl.sql
>>> CREATE TABLE kafka_source (
>>> `body` ROW<`log` ROW<`uid` BIGINT, serverts BIGINT, `contentstr` STRING>>
>>> ) WITH (
>>> 'connector' = 'kafka',
>>> 'topic' = 'data-report-stat-old-logtype7',
>>> 'properties.bootstrap.servers' = '${generation.kafka.source.servers}',
>>> 'properties.group.id' = 'flink-featurepipelines',
>>> 'format' = 'json'
>>> )
>>>
>>> -- transform_dml.sql
>>> INSERT INTO kafka_sink
>>> WITH t1 AS (
>>> SELECT body['log']['uid'] user_id,
>>>        ParseContent(body['log']['contentstr']) content,
>>>        body['log']['serverts'] server_ts
>>> FROM kafka_source
>>> ),
>>> t2 AS (
>>> SELECT user_id, content['item_id'] item_id, content['tag'] tag, server_ts
>>> FROM t1
>>> WHERE content['item_id'] IS NOT NULL
>>> AND content['tag'] = '点击帖子卡片'
>>> ),
>>> last_n AS (
>>> SELECT user_id, item_id, server_ts
>>> FROM (
>>> SELECT *,
>>> ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY server_ts DESC) AS row_num
>>> FROM t2)
>>> WHERE row_num <= 5
>>> )
>>> SELECT user_id, CAST(MAX(server_ts / 1000) AS TIMESTAMP) datetime,
>>> LISTAGG(CAST(item_id AS STRING)) last_5_clicks
>>> FROM last_n
>>> GROUP BY user_id
>>>
>>> -- sink_ddl.sql
>>> CREATE TABLE kafka_sink (
>>>     user_id BIGINT,
>>>     datetime TIMESTAMP(3),
>>>     last_5_clicks STRING,
>>>     PRIMARY KEY (user_id) NOT ENFORCED
>>> ) WITH (
>>>     'connector' = 'upsert-kafka',
>>>     'topic' = 'aiinfra.fct.userfeature.0',
>>>     'properties.bootstrap.servers' = '${generation.kafka.sink.servers}',
>>>     'key.format' = 'json',
>>>     'value.format' = 'json'
>>> )
>>> ```
>>>
>>> I got the following error when running the PyFlink program on my testing
>>> environment machine.
>>>
>>> ```
>>> Caused by: java.io.EOFException
>>>     at java.io.DataInputStream.readInt(DataInputStream.java:392)
>>> ~[?:1.8.0_261]
>>>     at
>>> org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserializeInternal(StringDataSerializer.java:91)
>>> ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>>>     at
>>> org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:87)
>>> ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>>>     at
>>> org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:36)
>>> ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>>>     at
>>> org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserializeInternal(MapDataSerializer.java:124)
>>> ~[flink-python_2.11-1.12.0.jar:1.12.0]
>>>     at
>>> org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:107)
>>> ~[flink-python_2.11-1.12.0.jar:1.12.0]
>>>     at
>>> org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:46)
>>> ~[flink-python_2.11-1.12.0.jar:1.12.0]
>>>     at
>>> org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:104)
>>> ~[flink-python_2.11-1.12.0.jar:1.12.0]
>>>     at
>>> org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49)
>>> ~[flink-python_2.11-1.12.0.jar:1.12.0]
>>>     at
>>> org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.emitResult(RowDataPythonScalarFunctionOperator.java:84)
>>> ~[flink-python_2.11-1.12.0.jar:1.12.0]
>>> ```
>>>
>>> Here are the full logs, see
>>> https://gist.github.com/YikSanChan/d3a5d25cdf2f3c1dc6b3dc93e48c4bbc.
>>>
>>> Any idea why this exception occurs? Thanks.
>>>
>>> Yik San
>>>
>>>
>>>
>>
