Hi,看起来像是sink收到了endInput的信号之后,又有数据喂给他、导致报错。
你用的是什么版本的flink?我不太确定是不是和这个issue有关系 https://issues.apache.org/jira/browse/FLINK-37605 ,你可以用jira里贴的fixed version试试



--

    Best!
    Xuyang



在 2025-11-12 09:20:59,"徐平" <[email protected]> 写道:
>大佬们好,有没有人遇到下述问题,感谢回复
>
>任务sql:
>
>CREATE TEMPORARY TABLE `dw_realtime_inner_ods_sensor_events` (
>  `_track_id` VARCHAR,
>  `time` VARCHAR,
>  `type` VARCHAR,
>  `distinct_id` VARCHAR,
>  `anonymous_id` VARCHAR,
>  `identities` VARCHAR,
>  `event` VARCHAR,
>  `lib` VARCHAR,
>  `properties` VARCHAR,
>  `_flush_time` VARCHAR,
>  `dtk` VARCHAR,
>  `map_id` VARCHAR,
>  `user_id` VARCHAR,
>  `recv_time` VARCHAR,
>  `extractor` VARCHAR,
>  `project_id` VARCHAR,
>  `project` VARCHAR,
>  `ts` AS `TO_TIMESTAMP_LTZ`(CAST(`time` AS BIGINT), 3),
>  `ver` VARCHAR,
>  WATERMARK FOR `ts` AS `ts` - INTERVAL '10' SECOND
>) WITH (
>  'connector' = 'kafka',
>  'properties.bootstrap.servers' = '--',
>  'topic' = 'event_topic',
>  'value.format' = 'json',
>  'properties.group.id' = 'dwd_sensor_deal_loginresult_dtl',
>  'scan.startup.mode' = 'latest-offset'
>);
>CREATE TEMPORARY TABLE `dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000` 
>(
>
>  `event_time` VARCHAR,
>  `sensor_user_id` VARCHAR,
>  `lucy_user_id` VARCHAR,
>  `event` VARCHAR,
>  `is_success` VARCHAR,
>  `account_id` VARCHAR,
>  `fail_reason` VARCHAR,
>  `initpasswd_flag` VARCHAR,
>  `login_type` VARCHAR,
>  `ip_site` VARCHAR,
>  `relay_channel` VARCHAR,
>  `counter_channel` VARCHAR,
>  `lucy_deviceid` VARCHAR,
>  `cid` VARCHAR,
>  `org_user_company` VARCHAR,
>  `org_user_ID` VARCHAR,
>  `phone_number` VARCHAR,
>  `org_user_loginID` VARCHAR,
>  `org_user_type` VARCHAR,
>  `org_user_level` VARCHAR,
>  `os` VARCHAR,
>  `os_version` VARCHAR,
>  `lib` VARCHAR,
>  `lib_version` VARCHAR,
>  `manufacturer` VARCHAR,
>  `model` VARCHAR,
>  `brand` VARCHAR,
>  `app_version` VARCHAR,
>  `app_id` VARCHAR,
>  `app_name` VARCHAR,
>  `platform_type` VARCHAR,
>  `sensor_device_id` VARCHAR,
>  `ip` VARCHAR,
>  `track_signup_original_id` VARCHAR,
>  `is_login_id` VARCHAR,
>  `city` VARCHAR,
>  `province` VARCHAR,
>  `country` VARCHAR
>) WITH (
>  'connector' = 'kafka',
>  'properties.sasl.jaas.config'='****',
>  'properties.sasl.mechanism' = 'PLAIN',
> 'properties.bootstrap.servers' = '--',
>  'properties.security.protocol' = 'SASL_PLAINTEXT',
>  'topic' = 'dwd-sensor-deal-loginresult-dtl',
>  'value.format' = 'json'
>);
>INSERT INTO `dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000`
>(SELECT `time` AS `event_time`
>      , `user_id` AS `sensor_user_id`
>      , `distinct_id` AS `lucy_user_id`
>      , `event` AS `event`
>      , JSON_VALUE(`properties`, 'is_success') AS `is_success`
>      , JSON_VALUE(`properties`, 'deal_account') AS `account_id`
>      , JSON_VALUE(`properties`, 'fail_reason') AS `fail_reason`
>      , JSON_VALUE(`properties`, 'initpasswd_flag') AS `initpasswd_flag`
>      , JSON_VALUE(`properties`, 'login_type') AS `login_type`
>      , JSON_VALUE(`properties`, 'IP_site') AS `ip_site`
>      , JSON_VALUE(`properties`, 'relay_channel') AS `relay_channel`
>      , JSON_VALUE(`properties`, 'counter_channel') AS `counter_channel`
>      , JSON_VALUE(`properties`, 'lucy_deviceid') AS `lucy_deviceid`
>      , JSON_VALUE(`properties`, 'app_cid') AS `cid`
>      , JSON_VALUE(`properties`, 'org_user_company') AS `org_user_company`
>      , JSON_VALUE(`properties`, 'org_user_ID') AS `org_user_ID`
>      , JSON_VALUE(`properties`, 'phone_number') AS `phone_number`
>      , JSON_VALUE(`properties`, 'org_user_loginID') AS `org_user_loginID`
>      , JSON_VALUE(`properties`, 'org_user_type') AS `org_user_type`
>      , JSON_VALUE(`properties`, 'org_user_level') AS `org_user_level`
>      , JSON_VALUE(`properties`, '$os') AS `os`
>      , JSON_VALUE(`properties`, '$os_version') AS `os_version`
>      , JSON_VALUE(`properties`, '$lib') AS `lib`
>      , JSON_VALUE(`properties`, '$lib_version') AS `lib_version`
>      , JSON_VALUE(`properties`, '$manufacturer') AS `manufacturer`
>      , JSON_VALUE(`properties`, '$model') AS `model`
>      , JSON_VALUE(`properties`, '$brand') AS `brand`
>      , JSON_VALUE(`properties`, '$app_version') AS `app_version`
>      , JSON_VALUE(`properties`, '$app_id') AS `app_id`
>      , JSON_VALUE(`properties`, 'app_name') AS `app_name`
>      , JSON_VALUE(`properties`, 'platform_type') AS `platform_type`
>      , JSON_VALUE(`properties`, '$device_id') AS `sensor_device_id`
>      , JSON_VALUE(`properties`, '$ip') AS `ip`
>      , JSON_VALUE(`properties`, '$track_signup_original_id') AS 
> `track_signup_original_id`
>      , JSON_VALUE(`properties`, '$is_login_id') AS `is_login_id`
>      , JSON_VALUE(`properties`, '$city') AS `city`
>      , JSON_VALUE(`properties`, '$province') AS `province`
>      , JSON_VALUE(`properties`, '$country') AS `country`
>    FROM `dw_realtime_inner_ods_sensor_events`
>    WHERE `event` = 'deal_loginresult');
>
>
>2025-11-12 05:01:59.246 [Flink-Metric-Reporter-thread-1] INFO  
>c.alibaba.dataphin.flink.metric.reporter.adapter.HttpSender  - send message to 
>topic:dataphin-dataphin:DATAPHIN_NIGHTSWATCH_V4
>2025-11-12 05:02:48.387 [flink-pekko.actor.default-dispatcher-13] INFO  
>org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl  - Activate slot 
>e0a66effa7e07c1b33dfcce116235362.
>2025-11-12 05:02:48.389 [flink-pekko.actor.default-dispatcher-13] INFO  
>org.apache.flink.runtime.taskexecutor.TaskExecutor  - Received task Source: 
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546 
>(765f7cffb5ee6a037a1ae2a15d0f3ed5_cbc357ccb763df2852fee8c4fc7d55f2_0_546), 
>deploy into slot with allocation id e0a66effa7e07c1b33dfcce116235362.
>2025-11-12 05:02:48.389 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  org.apache.flink.runtime.taskmanager.Task  - Source: 
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546 
>(765f7cffb5ee6a037a1ae2a15d0f3ed5_cbc357ccb763df2852fee8c4fc7d55f2_0_546) 
>switched from CREATED to DEPLOYING.
>2025-11-12 05:02:48.389 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  org.apache.flink.runtime.taskmanager.Task  - Loading JAR files for task 
>Source: dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546 
>(765f7cffb5ee6a037a1ae2a15d0f3ed5_cbc357ccb763df2852fee8c4fc7d55f2_0_546) 
>[DEPLOYING].
>2025-11-12 05:02:48.390 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  org.apache.flink.streaming.runtime.tasks.StreamTask  - State backend is 
>set to heap memory 
>org.apache.flink.runtime.state.hashmap.HashMapStateBackend@70f9dfb9
>2025-11-12 05:02:48.390 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  org.apache.flink.runtime.state.StateBackendLoader  - State backend 
>loader loads the state backend as HashMapStateBackend
>2025-11-12 05:02:48.390 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>WARN  org.apache.flink.configuration.Configuration  - Config uses deprecated 
>configuration key 'state.checkpoint-storage' instead of proper key 
>'execution.checkpointing.storage'
>2025-11-12 05:02:48.390 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>WARN  org.apache.flink.configuration.Configuration  - Config uses deprecated 
>configuration key 'state.checkpoints.dir' instead of proper key 
>'execution.checkpointing.dir'
>2025-11-12 05:02:48.390 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  org.apache.flink.streaming.runtime.tasks.StreamTask  - Checkpoint 
>storage is set to 'filesystem': (checkpoints 
>"hdfs://ns1/checkpoint/dw_flinkinner")
>2025-11-12 05:02:48.390 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  org.apache.flink.runtime.taskmanager.Task  - Source: 
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546 
>(765f7cffb5ee6a037a1ae2a15d0f3ed5_cbc357ccb763df2852fee8c4fc7d55f2_0_546) 
>switched from DEPLOYING to INITIALIZING.
>2025-11-12 05:02:48.395 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  org.apache.flink.runtime.state.StateBackendLoader  - State backend is 
>set to heap memory 
>org.apache.flink.runtime.state.hashmap.HashMapStateBackend@37d6bef9
>2025-11-12 05:02:48.395 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>WARN  org.apache.flink.metrics.MetricGroup  - Name collision: Group already 
>contains a Metric with the name 'pendingCommittables'. Metric will not be 
>reported.[dtstack-prod-07, taskmanager, 
>container_e17_1762828451362_2318_01_000002, 
>300007101_dw_realtime_inner_dwd_sensor_deal_loginresult_dtl, 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer, 0]
>2025-11-12 05:02:48.396 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  org.apache.flink.runtime.state.StateBackendLoader  - State backend is 
>set to heap memory 
>org.apache.flink.runtime.state.hashmap.HashMapStateBackend@79eb2a14
>2025-11-12 05:02:48.396 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  o.a.f.k.s.org.apache.kafka.clients.producer.ProducerConfig  - 
>ProducerConfig values: 
>    acks = -1
>    auto.include.jmx.reporter = true
>    batch.size = 16384
>    bootstrap.servers = []
>    buffer.memory = 33554432
>    client.dns.lookup = use_all_dns_ips
>    client.id = producer-547
>    compression.type = none
>    connections.max.idle.ms = 540000
>    delivery.timeout.ms = 120000
>    enable.idempotence = true
>    interceptor.classes = []
>    key.serializer = class 
> org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArraySerializer
>    linger.ms = 0
>    max.block.ms = 60000
>    max.in.flight.requests.per.connection = 5
>    max.request.size = 1048576
>    metadata.max.age.ms = 300000
>    metadata.max.idle.ms = 300000
>    metric.reporters = []
>    metrics.num.samples = 2
>    metrics.recording.level = INFO
>    metrics.sample.window.ms = 30000
>    partitioner.adaptive.partitioning.enable = true
>    partitioner.availability.timeout.ms = 0
>    partitioner.class = null
>    partitioner.ignore.keys = false
>    receive.buffer.bytes = 32768
>    reconnect.backoff.max.ms = 1000
>    reconnect.backoff.ms = 50
>    request.timeout.ms = 30000
>    retries = 2147483647
>    retry.backoff.ms = 100
>    sasl.client.callback.handler.class = null
>    sasl.jaas.config = [hidden]
>    sasl.kerberos.kinit.cmd = /usr/bin/kinit
>    sasl.kerberos.min.time.before.relogin = 60000
>    sasl.kerberos.service.name = null
>    sasl.kerberos.ticket.renew.jitter = 0.05
>    sasl.kerberos.ticket.renew.window.factor = 0.8
>    sasl.login.callback.handler.class = null
>    sasl.login.class = null
>    sasl.login.connect.timeout.ms = null
>    sasl.login.read.timeout.ms = null
>    sasl.login.refresh.buffer.seconds = 300
>    sasl.login.refresh.min.period.seconds = 60
>    sasl.login.refresh.window.factor = 0.8
>    sasl.login.refresh.window.jitter = 0.05
>    sasl.login.retry.backoff.max.ms = 10000
>    sasl.login.retry.backoff.ms = 100
>    sasl.mechanism = PLAIN
>    sasl.oauthbearer.clock.skew.seconds = 30
>    sasl.oauthbearer.expected.audience = null
>    sasl.oauthbearer.expected.issuer = null
>    sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
>    sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
>    sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
>    sasl.oauthbearer.jwks.endpoint.url = null
>    sasl.oauthbearer.scope.claim.name = scope
>    sasl.oauthbearer.sub.claim.name = sub
>    sasl.oauthbearer.token.endpoint.url = null
>    security.protocol = SASL_PLAINTEXT
>    security.providers = null
>    send.buffer.bytes = 131072
>    socket.connection.setup.timeout.max.ms = 30000
>    socket.connection.setup.timeout.ms = 10000
>    ssl.cipher.suites = null
>    ssl.enabled.protocols = [TLSv1.2]
>    ssl.endpoint.identification.algorithm = https
>    ssl.engine.factory.class = null
>    ssl.key.password = null
>    ssl.keymanager.algorithm = SunX509
>    ssl.keystore.certificate.chain = null
>    ssl.keystore.key = null
>    ssl.keystore.location = null
>    ssl.keystore.password = null
>    ssl.keystore.type = JKS
>    ssl.protocol = TLSv1.2
>    ssl.provider = null
>    ssl.secure.random.implementation = null
>    ssl.trustmanager.algorithm = PKIX
>    ssl.truststore.certificates = null
>    ssl.truststore.location = null
>    ssl.truststore.password = null
>    ssl.truststore.type = JKS
>    transaction.timeout.ms = 3600000
>    transactional.id = null
>    value.serializer = class 
> org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArraySerializer
>
>2025-11-12 05:02:48.396 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  o.a.f.k.s.org.apache.kafka.clients.producer.KafkaProducer  - [Producer 
>clientId=producer-547] Instantiated an idempotent producer.
>2025-11-12 05:02:48.396 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  o.a.f.k.s.o.a.k.common.security.authenticator.AbstractLogin  - 
>Successfully logged in.
>2025-11-12 05:02:48.397 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  o.a.f.k.shaded.org.apache.kafka.common.utils.AppInfoParser  - Kafka 
>version: unknown
>2025-11-12 05:02:48.397 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  o.a.f.k.shaded.org.apache.kafka.common.utils.AppInfoParser  - Kafka 
>commitId: unknown
>2025-11-12 05:02:48.397 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  o.a.f.k.shaded.org.apache.kafka.common.utils.AppInfoParser  - Kafka 
>startTimeMs: 1762894968397
>2025-11-12 05:02:48.400 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  org.apache.flink.runtime.state.StateBackendLoader  - State backend is 
>set to heap memory 
>org.apache.flink.runtime.state.hashmap.HashMapStateBackend@3f137397
>2025-11-12 05:02:48.400 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  org.apache.flink.runtime.state.StateBackendLoader  - State backend is 
>set to heap memory 
>org.apache.flink.runtime.state.hashmap.HashMapStateBackend@1d95cb68
>2025-11-12 05:02:48.400 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  o.a.flink.streaming.api.operators.AbstractStreamOperator  - Restoring 
>state for 10 split(s) to reader.
>2025-11-12 05:02:48.400 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  o.apache.flink.connector.base.source.reader.SourceReaderBase  - Adding 
>split(s) to reader: [[Partition: event_topic-9, StartingOffset: 2430596361, 
>StoppingOffset: -9223372036854775808], [Partition: event_topic-8, 
>StartingOffset: 2430382382, StoppingOffset: -9223372036854775808], [Partition: 
>event_topic-7, StartingOffset: 2431234238, StoppingOffset: 
>-9223372036854775808], [Partition: event_topic-6, StartingOffset: 2435053051, 
>StoppingOffset: -9223372036854775808], [Partition: event_topic-5, 
>StartingOffset: 2453358876, StoppingOffset: -9223372036854775808], [Partition: 
>event_topic-4, StartingOffset: 2409060719, StoppingOffset: 
>-9223372036854775808], [Partition: event_topic-3, StartingOffset: 2506740242, 
>StoppingOffset: -9223372036854775808], [Partition: event_topic-2, 
>StartingOffset: 2446799648, StoppingOffset: -9223372036854775808], [Partition: 
>event_topic-1, StartingOffset: 2399380974, StoppingOffset: 
>-9223372036854775808], [Partition: event_topic-0, StartingOffset: 2451516068, 
>StoppingOffset: -9223372036854775808]]
>2025-11-12 05:02:48.400 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  o.a.f.k.s.org.apache.kafka.clients.consumer.ConsumerConfig  - 
>ConsumerConfig values: 
>    allow.auto.create.topics = true
>    auto.commit.interval.ms = 5000
>    auto.include.jmx.reporter = true
>    auto.offset.reset = latest
>    bootstrap.servers = [hybrid01.gjqh-sc.sensorsdata.cloud:9092, 
> hybrid02.gjqh-sc.sensorsdata.cloud:9092, 
> hybrid03.gjqh-sc.sensorsdata.cloud:9092]
>    check.crcs = true
>    client.dns.lookup = use_all_dns_ips
>    client.id = dwd_sensor_deal_loginresult_dtl-0
>    client.rack = 
>    connections.max.idle.ms = 540000
>    default.api.timeout.ms = 60000
>    enable.auto.commit = false
>    exclude.internal.topics = true
>    fetch.max.bytes = 52428800
>    fetch.max.wait.ms = 500
>    fetch.min.bytes = 1
>    group.id = dwd_sensor_deal_loginresult_dtl
>    group.instance.id = null
>    heartbeat.interval.ms = 3000
>    interceptor.classes = []
>    internal.leave.group.on.close = true
>    internal.throw.on.fetch.stable.offset.unsupported = false
>    isolation.level = read_uncommitted
>    key.deserializer = class 
> org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer
>    max.partition.fetch.bytes = 1048576
>    max.poll.interval.ms = 300000
>    max.poll.records = 500
>    metadata.max.age.ms = 300000
>    metric.reporters = []
>    metrics.num.samples = 2
>    metrics.recording.level = INFO
>    metrics.sample.window.ms = 30000
>    partition.assignment.strategy = [class 
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.RangeAssignor,
>  class 
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
>    receive.buffer.bytes = 65536
>    reconnect.backoff.max.ms = 1000
>    reconnect.backoff.ms = 50
>    request.timeout.ms = 30000
>    retry.backoff.ms = 100
>    sasl.client.callback.handler.class = null
>    sasl.jaas.config = null
>    sasl.kerberos.kinit.cmd = /usr/bin/kinit
>    sasl.kerberos.min.time.before.relogin = 60000
>    sasl.kerberos.service.name = null
>    sasl.kerberos.ticket.renew.jitter = 0.05
>    sasl.kerberos.ticket.renew.window.factor = 0.8
>    sasl.login.callback.handler.class = null
>    sasl.login.class = null
>    sasl.login.connect.timeout.ms = null
>    sasl.login.read.timeout.ms = null
>    sasl.login.refresh.buffer.seconds = 300
>    sasl.login.refresh.min.period.seconds = 60
>    sasl.login.refresh.window.factor = 0.8
>    sasl.login.refresh.window.jitter = 0.05
>    sasl.login.retry.backoff.max.ms = 10000
>    sasl.login.retry.backoff.ms = 100
>    sasl.mechanism = GSSAPI
>    sasl.oauthbearer.clock.skew.seconds = 30
>    sasl.oauthbearer.expected.audience = null
>    sasl.oauthbearer.expected.issuer = null
>    sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
>    sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
>    sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
>    sasl.oauthbearer.jwks.endpoint.url = null
>    sasl.oauthbearer.scope.claim.name = scope
>    sasl.oauthbearer.sub.claim.name = sub
>    sasl.oauthbearer.token.endpoint.url = null
>    security.protocol = PLAINTEXT
>    security.providers = null
>    send.buffer.bytes = 131072
>    session.timeout.ms = 45000
>    socket.connection.setup.timeout.max.ms = 30000
>    socket.connection.setup.timeout.ms = 10000
>    ssl.cipher.suites = null
>    ssl.enabled.protocols = [TLSv1.2]
>    ssl.endpoint.identification.algorithm = https
>    ssl.engine.factory.class = null
>    ssl.key.password = null
>    ssl.keymanager.algorithm = SunX509
>    ssl.keystore.certificate.chain = null
>    ssl.keystore.key = null
>    ssl.keystore.location = null
>    ssl.keystore.password = null
>    ssl.keystore.type = JKS
>    ssl.protocol = TLSv1.2
>    ssl.provider = null
>    ssl.secure.random.implementation = null
>    ssl.trustmanager.algorithm = PKIX
>    ssl.truststore.certificates = null
>    ssl.truststore.location = null
>    ssl.truststore.password = null
>    ssl.truststore.type = JKS
>    value.deserializer = class 
> org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer
>
>2025-11-12 05:02:48.401 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>WARN  o.a.f.k.s.org.apache.kafka.clients.consumer.ConsumerConfig  - These 
>configurations '[client.id.prefix, partition.discovery.interval.ms]' were 
>supplied but are not used yet.
>2025-11-12 05:02:48.401 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  o.a.f.k.shaded.org.apache.kafka.common.utils.AppInfoParser  - Kafka 
>version: unknown
>2025-11-12 05:02:48.401 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  o.a.f.k.shaded.org.apache.kafka.common.utils.AppInfoParser  - Kafka 
>commitId: unknown
>2025-11-12 05:02:48.401 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  o.a.f.k.shaded.org.apache.kafka.common.utils.AppInfoParser  - Kafka 
>startTimeMs: 1762894968401
>2025-11-12 05:02:48.401 [kafka-producer-network-thread | producer-547] INFO  
>o.a.flink.kafka.shaded.org.apache.kafka.clients.Metadata  - [Producer 
>clientId=producer-547] Cluster ID: o3_RsYEWTqqofZkOgluOzQ
>2025-11-12 05:02:48.402 [kafka-producer-network-thread | producer-547] INFO  
>o.a.f.k.s.o.a.k.c.producer.internals.TransactionManager  - [Producer 
>clientId=producer-547] ProducerId set to 35323 with epoch 0
>2025-11-12 05:02:48.402 [Source Data Fetcher for Source: 
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  o.a.flink.connector.base.source.reader.fetcher.SplitFetcher  - Starting 
>split fetcher 0
>2025-11-12 05:02:48.402 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  org.apache.flink.runtime.taskmanager.Task  - Source: 
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546 
>(765f7cffb5ee6a037a1ae2a15d0f3ed5_cbc357ccb763df2852fee8c4fc7d55f2_0_546) 
>switched from INITIALIZING to RUNNING.
>2025-11-12 05:02:48.403 [Source Data Fetcher for Source: 
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  o.a.f.k.s.org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer 
>clientId=dwd_sensor_deal_loginresult_dtl-0, 
>groupId=dwd_sensor_deal_loginresult_dtl] Assigned to partition(s): 
>event_topic-9, event_topic-8, event_topic-7, event_topic-6, event_topic-5, 
>event_topic-4, event_topic-3, event_topic-2, event_topic-1, event_topic-0
>2025-11-12 05:02:48.404 [Source Data Fetcher for Source: 
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  o.a.f.k.s.org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer 
>clientId=dwd_sensor_deal_loginresult_dtl-0, 
>groupId=dwd_sensor_deal_loginresult_dtl] Seeking to offset 2451516068 for 
>partition event_topic-0
>2025-11-12 05:02:48.404 [Source Data Fetcher for Source: 
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  o.a.f.k.s.org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer 
>clientId=dwd_sensor_deal_loginresult_dtl-0, 
>groupId=dwd_sensor_deal_loginresult_dtl] Seeking to offset 2399380974 for 
>partition event_topic-1
>2025-11-12 05:02:48.404 [Source Data Fetcher for Source: 
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  o.a.f.k.s.org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer 
>clientId=dwd_sensor_deal_loginresult_dtl-0, 
>groupId=dwd_sensor_deal_loginresult_dtl] Seeking to offset 2446799648 for 
>partition event_topic-2
>2025-11-12 05:02:48.404 [Source Data Fetcher for Source: 
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  o.a.f.k.s.org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer 
>clientId=dwd_sensor_deal_loginresult_dtl-0, 
>groupId=dwd_sensor_deal_loginresult_dtl] Seeking to offset 2506740242 for 
>partition event_topic-3
>2025-11-12 05:02:48.404 [Source Data Fetcher for Source: 
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  o.a.f.k.s.org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer 
>clientId=dwd_sensor_deal_loginresult_dtl-0, 
>groupId=dwd_sensor_deal_loginresult_dtl] Seeking to offset 2430382382 for 
>partition event_topic-8
>2025-11-12 05:02:48.404 [Source Data Fetcher for Source: 
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  o.a.f.k.s.org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer 
>clientId=dwd_sensor_deal_loginresult_dtl-0, 
>groupId=dwd_sensor_deal_loginresult_dtl] Seeking to offset 2430596361 for 
>partition event_topic-9
>2025-11-12 05:02:48.404 [Source Data Fetcher for Source: 
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  o.a.f.k.s.org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer 
>clientId=dwd_sensor_deal_loginresult_dtl-0, 
>groupId=dwd_sensor_deal_loginresult_dtl] Seeking to offset 2409060719 for 
>partition event_topic-4
>2025-11-12 05:02:48.404 [Source Data Fetcher for Source: 
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  o.a.f.k.s.org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer 
>clientId=dwd_sensor_deal_loginresult_dtl-0, 
>groupId=dwd_sensor_deal_loginresult_dtl] Seeking to offset 2453358876 for 
>partition event_topic-5
>2025-11-12 05:02:48.404 [Source Data Fetcher for Source: 
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  o.a.f.k.s.org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer 
>clientId=dwd_sensor_deal_loginresult_dtl-0, 
>groupId=dwd_sensor_deal_loginresult_dtl] Seeking to offset 2435053051 for 
>partition event_topic-6
>2025-11-12 05:02:48.404 [Source Data Fetcher for Source: 
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  o.a.f.k.s.org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer 
>clientId=dwd_sensor_deal_loginresult_dtl-0, 
>groupId=dwd_sensor_deal_loginresult_dtl] Seeking to offset 2431234238 for 
>partition event_topic-7
>2025-11-12 05:02:48.407 [Source Data Fetcher for Source: 
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  o.a.flink.kafka.shaded.org.apache.kafka.clients.Metadata  - [Consumer 
>clientId=dwd_sensor_deal_loginresult_dtl-0, 
>groupId=dwd_sensor_deal_loginresult_dtl] Cluster ID: gR5Vztt6TBCRvexaAjzrCw
>2025-11-12 05:02:48.616 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  o.a.f.k.s.org.apache.kafka.clients.producer.KafkaProducer  - [Producer 
>clientId=producer-547] Closing the Kafka producer with timeoutMillis = 3600000 
>ms.
>2025-11-12 05:02:48.617 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  o.a.f.kafka.shaded.org.apache.kafka.common.metrics.Metrics  - Metrics 
>scheduler closed
>2025-11-12 05:02:48.617 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  o.a.f.kafka.shaded.org.apache.kafka.common.metrics.Metrics  - Closing 
>reporter 
>org.apache.flink.kafka.shaded.org.apache.kafka.common.metrics.JmxReporter
>2025-11-12 05:02:48.617 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  o.a.f.kafka.shaded.org.apache.kafka.common.metrics.Metrics  - Metrics 
>reporters closed
>2025-11-12 05:02:48.617 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  o.a.f.k.shaded.org.apache.kafka.common.utils.AppInfoParser  - App info 
>kafka.producer for producer-547 unregistered
>2025-11-12 05:02:48.617 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  o.apache.flink.connector.base.source.reader.SourceReaderBase  - Closing 
>Source Reader.
>2025-11-12 05:02:48.617 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  o.a.flink.connector.base.source.reader.fetcher.SplitFetcher  - Shutting 
>down split fetcher 0
>2025-11-12 05:02:48.617 [Source Data Fetcher for Source: 
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  o.a.f.k.s.o.a.k.c.consumer.internals.ConsumerCoordinator  - [Consumer 
>clientId=dwd_sensor_deal_loginresult_dtl-0, 
>groupId=dwd_sensor_deal_loginresult_dtl] Resetting generation and member id 
>due to: consumer pro-actively leaving the group
>2025-11-12 05:02:48.617 [Source Data Fetcher for Source: 
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  o.a.f.k.s.o.a.k.c.consumer.internals.ConsumerCoordinator  - [Consumer 
>clientId=dwd_sensor_deal_loginresult_dtl-0, 
>groupId=dwd_sensor_deal_loginresult_dtl] Request joining group due to: 
>consumer pro-actively leaving the group
>2025-11-12 05:02:48.617 [Source Data Fetcher for Source: 
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  o.a.f.kafka.shaded.org.apache.kafka.common.metrics.Metrics  - Metrics 
>scheduler closed
>2025-11-12 05:02:48.617 [Source Data Fetcher for Source: 
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  o.a.f.kafka.shaded.org.apache.kafka.common.metrics.Metrics  - Closing 
>reporter 
>org.apache.flink.kafka.shaded.org.apache.kafka.common.metrics.JmxReporter
>2025-11-12 05:02:48.617 [Source Data Fetcher for Source: 
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  o.a.f.kafka.shaded.org.apache.kafka.common.metrics.Metrics  - Metrics 
>reporters closed
>2025-11-12 05:02:48.618 [Source Data Fetcher for Source: 
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  o.a.f.k.shaded.org.apache.kafka.common.utils.AppInfoParser  - App info 
>kafka.consumer for dwd_sensor_deal_loginresult_dtl-0 unregistered
>2025-11-12 05:02:48.618 [Source Data Fetcher for Source: 
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>INFO  o.a.flink.connector.base.source.reader.fetcher.SplitFetcher  - Split 
>fetcher 0 exited.
>2025-11-12 05:02:48.618 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
>WARN  org.apache.flink.runtime.taskmanager.Task  - Source: 
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546 
>(765f7cffb5ee6a037a1ae2a15d0f3ed5_cbc357ccb763df2852fee8c4fc7d55f2_0_546) 
>switched from RUNNING to FAILED with failure cause:
>java.io.IOException: Failed to deserialize consumer record due to
>    at 
> org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56)
>    at 
> org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
>    at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:203)
>    at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:443)
>    at 
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>    at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>    at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
>    at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>    at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
>    at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
>    at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
>    at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
>    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
>    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>    at java.lang.Thread.run(Thread.java:748)
>Caused by: java.io.IOException: Failed to deserialize consumer record 
>ConsumerRecord(topic = event_topic, partition = 1, leaderEpoch = 7, offset = 
>2399381243, CreateTime = 1762853256453, serialized key size = 37, serialized 
>value size = 2917, headers = RecordHeaders(headers = [], isReadOnly = false), 
>key = [B@54e6a925, value = [B@16f80c51).
>    at 
> org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:59)
>    at 
> org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
>    ... 14 common frames omitted
>Caused by: 
>org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
>Could not forward element to next operator
>    at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
>    at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
>    at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
>    at 
> org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:310)
>    at 
> org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
>    at 
> org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
>    at 
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
>    at 
> org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:115)
>    at 
> org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
>    ... 15 common frames omitted
>Caused by: 
>org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
>Could not forward element to next operator
>    at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
>    at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
>    at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
>    at StreamExecCalc$150.processElement_0_0_rewriteGroup82(Unknown Source)
>    at StreamExecCalc$150.processElement(Unknown Source)
>    at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
>    ... 23 common frames omitted
>Caused by: java.lang.IllegalStateException: Received element after endOfInput: 
>Record @ (undef) : 
>+I(1762853228541,3333053382491751361,6994131829464607053,deal_loginresult,true,85427403,,false,fido,101.230.113.157:7001:8940:7003:8941:????????3,HUNDSUN,CTP,b7afd566c360b4104d44d46bc4cc972e,cf1b93e8c889f5acaa5956a01a1edf30,,,18826001457,,,,null,null,null,null,null,null,null,null,null,??????,app,null,null,null,null,null,null,null)
>    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:215)
>    at 
> org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:203)
>    at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
>    ... 28 common frames omitted
>

回复