Hi,看起来像是sink收到了endInput的信号之后,又有数据喂给他、导致报错。
你用的是什么版本的flink?我不太确定是不是和这个issue有关系https://issues.apache.org/jira/browse/FLINK-37605,你可以用jira里贴的fixed
version试试
--
Best!
Xuyang
在 2025-11-12 09:20:59,"徐平" <[email protected]> 写道:
>大佬们好,有没有人遇到下述问题,感谢回复
>
>任务sql:
>
>CREATE TEMPORARY TABLE `dw_realtime_inner_ods_sensor_events` (
> `_track_id` VARCHAR,
> `time` VARCHAR,
> `type` VARCHAR,
> `distinct_id` VARCHAR,
> `anonymous_id` VARCHAR,
> `identities` VARCHAR,
> `event` VARCHAR,
> `lib` VARCHAR,
> `properties` VARCHAR,
> `_flush_time` VARCHAR,
> `dtk` VARCHAR,
> `map_id` VARCHAR,
> `user_id` VARCHAR,
> `recv_time` VARCHAR,
> `extractor` VARCHAR,
> `project_id` VARCHAR,
> `project` VARCHAR,
> `ts` AS `TO_TIMESTAMP_LTZ`(CAST(`time` AS BIGINT), 3),
> `ver` VARCHAR,
> WATERMARK FOR `ts` AS `ts` - INTERVAL '10' SECOND
>) WITH (
> 'connector' = 'kafka',
> 'properties.bootstrap.servers' = '--',
> 'topic' = 'event_topic',
> 'value.format' = 'json',
> 'properties.group.id' = 'dwd_sensor_deal_loginresult_dtl',
> 'scan.startup.mode' = 'latest-offset'
>);
>CREATE TEMPORARY TABLE `dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000`
>(
>
> `event_time` VARCHAR,
> `sensor_user_id` VARCHAR,
> `lucy_user_id` VARCHAR,
> `event` VARCHAR,
> `is_success` VARCHAR,
> `account_id` VARCHAR,
> `fail_reason` VARCHAR,
> `initpasswd_flag` VARCHAR,
> `login_type` VARCHAR,
> `ip_site` VARCHAR,
> `relay_channel` VARCHAR,
> `counter_channel` VARCHAR,
> `lucy_deviceid` VARCHAR,
> `cid` VARCHAR,
> `org_user_company` VARCHAR,
> `org_user_ID` VARCHAR,
> `phone_number` VARCHAR,
> `org_user_loginID` VARCHAR,
> `org_user_type` VARCHAR,
> `org_user_level` VARCHAR,
> `os` VARCHAR,
> `os_version` VARCHAR,
> `lib` VARCHAR,
> `lib_version` VARCHAR,
> `manufacturer` VARCHAR,
> `model` VARCHAR,
> `brand` VARCHAR,
> `app_version` VARCHAR,
> `app_id` VARCHAR,
> `app_name` VARCHAR,
> `platform_type` VARCHAR,
> `sensor_device_id` VARCHAR,
> `ip` VARCHAR,
> `track_signup_original_id` VARCHAR,
> `is_login_id` VARCHAR,
> `city` VARCHAR,
> `province` VARCHAR,
> `country` VARCHAR
>) WITH (
> 'connector' = 'kafka',
> 'properties.sasl.jaas.config'='****',
> 'properties.sasl.mechanism' = 'PLAIN',
> 'properties.bootstrap.servers' = ‘--',
> 'properties.security.protocol' = 'SASL_PLAINTEXT',
> 'topic' = 'dwd-sensor-deal-loginresult-dtl',
> 'value.format' = 'json'
>);
>INSERT INTO `dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000`
>(SELECT `time` AS `event_time`
> , `user_id` AS `sensor_user_id`
> , `distinct_id` AS `lucy_user_id`
> , `event` AS `event`
> , JSON_VALUE(`properties`, 'is_success') AS `is_success`
> , JSON_VALUE(`properties`, 'deal_account') AS `account_id`
> , JSON_VALUE(`properties`, 'fail_reason') AS `fail_reason`
> , JSON_VALUE(`properties`, 'initpasswd_flag') AS `initpasswd_flag`
> , JSON_VALUE(`properties`, 'login_type') AS `login_type`
> , JSON_VALUE(`properties`, 'IP_site') AS `ip_site`
> , JSON_VALUE(`properties`, 'relay_channel') AS `relay_channel`
> , JSON_VALUE(`properties`, 'counter_channel') AS `counter_channel`
> , JSON_VALUE(`properties`, 'lucy_deviceid') AS `lucy_deviceid`
> , JSON_VALUE(`properties`, 'app_cid') AS `cid`
> , JSON_VALUE(`properties`, 'org_user_company') AS `org_user_company`
> , JSON_VALUE(`properties`, 'org_user_ID') AS `org_user_ID`
> , JSON_VALUE(`properties`, 'phone_number') AS `phone_number`
> , JSON_VALUE(`properties`, 'org_user_loginID') AS `org_user_loginID`
> , JSON_VALUE(`properties`, 'org_user_type') AS `org_user_type`
> , JSON_VALUE(`properties`, 'org_user_level') AS `org_user_level`
> , JSON_VALUE(`properties`, '$os') AS `os`
> , JSON_VALUE(`properties`, '$os_version') AS `os_version`
> , JSON_VALUE(`properties`, '$lib') AS `lib`
> , JSON_VALUE(`properties`, '$lib_version') AS `lib_version`
> , JSON_VALUE(`properties`, '$manufacturer') AS `manufacturer`
> , JSON_VALUE(`properties`, '$model') AS `model`
> , JSON_VALUE(`properties`, '$brand') AS `brand`
> , JSON_VALUE(`properties`, '$app_version') AS `app_version`
> , JSON_VALUE(`properties`, '$app_id') AS `app_id`
> , JSON_VALUE(`properties`, 'app_name') AS `app_name`
> , JSON_VALUE(`properties`, 'platform_type') AS `platform_type`
> , JSON_VALUE(`properties`, '$device_id') AS `sensor_device_id`
> , JSON_VALUE(`properties`, '$ip') AS `ip`
> , JSON_VALUE(`properties`, '$track_signup_original_id') AS
> `track_signup_original_id`
> , JSON_VALUE(`properties`, '$is_login_id') AS `is_login_id`
> , JSON_VALUE(`properties`, '$city') AS `city`
> , JSON_VALUE(`properties`, '$province') AS `province`
> , JSON_VALUE(`properties`, '$country') AS `country`
> FROM `dw_realtime_inner_ods_sensor_events`
> WHERE `event` = 'deal_loginresult');
>
>
>2025-11-12 05:01:59.246 [Flink-Metric-Reporter-thread-1] INFO
>c.alibaba.dataphin.flink.metric.reporter.adapter.HttpSender - send message to
>topic:dataphin-dataphin:DATAPHIN_NIGHTSWATCH_V4
>2025-11-12 05:02:48.387 [flink-pekko.actor.default-dispatcher-13] INFO
>org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Activate slot
>e0a66effa7e07c1b33dfcce116235362.
>2025-11-12 05:02:48.389 [flink-pekko.actor.default-dispatcher-13] INFO
>org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task Source:
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546
>(765f7cffb5ee6a037a1ae2a15d0f3ed5_cbc357ccb763df2852fee8c4fc7d55f2_0_546),
>deploy into slot with allocation id e0a66effa7e07c1b33dfcce116235362.
>2025-11-12 05:02:48.389 [Source: dw_realtime_inner_ods_sensor_events[1] ->
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO org.apache.flink.runtime.taskmanager.Task - Source:
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546
>(765f7cffb5ee6a037a1ae2a15d0f3ed5_cbc357ccb763df2852fee8c4fc7d55f2_0_546)
>switched from CREATED to DEPLOYING.
>2025-11-12 05:02:48.389 [Source: dw_realtime_inner_ods_sensor_events[1] ->
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task
>Source: dw_realtime_inner_ods_sensor_events[1] -> Calc[2] ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546
>(765f7cffb5ee6a037a1ae2a15d0f3ed5_cbc357ccb763df2852fee8c4fc7d55f2_0_546)
>[DEPLOYING].
>2025-11-12 05:02:48.390 [Source: dw_realtime_inner_ods_sensor_events[1] ->
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is
>set to heap memory
>org.apache.flink.runtime.state.hashmap.HashMapStateBackend@70f9dfb9
>2025-11-12 05:02:48.390 [Source: dw_realtime_inner_ods_sensor_events[1] ->
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO org.apache.flink.runtime.state.StateBackendLoader - State backend
>loader loads the state backend as HashMapStateBackend
>2025-11-12 05:02:48.390 [Source: dw_realtime_inner_ods_sensor_events[1] ->
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>WARN org.apache.flink.configuration.Configuration - Config uses deprecated
>configuration key 'state.checkpoint-storage' instead of proper key
>'execution.checkpointing.storage'
>2025-11-12 05:02:48.390 [Source: dw_realtime_inner_ods_sensor_events[1] ->
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>WARN org.apache.flink.configuration.Configuration - Config uses deprecated
>configuration key 'state.checkpoints.dir' instead of proper key
>'execution.checkpointing.dir'
>2025-11-12 05:02:48.390 [Source: dw_realtime_inner_ods_sensor_events[1] ->
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Checkpoint
>storage is set to 'filesystem': (checkpoints
>"hdfs://ns1/checkpoint/dw_flinkinner")
>2025-11-12 05:02:48.390 [Source: dw_realtime_inner_ods_sensor_events[1] ->
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO org.apache.flink.runtime.taskmanager.Task - Source:
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546
>(765f7cffb5ee6a037a1ae2a15d0f3ed5_cbc357ccb763df2852fee8c4fc7d55f2_0_546)
>switched from DEPLOYING to INITIALIZING.
>2025-11-12 05:02:48.395 [Source: dw_realtime_inner_ods_sensor_events[1] ->
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO org.apache.flink.runtime.state.StateBackendLoader - State backend is
>set to heap memory
>org.apache.flink.runtime.state.hashmap.HashMapStateBackend@37d6bef9
>2025-11-12 05:02:48.395 [Source: dw_realtime_inner_ods_sensor_events[1] ->
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>WARN org.apache.flink.metrics.MetricGroup - Name collision: Group already
>contains a Metric with the name 'pendingCommittables'. Metric will not be
>reported.[dtstack-prod-07, taskmanager,
>container_e17_1762828451362_2318_01_000002,
>300007101_dw_realtime_inner_dwd_sensor_deal_loginresult_dtl,
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer, 0]
>2025-11-12 05:02:48.396 [Source: dw_realtime_inner_ods_sensor_events[1] ->
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO org.apache.flink.runtime.state.StateBackendLoader - State backend is
>set to heap memory
>org.apache.flink.runtime.state.hashmap.HashMapStateBackend@79eb2a14
>2025-11-12 05:02:48.396 [Source: dw_realtime_inner_ods_sensor_events[1] ->
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO o.a.f.k.s.org.apache.kafka.clients.producer.ProducerConfig -
>ProducerConfig values:
> acks = -1
> auto.include.jmx.reporter = true
> batch.size = 16384
> bootstrap.servers = []
> buffer.memory = 33554432
> client.dns.lookup = use_all_dns_ips
> client.id = producer-547
> compression.type = none
> connections.max.idle.ms = 540000
> delivery.timeout.ms = 120000
> enable.idempotence = true
> interceptor.classes = []
> key.serializer = class
> org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArraySerializer
> linger.ms = 0
> max.block.ms = 60000
> max.in.flight.requests.per.connection = 5
> max.request.size = 1048576
> metadata.max.age.ms = 300000
> metadata.max.idle.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> partitioner.adaptive.partitioning.enable = true
> partitioner.availability.timeout.ms = 0
> partitioner.class = null
> partitioner.ignore.keys = false
> receive.buffer.bytes = 32768
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 30000
> retries = 2147483647
> retry.backoff.ms = 100
> sasl.client.callback.handler.class = null
> sasl.jaas.config = [hidden]
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.login.callback.handler.class = null
> sasl.login.class = null
> sasl.login.connect.timeout.ms = null
> sasl.login.read.timeout.ms = null
> sasl.login.refresh.buffer.seconds = 300
> sasl.login.refresh.min.period.seconds = 60
> sasl.login.refresh.window.factor = 0.8
> sasl.login.refresh.window.jitter = 0.05
> sasl.login.retry.backoff.max.ms = 10000
> sasl.login.retry.backoff.ms = 100
> sasl.mechanism = PLAIN
> sasl.oauthbearer.clock.skew.seconds = 30
> sasl.oauthbearer.expected.audience = null
> sasl.oauthbearer.expected.issuer = null
> sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
> sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
> sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
> sasl.oauthbearer.jwks.endpoint.url = null
> sasl.oauthbearer.scope.claim.name = scope
> sasl.oauthbearer.sub.claim.name = sub
> sasl.oauthbearer.token.endpoint.url = null
> security.protocol = SASL_PLAINTEXT
> security.providers = null
> send.buffer.bytes = 131072
> socket.connection.setup.timeout.max.ms = 30000
> socket.connection.setup.timeout.ms = 10000
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2]
> ssl.endpoint.identification.algorithm = https
> ssl.engine.factory.class = null
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.certificate.chain = null
> ssl.keystore.key = null
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLSv1.2
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.certificates = null
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> transaction.timeout.ms = 3600000
> transactional.id = null
> value.serializer = class
> org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArraySerializer
>
>2025-11-12 05:02:48.396 [Source: dw_realtime_inner_ods_sensor_events[1] ->
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO o.a.f.k.s.org.apache.kafka.clients.producer.KafkaProducer - [Producer
>clientId=producer-547] Instantiated an idempotent producer.
>2025-11-12 05:02:48.396 [Source: dw_realtime_inner_ods_sensor_events[1] ->
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO o.a.f.k.s.o.a.k.common.security.authenticator.AbstractLogin -
>Successfully logged in.
>2025-11-12 05:02:48.397 [Source: dw_realtime_inner_ods_sensor_events[1] ->
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO o.a.f.k.shaded.org.apache.kafka.common.utils.AppInfoParser - Kafka
>version: unknown
>2025-11-12 05:02:48.397 [Source: dw_realtime_inner_ods_sensor_events[1] ->
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO o.a.f.k.shaded.org.apache.kafka.common.utils.AppInfoParser - Kafka
>commitId: unknown
>2025-11-12 05:02:48.397 [Source: dw_realtime_inner_ods_sensor_events[1] ->
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO o.a.f.k.shaded.org.apache.kafka.common.utils.AppInfoParser - Kafka
>startTimeMs: 1762894968397
>2025-11-12 05:02:48.400 [Source: dw_realtime_inner_ods_sensor_events[1] ->
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO org.apache.flink.runtime.state.StateBackendLoader - State backend is
>set to heap memory
>org.apache.flink.runtime.state.hashmap.HashMapStateBackend@3f137397
>2025-11-12 05:02:48.400 [Source: dw_realtime_inner_ods_sensor_events[1] ->
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO org.apache.flink.runtime.state.StateBackendLoader - State backend is
>set to heap memory
>org.apache.flink.runtime.state.hashmap.HashMapStateBackend@1d95cb68
>2025-11-12 05:02:48.400 [Source: dw_realtime_inner_ods_sensor_events[1] ->
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO o.a.flink.streaming.api.operators.AbstractStreamOperator - Restoring
>state for 10 split(s) to reader.
>2025-11-12 05:02:48.400 [Source: dw_realtime_inner_ods_sensor_events[1] ->
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO o.apache.flink.connector.base.source.reader.SourceReaderBase - Adding
>split(s) to reader: [[Partition: event_topic-9, StartingOffset: 2430596361,
>StoppingOffset: -9223372036854775808], [Partition: event_topic-8,
>StartingOffset: 2430382382, StoppingOffset: -9223372036854775808], [Partition:
>event_topic-7, StartingOffset: 2431234238, StoppingOffset:
>-9223372036854775808], [Partition: event_topic-6, StartingOffset: 2435053051,
>StoppingOffset: -9223372036854775808], [Partition: event_topic-5,
>StartingOffset: 2453358876, StoppingOffset: -9223372036854775808], [Partition:
>event_topic-4, StartingOffset: 2409060719, StoppingOffset:
>-9223372036854775808], [Partition: event_topic-3, StartingOffset: 2506740242,
>StoppingOffset: -9223372036854775808], [Partition: event_topic-2,
>StartingOffset: 2446799648, StoppingOffset: -9223372036854775808], [Partition:
>event_topic-1, StartingOffset: 2399380974, StoppingOffset:
>-9223372036854775808], [Partition: event_topic-0, StartingOffset: 2451516068,
>StoppingOffset: -9223372036854775808]]
>2025-11-12 05:02:48.400 [Source: dw_realtime_inner_ods_sensor_events[1] ->
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO o.a.f.k.s.org.apache.kafka.clients.consumer.ConsumerConfig -
>ConsumerConfig values:
> allow.auto.create.topics = true
> auto.commit.interval.ms = 5000
> auto.include.jmx.reporter = true
> auto.offset.reset = latest
> bootstrap.servers = [hybrid01.gjqh-sc.sensorsdata.cloud:9092,
> hybrid02.gjqh-sc.sensorsdata.cloud:9092,
> hybrid03.gjqh-sc.sensorsdata.cloud:9092]
> check.crcs = true
> client.dns.lookup = use_all_dns_ips
> client.id = dwd_sensor_deal_loginresult_dtl-0
> client.rack =
> connections.max.idle.ms = 540000
> default.api.timeout.ms = 60000
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = dwd_sensor_deal_loginresult_dtl
> group.instance.id = null
> heartbeat.interval.ms = 3000
> interceptor.classes = []
> internal.leave.group.on.close = true
> internal.throw.on.fetch.stable.offset.unsupported = false
> isolation.level = read_uncommitted
> key.deserializer = class
> org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 300000
> max.poll.records = 500
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> partition.assignment.strategy = [class
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.RangeAssignor,
> class
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 30000
> retry.backoff.ms = 100
> sasl.client.callback.handler.class = null
> sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.login.callback.handler.class = null
> sasl.login.class = null
> sasl.login.connect.timeout.ms = null
> sasl.login.read.timeout.ms = null
> sasl.login.refresh.buffer.seconds = 300
> sasl.login.refresh.min.period.seconds = 60
> sasl.login.refresh.window.factor = 0.8
> sasl.login.refresh.window.jitter = 0.05
> sasl.login.retry.backoff.max.ms = 10000
> sasl.login.retry.backoff.ms = 100
> sasl.mechanism = GSSAPI
> sasl.oauthbearer.clock.skew.seconds = 30
> sasl.oauthbearer.expected.audience = null
> sasl.oauthbearer.expected.issuer = null
> sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
> sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
> sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
> sasl.oauthbearer.jwks.endpoint.url = null
> sasl.oauthbearer.scope.claim.name = scope
> sasl.oauthbearer.sub.claim.name = sub
> sasl.oauthbearer.token.endpoint.url = null
> security.protocol = PLAINTEXT
> security.providers = null
> send.buffer.bytes = 131072
> session.timeout.ms = 45000
> socket.connection.setup.timeout.max.ms = 30000
> socket.connection.setup.timeout.ms = 10000
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2]
> ssl.endpoint.identification.algorithm = https
> ssl.engine.factory.class = null
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.certificate.chain = null
> ssl.keystore.key = null
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLSv1.2
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.certificates = null
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> value.deserializer = class
> org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer
>
>2025-11-12 05:02:48.401 [Source: dw_realtime_inner_ods_sensor_events[1] ->
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>WARN o.a.f.k.s.org.apache.kafka.clients.consumer.ConsumerConfig - These
>configurations '[client.id.prefix, partition.discovery.interval.ms]' were
>supplied but are not used yet.
>2025-11-12 05:02:48.401 [Source: dw_realtime_inner_ods_sensor_events[1] ->
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO o.a.f.k.shaded.org.apache.kafka.common.utils.AppInfoParser - Kafka
>version: unknown
>2025-11-12 05:02:48.401 [Source: dw_realtime_inner_ods_sensor_events[1] ->
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO o.a.f.k.shaded.org.apache.kafka.common.utils.AppInfoParser - Kafka
>commitId: unknown
>2025-11-12 05:02:48.401 [Source: dw_realtime_inner_ods_sensor_events[1] ->
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO o.a.f.k.shaded.org.apache.kafka.common.utils.AppInfoParser - Kafka
>startTimeMs: 1762894968401
>2025-11-12 05:02:48.401 [kafka-producer-network-thread | producer-547] INFO
>o.a.flink.kafka.shaded.org.apache.kafka.clients.Metadata - [Producer
>clientId=producer-547] Cluster ID: o3_RsYEWTqqofZkOgluOzQ
>2025-11-12 05:02:48.402 [kafka-producer-network-thread | producer-547] INFO
>o.a.f.k.s.o.a.k.c.producer.internals.TransactionManager - [Producer
>clientId=producer-547] ProducerId set to 35323 with epoch 0
>2025-11-12 05:02:48.402 [Source Data Fetcher for Source:
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Starting
>split fetcher 0
>2025-11-12 05:02:48.402 [Source: dw_realtime_inner_ods_sensor_events[1] ->
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO org.apache.flink.runtime.taskmanager.Task - Source:
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546
>(765f7cffb5ee6a037a1ae2a15d0f3ed5_cbc357ccb763df2852fee8c4fc7d55f2_0_546)
>switched from INITIALIZING to RUNNING.
>2025-11-12 05:02:48.403 [Source Data Fetcher for Source:
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO o.a.f.k.s.org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer
>clientId=dwd_sensor_deal_loginresult_dtl-0,
>groupId=dwd_sensor_deal_loginresult_dtl] Assigned to partition(s):
>event_topic-9, event_topic-8, event_topic-7, event_topic-6, event_topic-5,
>event_topic-4, event_topic-3, event_topic-2, event_topic-1, event_topic-0
>2025-11-12 05:02:48.404 [Source Data Fetcher for Source:
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO o.a.f.k.s.org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer
>clientId=dwd_sensor_deal_loginresult_dtl-0,
>groupId=dwd_sensor_deal_loginresult_dtl] Seeking to offset 2451516068 for
>partition event_topic-0
>2025-11-12 05:02:48.404 [Source Data Fetcher for Source:
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO o.a.f.k.s.org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer
>clientId=dwd_sensor_deal_loginresult_dtl-0,
>groupId=dwd_sensor_deal_loginresult_dtl] Seeking to offset 2399380974 for
>partition event_topic-1
>2025-11-12 05:02:48.404 [Source Data Fetcher for Source:
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO o.a.f.k.s.org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer
>clientId=dwd_sensor_deal_loginresult_dtl-0,
>groupId=dwd_sensor_deal_loginresult_dtl] Seeking to offset 2446799648 for
>partition event_topic-2
>2025-11-12 05:02:48.404 [Source Data Fetcher for Source:
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO o.a.f.k.s.org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer
>clientId=dwd_sensor_deal_loginresult_dtl-0,
>groupId=dwd_sensor_deal_loginresult_dtl] Seeking to offset 2506740242 for
>partition event_topic-3
>2025-11-12 05:02:48.404 [Source Data Fetcher for Source:
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO o.a.f.k.s.org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer
>clientId=dwd_sensor_deal_loginresult_dtl-0,
>groupId=dwd_sensor_deal_loginresult_dtl] Seeking to offset 2430382382 for
>partition event_topic-8
>2025-11-12 05:02:48.404 [Source Data Fetcher for Source:
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO o.a.f.k.s.org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer
>clientId=dwd_sensor_deal_loginresult_dtl-0,
>groupId=dwd_sensor_deal_loginresult_dtl] Seeking to offset 2430596361 for
>partition event_topic-9
>2025-11-12 05:02:48.404 [Source Data Fetcher for Source:
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO o.a.f.k.s.org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer
>clientId=dwd_sensor_deal_loginresult_dtl-0,
>groupId=dwd_sensor_deal_loginresult_dtl] Seeking to offset 2409060719 for
>partition event_topic-4
>2025-11-12 05:02:48.404 [Source Data Fetcher for Source:
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO o.a.f.k.s.org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer
>clientId=dwd_sensor_deal_loginresult_dtl-0,
>groupId=dwd_sensor_deal_loginresult_dtl] Seeking to offset 2453358876 for
>partition event_topic-5
>2025-11-12 05:02:48.404 [Source Data Fetcher for Source:
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO o.a.f.k.s.org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer
>clientId=dwd_sensor_deal_loginresult_dtl-0,
>groupId=dwd_sensor_deal_loginresult_dtl] Seeking to offset 2435053051 for
>partition event_topic-6
>2025-11-12 05:02:48.404 [Source Data Fetcher for Source:
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO o.a.f.k.s.org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer
>clientId=dwd_sensor_deal_loginresult_dtl-0,
>groupId=dwd_sensor_deal_loginresult_dtl] Seeking to offset 2431234238 for
>partition event_topic-7
>2025-11-12 05:02:48.407 [Source Data Fetcher for Source:
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO o.a.flink.kafka.shaded.org.apache.kafka.clients.Metadata - [Consumer
>clientId=dwd_sensor_deal_loginresult_dtl-0,
>groupId=dwd_sensor_deal_loginresult_dtl] Cluster ID: gR5Vztt6TBCRvexaAjzrCw
>2025-11-12 05:02:48.616 [Source: dw_realtime_inner_ods_sensor_events[1] ->
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO o.a.f.k.s.org.apache.kafka.clients.producer.KafkaProducer - [Producer
>clientId=producer-547] Closing the Kafka producer with timeoutMillis = 3600000
>ms.
>2025-11-12 05:02:48.617 [Source: dw_realtime_inner_ods_sensor_events[1] ->
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO o.a.f.kafka.shaded.org.apache.kafka.common.metrics.Metrics - Metrics
>scheduler closed
>2025-11-12 05:02:48.617 [Source: dw_realtime_inner_ods_sensor_events[1] ->
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO o.a.f.kafka.shaded.org.apache.kafka.common.metrics.Metrics - Closing
>reporter
>org.apache.flink.kafka.shaded.org.apache.kafka.common.metrics.JmxReporter
>2025-11-12 05:02:48.617 [Source: dw_realtime_inner_ods_sensor_events[1] ->
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO o.a.f.kafka.shaded.org.apache.kafka.common.metrics.Metrics - Metrics
>reporters closed
>2025-11-12 05:02:48.617 [Source: dw_realtime_inner_ods_sensor_events[1] ->
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO o.a.f.k.shaded.org.apache.kafka.common.utils.AppInfoParser - App info
>kafka.producer for producer-547 unregistered
>2025-11-12 05:02:48.617 [Source: dw_realtime_inner_ods_sensor_events[1] ->
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO o.apache.flink.connector.base.source.reader.SourceReaderBase - Closing
>Source Reader.
>2025-11-12 05:02:48.617 [Source: dw_realtime_inner_ods_sensor_events[1] ->
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Shutting
>down split fetcher 0
>2025-11-12 05:02:48.617 [Source Data Fetcher for Source:
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO o.a.f.k.s.o.a.k.c.consumer.internals.ConsumerCoordinator - [Consumer
>clientId=dwd_sensor_deal_loginresult_dtl-0,
>groupId=dwd_sensor_deal_loginresult_dtl] Resetting generation and member id
>due to: consumer pro-actively leaving the group
>2025-11-12 05:02:48.617 [Source Data Fetcher for Source:
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO o.a.f.k.s.o.a.k.c.consumer.internals.ConsumerCoordinator - [Consumer
>clientId=dwd_sensor_deal_loginresult_dtl-0,
>groupId=dwd_sensor_deal_loginresult_dtl] Request joining group due to:
>consumer pro-actively leaving the group
>2025-11-12 05:02:48.617 [Source Data Fetcher for Source:
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO o.a.f.kafka.shaded.org.apache.kafka.common.metrics.Metrics - Metrics
>scheduler closed
>2025-11-12 05:02:48.617 [Source Data Fetcher for Source:
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO o.a.f.kafka.shaded.org.apache.kafka.common.metrics.Metrics - Closing
>reporter
>org.apache.flink.kafka.shaded.org.apache.kafka.common.metrics.JmxReporter
>2025-11-12 05:02:48.617 [Source Data Fetcher for Source:
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO o.a.f.kafka.shaded.org.apache.kafka.common.metrics.Metrics - Metrics
>reporters closed
>2025-11-12 05:02:48.618 [Source Data Fetcher for Source:
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO o.a.f.k.shaded.org.apache.kafka.common.utils.AppInfoParser - App info
>kafka.consumer for dwd_sensor_deal_loginresult_dtl-0 unregistered
>2025-11-12 05:02:48.618 [Source Data Fetcher for Source:
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>INFO o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Split
>fetcher 0 exited.
>2025-11-12 05:02:48.618 [Source: dw_realtime_inner_ods_sensor_events[1] ->
>Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546]
>WARN org.apache.flink.runtime.taskmanager.Task - Source:
>dw_realtime_inner_ods_sensor_events[1] -> Calc[2] ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer ->
>dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546
>(765f7cffb5ee6a037a1ae2a15d0f3ed5_cbc357ccb763df2852fee8c4fc7d55f2_0_546)
>switched from RUNNING to FAILED with failure cause:
>java.io.IOException: Failed to deserialize consumer record due to
> at
> org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56)
> at
> org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
> at
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:203)
> at
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:443)
> at
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> at java.lang.Thread.run(Thread.java:748)
>Caused by: java.io.IOException: Failed to deserialize consumer record
>ConsumerRecord(topic = event_topic, partition = 1, leaderEpoch = 7, offset =
>2399381243, CreateTime = 1762853256453, serialized key size = 37, serialized
>value size = 2917, headers = RecordHeaders(headers = [], isReadOnly = false),
>key = [B@54e6a925, value = [B@16f80c51).
> at
> org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:59)
> at
> org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
> ... 14 common frames omitted
>Caused by:
>org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
> at
> org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:310)
> at
> org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
> at
> org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
> at
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
> at
> org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:115)
> at
> org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
> ... 15 common frames omitted
>Caused by:
>org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
> at StreamExecCalc$150.processElement_0_0_rewriteGroup82(Unknown Source)
> at StreamExecCalc$150.processElement(Unknown Source)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
> ... 23 common frames omitted
>Caused by: java.lang.IllegalStateException: Received element after endOfInput:
>Record @ (undef) :
>+I(1762853228541,3333053382491751361,6994131829464607053,deal_loginresult,true,85427403,,false,fido,101.230.113.157:7001:8940:7003:8941:????????3,HUNDSUN,CTP,b7afd566c360b4104d44d46bc4cc972e,cf1b93e8c889f5acaa5956a01a1edf30,,,18826001457,,,,null,null,null,null,null,null,null,null,null,??????,app,null,null,null,null,null,null,null)
> at org.apache.flink.util.Preconditions.checkState(Preconditions.java:215)
> at
> org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:203)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
> ... 28 common frames omitted
>