Wouter Zorgdrager created FLINK-35290:
-----------------------------------------

             Summary: Wrong Instant type conversion TableAPI to Datastream in thread mode
                 Key: FLINK-35290
                 URL: https://issues.apache.org/jira/browse/FLINK-35290
             Project: Flink
          Issue Type: Bug
          Components: API / Python
    Affects Versions: 1.18.1
            Reporter: Wouter Zorgdrager


In PyFlink, if you convert a table with a `TIMESTAMP_LTZ(3)` type into a
DataStream, we get a `pyflink.common.time.Instant` type. First, I'm
wondering whether this is expected behavior, since in the Table API `TIMESTAMP_LTZ`
maps to a Python `datetime`. Can't the same be done for the DataStream API?
Regardless, if we switch from `process` to `thread` mode for execution, the
`TIMESTAMP_LTZ(3)` value gets mapped to `pemja.PyJObject` (which wraps a
`java.time.Instant`) rather than `pyflink.common.time.Instant`. Note that if I
only use the DataStream API and read `Types.INSTANT()` directly, the conversion
seems to work fine in both `thread` and `process` mode.
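
The `datetime` mapping asked about above can be sketched in plain Python. This is a minimal sketch, not PyFlink code: `InstantLike` is a hypothetical stand-in that assumes an instant is stored as epoch `seconds` plus `nanos`, and `instant_to_datetime` is a hypothetical helper that returns a UTC-aware `datetime`, truncating to microseconds because that is the finest resolution `datetime` supports.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

# Unix epoch as a timezone-aware datetime.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


@dataclass(frozen=True)
class InstantLike:
    """Hypothetical stand-in for an instant: seconds since the epoch plus nanos."""
    seconds: int
    nanos: int = 0


def instant_to_datetime(instant: InstantLike) -> datetime:
    """Map an instant to a UTC-aware datetime (hypothetical helper).

    datetime only has microsecond resolution, so sub-microsecond
    nanos are truncated.
    """
    return EPOCH + timedelta(seconds=instant.seconds,
                             microseconds=instant.nanos // 1000)


print(instant_to_datetime(InstantLike(seconds=1725958800)))
# prints 2024-09-10 09:00:00+00:00
```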

Below is a minimal example that exposes the bug:

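```python
from datetime import datetime

from pyflink.common import Configuration, Row, Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import DataTypes, StreamTableEnvironment

EXECUTION_MODE = "thread"  # or "process"
config = Configuration()
config.set_string("python.execution-mode", EXECUTION_MODE)

# Pass the configuration so the DataStream environment uses EXECUTION_MODE too.
env = StreamExecutionEnvironment.get_execution_environment(config)
t_env = StreamTableEnvironment.create(env)
t_env.get_config().set("parallelism.default", "1")
t_env.get_config().set("python.fn-execution.bundle.size", "1")
t_env.get_config().set("python.execution-mode", EXECUTION_MODE)


def to_epoch_ms(row: Row):
    # In "process" mode this prints pyflink.common.time.Instant;
    # in "thread" mode it prints pemja.PyJObject, so to_epoch_milli() fails.
    print(type(row[1]))
    return row[1].to_epoch_milli()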


t_env.to_data_stream(
    t_env.from_elements(
        [
            (1, datetime(year=2024, day=10, month=9, hour=9)),
            (2, datetime(year=2024, day=10, month=9, hour=12)),
            (3, datetime(year=2024, day=22, month=11, hour=12)),
        ],
        DataTypes.ROW(
            [
                DataTypes.FIELD("id", DataTypes.INT()),
                DataTypes.FIELD("timestamp", DataTypes.TIMESTAMP_LTZ(3)),
            ]
        ),
    )
).map(to_epoch_ms, output_type=Types.LONG()).print()
env.execute()
```
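
Until the type mismatch is resolved, the map function could be made tolerant of the representations observed in this report. This is a minimal, hypothetical sketch (`to_epoch_ms_any` is not a PyFlink API). It uses duck typing and assumes that a PyFlink `Instant` exposes `to_epoch_milli()` (as the reproduction relies on), that the `pemja.PyJObject` wrapper forwards Java's `java.time.Instant.toEpochMilli()` method (an assumption about pemja, not verified here), and that a naive `datetime` is meant as UTC.

```python
from datetime import datetime, timedelta, timezone

# Unix epoch as a timezone-aware datetime.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ms_any(value) -> int:
    """Return epoch milliseconds for several timestamp representations (hypothetical helper)."""
    # pyflink.common.time.Instant (seen in "process" mode in this report).
    if hasattr(value, "to_epoch_milli"):
        return value.to_epoch_milli()
    # Wrapped java.time.Instant (seen in "thread" mode).
    # ASSUMPTION: the wrapper forwards Java method calls such as toEpochMilli().
    if hasattr(value, "toEpochMilli"):
        return value.toEpochMilli()
    if isinstance(value, datetime):
        # ASSUMPTION: naive datetimes are treated as UTC.
        if value.tzinfo is None:
            value = value.replace(tzinfo=timezone.utc)
        # timedelta // timedelta yields an int, avoiding float rounding.
        return (value - EPOCH) // timedelta(milliseconds=1)
    raise TypeError(f"unsupported timestamp type: {type(value)!r}")
```

In the reproduction, `to_epoch_ms` would then return `to_epoch_ms_any(row[1])` instead of calling `row[1].to_epoch_milli()` directly.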



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
