This question is cross-posted on StackOverflow
https://stackoverflow.com/questions/66687797/pyflink-java-io-eofexception-at-java-io-datainputstream-readint

I have a PyFlink job that reads from a Kafka source, transforms the data, and writes to
a Kafka sink. This is a `tree` view of my working directory.

```
> tree
.
├── deps
│   └── flink-sql-connector-kafka_2.12-1.12.0.jar
├── flink_run.sh
├── main.py
├── pyflink1.12.0.zip
└── tasks
    └── user_last_n_clicks
        ├── sink_ddl.sql
        ├── source_ddl.sql
        └── transform_dml.sql
```

This is the `flink_run.sh`:

```
flink run \
--yarnname test-pyflink \
-m yarn-cluster \
-yD yarn.application.queue=tech_platform \
-pyarch pyflink1.12.0.zip \
-pyexec /data/software/pyflink1.12.0/bin/python \
-py main.py testing user_last_n_clicks
```
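
Since the job ships its own Python environment (`pyflink1.12.0.zip`, pointed to with `-pyexec`), a quick sanity check of that environment could look like the sketch below. This is not part of the job; the path is simply the `-pyexec` path from the script, and I assume the packaged interpreter has `pyflink` installed so `pyflink.version.__version__` is importable.

```python
# Hypothetical sanity check (not part of the job): ask the packaged interpreter
# (the same path passed to -pyexec) which PyFlink version it ships.
import subprocess

PYEXEC = "/data/software/pyflink1.12.0/bin/python"  # path used with -pyexec

result = subprocess.run(
    [PYEXEC, "-c", "from pyflink.version import __version__; print(__version__)"],
    capture_output=True, text=True, check=True,
)
print(result.stdout.strip())  # I would expect this to print 1.12.0
```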

This is `main.py`. The key logic is:
- the `parse_content` udf (a standalone sketch of it follows the listing).
- loading the SQL files from the `tasks` subfolder and running them with `execute_sql`.

```python
import os
from sys import argv
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
from pyflink.table.udf import udf

def read_file_content(filepath):
    with open(filepath) as f:
        return f.read()

@udf(input_types=[DataTypes.STRING()],
     result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
def parse_content(content_str):
    import json
    res = {}
    content = json.loads(content_str)
    if 'postId' in content:
        res['item_id'] = content['postId']
    if 'lid' in content:
        res['item_id'] = content['lid']
    if 'param' in content and 'tag' in content['param']:
        res['tag'] = content['param']['tag']
    return res

CWD = os.getcwd()
_, palfish_env, task = argv

VALID_PALFISH_ENVS = ['development', 'testing', 'production']
if palfish_env not in VALID_PALFISH_ENVS:
    raise Exception(f"{palfish_env} is not a valid env, should be one of
[{', '.join(VALID_PALFISH_ENVS)}].")

VALID_TASKS = os.listdir(f"{CWD}/tasks")
if task not in VALID_TASKS:
    raise Exception(f"{task} is not a valid task, should be one of [{',
'.join(VALID_TASKS)}].")

config = {
    "development": {
        "${generation.kafka.source.servers}": "localhost:9094",
        "${generation.kafka.sink.servers}": "localhost:9094"
    },
    "testing": {
        "${generation.kafka.source.servers}": "10.111.135.233:9092,
10.111.130.11:9092,10.111.130.12:9092",
        "${generation.kafka.sink.servers}": "10.111.135.233:9092,
10.111.130.11:9092,10.111.130.12:9092"
    },
    "production": {
        "${generation.kafka.source.servers}": "10.111.203.9:9092,
10.111.203.10:9092,10.111.203.13:9092,10.111.204.163:9092,
10.111.204.164:9092,10.111.204.165:9092",
        "${generation.kafka.sink.servers}": "10.111.209.219:9092,
10.111.209.220:9092,10.111.209.221:9092"
    }
}

FAT_JAR_PATH = f"{CWD}/deps/flink-sql-connector-kafka_2.12-1.12.0.jar"

source_ddl = read_file_content(f'{CWD}/tasks/{task}/source_ddl.sql').replace(
    '${generation.kafka.source.servers}',
    config[palfish_env]['${generation.kafka.source.servers}'])
sink_ddl = read_file_content(f'{CWD}/tasks/{task}/sink_ddl.sql').replace(
    '${generation.kafka.sink.servers}',
    config[palfish_env]['${generation.kafka.sink.servers}'])
transform_dml = read_file_content(f'{CWD}/tasks/{task}/transform_dml.sql')

exec_env = StreamExecutionEnvironment.get_execution_environment()
env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
t_env = StreamTableEnvironment.create(
    stream_execution_environment=exec_env, environment_settings=env_settings)

t_env.get_config().get_configuration().set_string(
    "pipeline.jars", f"file://{FAT_JAR_PATH}")
t_env.create_temporary_function("ParseContent", parse_content)

t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)
t_env.execute_sql(transform_dml).wait()
```
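
For context, here is a standalone sketch of what `parse_content` is meant to return. It repeats the plain function body without the `@udf` wrapper, and the sample payload is made up:

```python
# Plain-Python version of the parse_content logic, for illustration only;
# the sample payload below is made up.
import json

def parse_content_plain(content_str):
    res = {}
    content = json.loads(content_str)
    if 'postId' in content:
        res['item_id'] = content['postId']
    if 'lid' in content:
        res['item_id'] = content['lid']
    if 'param' in content and 'tag' in content['param']:
        res['tag'] = content['param']['tag']
    return res

sample = json.dumps({"postId": "12345", "param": {"tag": "点击帖子卡片"}})
print(parse_content_plain(sample))
# {'item_id': '12345', 'tag': '点击帖子卡片'}
```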

Here are my SQL files. Note that the udf `ParseContent` is used in `transform_dml.sql`; a plain-Python sketch of the transform follows the block.

```sql
-- source_ddl.sql
CREATE TABLE kafka_source (
`body` ROW<`log` ROW<`uid` BIGINT, serverts BIGINT, `contentstr` STRING>>
) WITH (
'connector' = 'kafka',
'topic' = 'data-report-stat-old-logtype7',
'properties.bootstrap.servers' = '${generation.kafka.source.servers}',
'properties.group.id' = 'flink-featurepipelines',
'format' = 'json'
)

-- transform_dml.sql
INSERT INTO kafka_sink
WITH t1 AS (
SELECT body['log']['uid'] user_id,
       ParseContent(body['log']['contentstr']) content,
       body['log']['serverts'] server_ts
FROM kafka_source
),
t2 AS (
SELECT user_id, content['item_id'] item_id, content['tag'] tag, server_ts
FROM t1
WHERE content['item_id'] IS NOT NULL
AND content['tag'] = '点击帖子卡片'
),
last_n AS (
SELECT user_id, item_id, server_ts
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY server_ts DESC) as row_num
FROM t2)
WHERE row_num <= 5
)
SELECT user_id,
       CAST(MAX(server_ts / 1000) AS TIMESTAMP) datetime,
       LISTAGG(CAST(item_id AS STRING)) last_5_clicks
FROM last_n
GROUP BY user_id

-- sink_ddl.sql
CREATE TABLE kafka_sink (
    user_id BIGINT,
    datetime TIMESTAMP(3),
    last_5_clicks STRING,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'aiinfra.fct.userfeature.0',
    'properties.bootstrap.servers' = '${generation.kafka.sink.servers}',
    'key.format' = 'json',
    'value.format' = 'json'
)
```
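
To make the intent of `transform_dml.sql` concrete, here is a plain-Python sketch of the same last-5-clicks aggregation on made-up rows. It is illustration only, not part of the job; the real pipeline does this in Flink SQL.

```python
# Illustration of the last-5-clicks aggregation from transform_dml.sql,
# over made-up (user_id, item_id, server_ts) rows.
from collections import defaultdict

rows = [
    (1, "101", 1616000001000), (1, "102", 1616000002000), (1, "103", 1616000003000),
    (1, "104", 1616000004000), (1, "105", 1616000005000), (1, "106", 1616000006000),
]

by_user = defaultdict(list)
for user_id, item_id, server_ts in rows:
    by_user[user_id].append((item_id, server_ts))

for user_id, clicks in by_user.items():
    # ROW_NUMBER() OVER (... ORDER BY server_ts DESC), keep row_num <= 5
    last_n = sorted(clicks, key=lambda c: c[1], reverse=True)[:5]
    last_5_clicks = ",".join(item_id for item_id, _ in last_n)  # LISTAGG
    datetime_s = max(ts for _, ts in last_n) // 1000            # MAX(server_ts / 1000)
    print(user_id, datetime_s, last_5_clicks)
# prints: 1 1616000006 106,105,104,103,102
```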

I get the following error when running the PyFlink program on my testing environment
machine.

```
Caused by: java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[?:1.8.0_261]
    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserializeInternal(StringDataSerializer.java:91) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:87) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:36) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserializeInternal(MapDataSerializer.java:124) ~[flink-python_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:107) ~[flink-python_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:46) ~[flink-python_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:104) ~[flink-python_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49) ~[flink-python_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.emitResult(RowDataPythonScalarFunctionOperator.java:84) ~[flink-python_2.11-1.12.0.jar:1.12.0]
```

For the full logs, see https://gist.github.com/YikSanChan/d3a5d25cdf2f3c1dc6b3dc93e48c4bbc.

Any idea why this exception occurs? Thanks.

Yik San
