大佬们好,请问有没有人遇到过下述问题?感谢回复。

任务 SQL:

-- Kafka source: raw tracking events from topic `event_topic`, decoded as JSON.
-- Every physical column is declared as VARCHAR; the only typed column is the
-- computed event-time attribute `ts`.
CREATE TEMPORARY TABLE `dw_realtime_inner_ods_sensor_events` (
    -- physical columns, matched against the JSON payload by field name
    `_track_id`    VARCHAR,
    `time`         VARCHAR,
    `type`         VARCHAR,
    `distinct_id`  VARCHAR,
    `anonymous_id` VARCHAR,
    `identities`   VARCHAR,
    `event`        VARCHAR,
    `lib`          VARCHAR,
    `properties`   VARCHAR,
    `_flush_time`  VARCHAR,
    `dtk`          VARCHAR,
    `map_id`       VARCHAR,
    `user_id`      VARCHAR,
    `recv_time`    VARCHAR,
    `extractor`    VARCHAR,
    `project_id`   VARCHAR,
    `project`      VARCHAR,
    -- event time: `time` is cast to BIGINT and read with precision 3,
    -- i.e. as epoch milliseconds
    `ts` AS `TO_TIMESTAMP_LTZ`(CAST(`time` AS BIGINT), 3),
    `ver`          VARCHAR,
    -- watermark trails the event time by 10 seconds
    WATERMARK FOR `ts` AS `ts` - INTERVAL '10' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'event_topic',
    -- broker list is a placeholder ('--') in this listing
    'properties.bootstrap.servers' = '--',
    'properties.group.id' = 'dwd_sensor_deal_loginresult_dtl',
    -- a fresh start reads from the latest offsets
    'scan.startup.mode' = 'latest-offset',
    'value.format' = 'json'
);
-- Kafka sink: flattened login-result detail records, written as JSON to topic
-- `dwd-sensor-deal-loginresult-dtl` over SASL_PLAINTEXT with the PLAIN mechanism.
-- All columns are VARCHAR. The INSERT below fills them by position, so the
-- column order here must stay in sync with its SELECT list.
CREATE TEMPORARY TABLE `dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000` (
  `event_time` VARCHAR,
  `sensor_user_id` VARCHAR,
  `lucy_user_id` VARCHAR,
  `event` VARCHAR,
  `is_success` VARCHAR,
  `account_id` VARCHAR,
  `fail_reason` VARCHAR,
  `initpasswd_flag` VARCHAR,
  `login_type` VARCHAR,
  `ip_site` VARCHAR,
  `relay_channel` VARCHAR,
  `counter_channel` VARCHAR,
  `lucy_deviceid` VARCHAR,
  `cid` VARCHAR,
  `org_user_company` VARCHAR,
  `org_user_ID` VARCHAR,
  `phone_number` VARCHAR,
  `org_user_loginID` VARCHAR,
  `org_user_type` VARCHAR,
  `org_user_level` VARCHAR,
  `os` VARCHAR,
  `os_version` VARCHAR,
  `lib` VARCHAR,
  `lib_version` VARCHAR,
  `manufacturer` VARCHAR,
  `model` VARCHAR,
  `brand` VARCHAR,
  `app_version` VARCHAR,
  `app_id` VARCHAR,
  `app_name` VARCHAR,
  `platform_type` VARCHAR,
  `sensor_device_id` VARCHAR,
  `ip` VARCHAR,
  `track_signup_original_id` VARCHAR,
  `is_login_id` VARCHAR,
  `city` VARCHAR,
  `province` VARCHAR,
  `country` VARCHAR
) WITH (
  'connector' = 'kafka',
  -- credentials are masked ('****') in this listing
  'properties.sasl.jaas.config' = '****',
  'properties.sasl.mechanism' = 'PLAIN',
  -- broker list is a placeholder ('--'); the value must be delimited by plain
  -- ASCII single quotes (a typographic quote is not a valid string delimiter)
  'properties.bootstrap.servers' = '--',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'topic' = 'dwd-sensor-deal-loginresult-dtl',
  'value.format' = 'json'
);
-- Flattens `deal_loginresult` events into the sink table: four top-level
-- columns are passed through and the rest are scalar values extracted from
-- the `properties` JSON string.
--
-- Every path is an explicit JSON path rooted at `$`. A leading `$` in a path
-- denotes the document root and must be followed by `.` or `[`, so keys whose
-- own name starts with `$` are addressed as '$.$key'; a bare '$key' is not a
-- valid path. JSON_VALUE returns NULL on error by default, so an invalid path
-- would not fail the job but leave the column NULL.
-- NOTE(review): confirm against a sample record that the `$`-prefixed keys
-- (e.g. "$os", "$ip") are the actual key names inside `properties`.
--
-- Values are mapped to the sink columns by position; the aliases only
-- document the intended target column.
INSERT INTO `dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000`
(SELECT `time` AS `event_time`
      , `user_id` AS `sensor_user_id`
      , `distinct_id` AS `lucy_user_id`
      , `event` AS `event`
      -- keys without a `$` prefix
      , JSON_VALUE(`properties`, '$.is_success') AS `is_success`
      , JSON_VALUE(`properties`, '$.deal_account') AS `account_id`
      , JSON_VALUE(`properties`, '$.fail_reason') AS `fail_reason`
      , JSON_VALUE(`properties`, '$.initpasswd_flag') AS `initpasswd_flag`
      , JSON_VALUE(`properties`, '$.login_type') AS `login_type`
      , JSON_VALUE(`properties`, '$.IP_site') AS `ip_site`
      , JSON_VALUE(`properties`, '$.relay_channel') AS `relay_channel`
      , JSON_VALUE(`properties`, '$.counter_channel') AS `counter_channel`
      , JSON_VALUE(`properties`, '$.lucy_deviceid') AS `lucy_deviceid`
      , JSON_VALUE(`properties`, '$.app_cid') AS `cid`
      , JSON_VALUE(`properties`, '$.org_user_company') AS `org_user_company`
      , JSON_VALUE(`properties`, '$.org_user_ID') AS `org_user_ID`
      , JSON_VALUE(`properties`, '$.phone_number') AS `phone_number`
      , JSON_VALUE(`properties`, '$.org_user_loginID') AS `org_user_loginID`
      , JSON_VALUE(`properties`, '$.org_user_type') AS `org_user_type`
      , JSON_VALUE(`properties`, '$.org_user_level') AS `org_user_level`
      -- keys whose name itself starts with `$`
      , JSON_VALUE(`properties`, '$.$os') AS `os`
      , JSON_VALUE(`properties`, '$.$os_version') AS `os_version`
      , JSON_VALUE(`properties`, '$.$lib') AS `lib`
      , JSON_VALUE(`properties`, '$.$lib_version') AS `lib_version`
      , JSON_VALUE(`properties`, '$.$manufacturer') AS `manufacturer`
      , JSON_VALUE(`properties`, '$.$model') AS `model`
      , JSON_VALUE(`properties`, '$.$brand') AS `brand`
      , JSON_VALUE(`properties`, '$.$app_version') AS `app_version`
      , JSON_VALUE(`properties`, '$.$app_id') AS `app_id`
      , JSON_VALUE(`properties`, '$.app_name') AS `app_name`
      , JSON_VALUE(`properties`, '$.platform_type') AS `platform_type`
      , JSON_VALUE(`properties`, '$.$device_id') AS `sensor_device_id`
      , JSON_VALUE(`properties`, '$.$ip') AS `ip`
      , JSON_VALUE(`properties`, '$.$track_signup_original_id') AS `track_signup_original_id`
      , JSON_VALUE(`properties`, '$.$is_login_id') AS `is_login_id`
      , JSON_VALUE(`properties`, '$.$city') AS `city`
      , JSON_VALUE(`properties`, '$.$province') AS `province`
      , JSON_VALUE(`properties`, '$.$country') AS `country`
    FROM `dw_realtime_inner_ods_sensor_events`
    WHERE `event` = 'deal_loginresult');


2025-11-12 05:01:59.246 [Flink-Metric-Reporter-thread-1] INFO  
c.alibaba.dataphin.flink.metric.reporter.adapter.HttpSender  - send message to 
topic:dataphin-dataphin:DATAPHIN_NIGHTSWATCH_V4
2025-11-12 05:02:48.387 [flink-pekko.actor.default-dispatcher-13] INFO  
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl  - Activate slot 
e0a66effa7e07c1b33dfcce116235362.
2025-11-12 05:02:48.389 [flink-pekko.actor.default-dispatcher-13] INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor  - Received task Source: 
dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546 
(765f7cffb5ee6a037a1ae2a15d0f3ed5_cbc357ccb763df2852fee8c4fc7d55f2_0_546), 
deploy into slot with allocation id e0a66effa7e07c1b33dfcce116235362.
2025-11-12 05:02:48.389 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  org.apache.flink.runtime.taskmanager.Task  - Source: 
dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546 
(765f7cffb5ee6a037a1ae2a15d0f3ed5_cbc357ccb763df2852fee8c4fc7d55f2_0_546) 
switched from CREATED to DEPLOYING.
2025-11-12 05:02:48.389 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  org.apache.flink.runtime.taskmanager.Task  - Loading JAR files for task 
Source: dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546 
(765f7cffb5ee6a037a1ae2a15d0f3ed5_cbc357ccb763df2852fee8c4fc7d55f2_0_546) 
[DEPLOYING].
2025-11-12 05:02:48.390 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  org.apache.flink.streaming.runtime.tasks.StreamTask  - State backend is 
set to heap memory 
org.apache.flink.runtime.state.hashmap.HashMapStateBackend@70f9dfb9
2025-11-12 05:02:48.390 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  org.apache.flink.runtime.state.StateBackendLoader  - State backend loader 
loads the state backend as HashMapStateBackend
2025-11-12 05:02:48.390 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
WARN  org.apache.flink.configuration.Configuration  - Config uses deprecated 
configuration key 'state.checkpoint-storage' instead of proper key 
'execution.checkpointing.storage'
2025-11-12 05:02:48.390 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
WARN  org.apache.flink.configuration.Configuration  - Config uses deprecated 
configuration key 'state.checkpoints.dir' instead of proper key 
'execution.checkpointing.dir'
2025-11-12 05:02:48.390 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  org.apache.flink.streaming.runtime.tasks.StreamTask  - Checkpoint storage 
is set to 'filesystem': (checkpoints "hdfs://ns1/checkpoint/dw_flinkinner")
2025-11-12 05:02:48.390 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  org.apache.flink.runtime.taskmanager.Task  - Source: 
dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546 
(765f7cffb5ee6a037a1ae2a15d0f3ed5_cbc357ccb763df2852fee8c4fc7d55f2_0_546) 
switched from DEPLOYING to INITIALIZING.
2025-11-12 05:02:48.395 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  org.apache.flink.runtime.state.StateBackendLoader  - State backend is set 
to heap memory 
org.apache.flink.runtime.state.hashmap.HashMapStateBackend@37d6bef9
2025-11-12 05:02:48.395 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
WARN  org.apache.flink.metrics.MetricGroup  - Name collision: Group already 
contains a Metric with the name 'pendingCommittables'. Metric will not be 
reported.[dtstack-prod-07, taskmanager, 
container_e17_1762828451362_2318_01_000002, 
300007101_dw_realtime_inner_dwd_sensor_deal_loginresult_dtl, 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer, 0]
2025-11-12 05:02:48.396 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  org.apache.flink.runtime.state.StateBackendLoader  - State backend is set 
to heap memory 
org.apache.flink.runtime.state.hashmap.HashMapStateBackend@79eb2a14
2025-11-12 05:02:48.396 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  o.a.f.k.s.org.apache.kafka.clients.producer.ProducerConfig  - 
ProducerConfig values: 
    acks = -1
    auto.include.jmx.reporter = true
    batch.size = 16384
    bootstrap.servers = []
    buffer.memory = 33554432
    client.dns.lookup = use_all_dns_ips
    client.id = producer-547
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = true
    interceptor.classes = []
    key.serializer = class 
org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.adaptive.partitioning.enable = true
    partitioner.availability.timeout.ms = 0
    partitioner.class = null
    partitioner.ignore.keys = false
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = [hidden]
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.connect.timeout.ms = null
    sasl.login.read.timeout.ms = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.login.retry.backoff.max.ms = 10000
    sasl.login.retry.backoff.ms = 100
    sasl.mechanism = PLAIN
    sasl.oauthbearer.clock.skew.seconds = 30
    sasl.oauthbearer.expected.audience = null
    sasl.oauthbearer.expected.issuer = null
    sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
    sasl.oauthbearer.jwks.endpoint.url = null
    sasl.oauthbearer.scope.claim.name = scope
    sasl.oauthbearer.sub.claim.name = sub
    sasl.oauthbearer.token.endpoint.url = null
    security.protocol = SASL_PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    socket.connection.setup.timeout.max.ms = 30000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 3600000
    transactional.id = null
    value.serializer = class 
org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArraySerializer

2025-11-12 05:02:48.396 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  o.a.f.k.s.org.apache.kafka.clients.producer.KafkaProducer  - [Producer 
clientId=producer-547] Instantiated an idempotent producer.
2025-11-12 05:02:48.396 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  o.a.f.k.s.o.a.k.common.security.authenticator.AbstractLogin  - 
Successfully logged in.
2025-11-12 05:02:48.397 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  o.a.f.k.shaded.org.apache.kafka.common.utils.AppInfoParser  - Kafka 
version: unknown
2025-11-12 05:02:48.397 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  o.a.f.k.shaded.org.apache.kafka.common.utils.AppInfoParser  - Kafka 
commitId: unknown
2025-11-12 05:02:48.397 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  o.a.f.k.shaded.org.apache.kafka.common.utils.AppInfoParser  - Kafka 
startTimeMs: 1762894968397
2025-11-12 05:02:48.400 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  org.apache.flink.runtime.state.StateBackendLoader  - State backend is set 
to heap memory 
org.apache.flink.runtime.state.hashmap.HashMapStateBackend@3f137397
2025-11-12 05:02:48.400 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  org.apache.flink.runtime.state.StateBackendLoader  - State backend is set 
to heap memory 
org.apache.flink.runtime.state.hashmap.HashMapStateBackend@1d95cb68
2025-11-12 05:02:48.400 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  o.a.flink.streaming.api.operators.AbstractStreamOperator  - Restoring 
state for 10 split(s) to reader.
2025-11-12 05:02:48.400 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  o.apache.flink.connector.base.source.reader.SourceReaderBase  - Adding 
split(s) to reader: [[Partition: event_topic-9, StartingOffset: 2430596361, 
StoppingOffset: -9223372036854775808], [Partition: event_topic-8, 
StartingOffset: 2430382382, StoppingOffset: -9223372036854775808], [Partition: 
event_topic-7, StartingOffset: 2431234238, StoppingOffset: 
-9223372036854775808], [Partition: event_topic-6, StartingOffset: 2435053051, 
StoppingOffset: -9223372036854775808], [Partition: event_topic-5, 
StartingOffset: 2453358876, StoppingOffset: -9223372036854775808], [Partition: 
event_topic-4, StartingOffset: 2409060719, StoppingOffset: 
-9223372036854775808], [Partition: event_topic-3, StartingOffset: 2506740242, 
StoppingOffset: -9223372036854775808], [Partition: event_topic-2, 
StartingOffset: 2446799648, StoppingOffset: -9223372036854775808], [Partition: 
event_topic-1, StartingOffset: 2399380974, StoppingOffset: 
-9223372036854775808], [Partition: event_topic-0, StartingOffset: 2451516068, 
StoppingOffset: -9223372036854775808]]
2025-11-12 05:02:48.400 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  o.a.f.k.s.org.apache.kafka.clients.consumer.ConsumerConfig  - 
ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.include.jmx.reporter = true
    auto.offset.reset = latest
    bootstrap.servers = [hybrid01.gjqh-sc.sensorsdata.cloud:9092, 
hybrid02.gjqh-sc.sensorsdata.cloud:9092, 
hybrid03.gjqh-sc.sensorsdata.cloud:9092]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = dwd_sensor_deal_loginresult_dtl-0
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = dwd_sensor_deal_loginresult_dtl
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    internal.throw.on.fetch.stable.offset.unsupported = false
    isolation.level = read_uncommitted
    key.deserializer = class 
org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class 
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.RangeAssignor, 
class 
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.connect.timeout.ms = null
    sasl.login.read.timeout.ms = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.login.retry.backoff.max.ms = 10000
    sasl.login.retry.backoff.ms = 100
    sasl.mechanism = GSSAPI
    sasl.oauthbearer.clock.skew.seconds = 30
    sasl.oauthbearer.expected.audience = null
    sasl.oauthbearer.expected.issuer = null
    sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
    sasl.oauthbearer.jwks.endpoint.url = null
    sasl.oauthbearer.scope.claim.name = scope
    sasl.oauthbearer.sub.claim.name = sub
    sasl.oauthbearer.token.endpoint.url = null
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 45000
    socket.connection.setup.timeout.max.ms = 30000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class 
org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer

2025-11-12 05:02:48.401 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
WARN  o.a.f.k.s.org.apache.kafka.clients.consumer.ConsumerConfig  - These 
configurations '[client.id.prefix, partition.discovery.interval.ms]' were 
supplied but are not used yet.
2025-11-12 05:02:48.401 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  o.a.f.k.shaded.org.apache.kafka.common.utils.AppInfoParser  - Kafka 
version: unknown
2025-11-12 05:02:48.401 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  o.a.f.k.shaded.org.apache.kafka.common.utils.AppInfoParser  - Kafka 
commitId: unknown
2025-11-12 05:02:48.401 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  o.a.f.k.shaded.org.apache.kafka.common.utils.AppInfoParser  - Kafka 
startTimeMs: 1762894968401
2025-11-12 05:02:48.401 [kafka-producer-network-thread | producer-547] INFO  
o.a.flink.kafka.shaded.org.apache.kafka.clients.Metadata  - [Producer 
clientId=producer-547] Cluster ID: o3_RsYEWTqqofZkOgluOzQ
2025-11-12 05:02:48.402 [kafka-producer-network-thread | producer-547] INFO  
o.a.f.k.s.o.a.k.c.producer.internals.TransactionManager  - [Producer 
clientId=producer-547] ProducerId set to 35323 with epoch 0
2025-11-12 05:02:48.402 [Source Data Fetcher for Source: 
dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  o.a.flink.connector.base.source.reader.fetcher.SplitFetcher  - Starting 
split fetcher 0
2025-11-12 05:02:48.402 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  org.apache.flink.runtime.taskmanager.Task  - Source: 
dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546 
(765f7cffb5ee6a037a1ae2a15d0f3ed5_cbc357ccb763df2852fee8c4fc7d55f2_0_546) 
switched from INITIALIZING to RUNNING.
2025-11-12 05:02:48.403 [Source Data Fetcher for Source: 
dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  o.a.f.k.s.org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer 
clientId=dwd_sensor_deal_loginresult_dtl-0, 
groupId=dwd_sensor_deal_loginresult_dtl] Assigned to partition(s): 
event_topic-9, event_topic-8, event_topic-7, event_topic-6, event_topic-5, 
event_topic-4, event_topic-3, event_topic-2, event_topic-1, event_topic-0
2025-11-12 05:02:48.404 [Source Data Fetcher for Source: 
dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  o.a.f.k.s.org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer 
clientId=dwd_sensor_deal_loginresult_dtl-0, 
groupId=dwd_sensor_deal_loginresult_dtl] Seeking to offset 2451516068 for 
partition event_topic-0
2025-11-12 05:02:48.404 [Source Data Fetcher for Source: 
dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  o.a.f.k.s.org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer 
clientId=dwd_sensor_deal_loginresult_dtl-0, 
groupId=dwd_sensor_deal_loginresult_dtl] Seeking to offset 2399380974 for 
partition event_topic-1
2025-11-12 05:02:48.404 [Source Data Fetcher for Source: 
dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  o.a.f.k.s.org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer 
clientId=dwd_sensor_deal_loginresult_dtl-0, 
groupId=dwd_sensor_deal_loginresult_dtl] Seeking to offset 2446799648 for 
partition event_topic-2
2025-11-12 05:02:48.404 [Source Data Fetcher for Source: 
dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  o.a.f.k.s.org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer 
clientId=dwd_sensor_deal_loginresult_dtl-0, 
groupId=dwd_sensor_deal_loginresult_dtl] Seeking to offset 2506740242 for 
partition event_topic-3
2025-11-12 05:02:48.404 [Source Data Fetcher for Source: 
dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  o.a.f.k.s.org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer 
clientId=dwd_sensor_deal_loginresult_dtl-0, 
groupId=dwd_sensor_deal_loginresult_dtl] Seeking to offset 2430382382 for 
partition event_topic-8
2025-11-12 05:02:48.404 [Source Data Fetcher for Source: 
dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  o.a.f.k.s.org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer 
clientId=dwd_sensor_deal_loginresult_dtl-0, 
groupId=dwd_sensor_deal_loginresult_dtl] Seeking to offset 2430596361 for 
partition event_topic-9
2025-11-12 05:02:48.404 [Source Data Fetcher for Source: 
dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  o.a.f.k.s.org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer 
clientId=dwd_sensor_deal_loginresult_dtl-0, 
groupId=dwd_sensor_deal_loginresult_dtl] Seeking to offset 2409060719 for 
partition event_topic-4
2025-11-12 05:02:48.404 [Source Data Fetcher for Source: 
dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  o.a.f.k.s.org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer 
clientId=dwd_sensor_deal_loginresult_dtl-0, 
groupId=dwd_sensor_deal_loginresult_dtl] Seeking to offset 2453358876 for 
partition event_topic-5
2025-11-12 05:02:48.404 [Source Data Fetcher for Source: 
dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  o.a.f.k.s.org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer 
clientId=dwd_sensor_deal_loginresult_dtl-0, 
groupId=dwd_sensor_deal_loginresult_dtl] Seeking to offset 2435053051 for 
partition event_topic-6
2025-11-12 05:02:48.404 [Source Data Fetcher for Source: 
dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  o.a.f.k.s.org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer 
clientId=dwd_sensor_deal_loginresult_dtl-0, 
groupId=dwd_sensor_deal_loginresult_dtl] Seeking to offset 2431234238 for 
partition event_topic-7
2025-11-12 05:02:48.407 [Source Data Fetcher for Source: 
dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  o.a.flink.kafka.shaded.org.apache.kafka.clients.Metadata  - [Consumer 
clientId=dwd_sensor_deal_loginresult_dtl-0, 
groupId=dwd_sensor_deal_loginresult_dtl] Cluster ID: gR5Vztt6TBCRvexaAjzrCw
2025-11-12 05:02:48.616 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  o.a.f.k.s.org.apache.kafka.clients.producer.KafkaProducer  - [Producer 
clientId=producer-547] Closing the Kafka producer with timeoutMillis = 3600000 
ms.
2025-11-12 05:02:48.617 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  o.a.f.kafka.shaded.org.apache.kafka.common.metrics.Metrics  - Metrics 
scheduler closed
2025-11-12 05:02:48.617 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  o.a.f.kafka.shaded.org.apache.kafka.common.metrics.Metrics  - Closing 
reporter 
org.apache.flink.kafka.shaded.org.apache.kafka.common.metrics.JmxReporter
2025-11-12 05:02:48.617 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  o.a.f.kafka.shaded.org.apache.kafka.common.metrics.Metrics  - Metrics 
reporters closed
2025-11-12 05:02:48.617 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  o.a.f.k.shaded.org.apache.kafka.common.utils.AppInfoParser  - App info 
kafka.producer for producer-547 unregistered
2025-11-12 05:02:48.617 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  o.apache.flink.connector.base.source.reader.SourceReaderBase  - Closing 
Source Reader.
2025-11-12 05:02:48.617 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  o.a.flink.connector.base.source.reader.fetcher.SplitFetcher  - Shutting 
down split fetcher 0
2025-11-12 05:02:48.617 [Source Data Fetcher for Source: 
dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  o.a.f.k.s.o.a.k.c.consumer.internals.ConsumerCoordinator  - [Consumer 
clientId=dwd_sensor_deal_loginresult_dtl-0, 
groupId=dwd_sensor_deal_loginresult_dtl] Resetting generation and member id due 
to: consumer pro-actively leaving the group
2025-11-12 05:02:48.617 [Source Data Fetcher for Source: 
dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  o.a.f.k.s.o.a.k.c.consumer.internals.ConsumerCoordinator  - [Consumer 
clientId=dwd_sensor_deal_loginresult_dtl-0, 
groupId=dwd_sensor_deal_loginresult_dtl] Request joining group due to: consumer 
pro-actively leaving the group
2025-11-12 05:02:48.617 [Source Data Fetcher for Source: 
dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  o.a.f.kafka.shaded.org.apache.kafka.common.metrics.Metrics  - Metrics 
scheduler closed
2025-11-12 05:02:48.617 [Source Data Fetcher for Source: 
dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  o.a.f.kafka.shaded.org.apache.kafka.common.metrics.Metrics  - Closing 
reporter 
org.apache.flink.kafka.shaded.org.apache.kafka.common.metrics.JmxReporter
2025-11-12 05:02:48.617 [Source Data Fetcher for Source: 
dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  o.a.f.kafka.shaded.org.apache.kafka.common.metrics.Metrics  - Metrics 
reporters closed
2025-11-12 05:02:48.618 [Source Data Fetcher for Source: 
dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  o.a.f.k.shaded.org.apache.kafka.common.utils.AppInfoParser  - App info 
kafka.consumer for dwd_sensor_deal_loginresult_dtl-0 unregistered
2025-11-12 05:02:48.618 [Source Data Fetcher for Source: 
dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
INFO  o.a.flink.connector.base.source.reader.fetcher.SplitFetcher  - Split 
fetcher 0 exited.
2025-11-12 05:02:48.618 [Source: dw_realtime_inner_ods_sensor_events[1] -> 
Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] 
WARN  org.apache.flink.runtime.taskmanager.Task  - Source: 
dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> 
dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546 
(765f7cffb5ee6a037a1ae2a15d0f3ed5_cbc357ccb763df2852fee8c4fc7d55f2_0_546) 
switched from RUNNING to FAILED with failure cause:
java.io.IOException: Failed to deserialize consumer record due to
    at 
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56)
    at 
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
    at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:203)
    at 
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:443)
    at 
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
    at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to deserialize consumer record 
ConsumerRecord(topic = event_topic, partition = 1, leaderEpoch = 7, offset = 
2399381243, CreateTime = 1762853256453, serialized key size = 37, serialized 
value size = 2917, headers = RecordHeaders(headers = [], isReadOnly = false), 
key = [B@54e6a925, value = [B@16f80c51).
    at 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:59)
    at 
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
    ... 14 common frames omitted
Caused by: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
    at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
    at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
    at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at 
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:310)
    at 
org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
    at 
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
    at 
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
    at 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:115)
    at 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
    ... 15 common frames omitted
Caused by: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
    at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
    at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
    at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at StreamExecCalc$150.processElement_0_0_rewriteGroup82(Unknown Source)
    at StreamExecCalc$150.processElement(Unknown Source)
    at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
    ... 23 common frames omitted
Caused by: java.lang.IllegalStateException: Received element after endOfInput: 
Record @ (undef) : 
+I(1762853228541,3333053382491751361,6994131829464607053,deal_loginresult,true,85427403,,false,fido,101.230.113.157:7001:8940:7003:8941:????????3,HUNDSUN,CTP,b7afd566c360b4104d44d46bc4cc972e,cf1b93e8c889f5acaa5956a01a1edf30,,,18826001457,,,,null,null,null,null,null,null,null,null,null,??????,app,null,null,null,null,null,null,null)
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:215)
    at 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:203)
    at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
    ... 28 common frames omitted

回复