[ 
https://issues.apache.org/jira/browse/FLINK-24885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wangbaohua updated FLINK-24885:
-------------------------------
    Attachment: error.jpg

> ProcessElement Interface parameter Collector: java.lang.NullPointerException
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-24885
>                 URL: https://issues.apache.org/jira/browse/FLINK-24885
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.13.1
>            Reporter: wangbaohua
>            Priority: Blocker
>         Attachments: error.jpg
>
>
> 2021-11-15 11:11:55,032 INFO  com.asap.demo.function.dealMapFunction          
>              [] - size:160
> 2021-11-15 11:11:55,230 WARN  org.apache.flink.runtime.taskmanager.Task       
>              [] - Co-Process-Broadcast-Keyed -> Map -> 
> DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_8,
>  type=*com.asap.demo.model.BeanField<`account` STRING, `accountId` STRING, 
> `accountIn` STRING, `accountInName` STRING, `accountInOrgId` STRING, 
> `accountInOrgName` STRING, `accountInType` STRING, `accountName` STRING, 
> `accountOrgId` STRING, `accountOrgName` STRING, `accountOut` STRING, 
> `accountOutName` STRING, `accountOutOrgId` STRING, `accountOutOrgName` 
> STRING, `accountOutType` STRING, `accountStatus` STRING, `accountType` 
> STRING, `action` STRING, `actionDesc` STRING, `alarmcontext` STRING, 
> `alarmgrade` STRING, `alarmtype` STRING, `alertId` STRING, `alertInfo` 
> STRING, `alertLevel` STRING, `alertSignatureIdL` STRING, `appId` STRING, 
> `appName` STRING, `appProtocol` STRING, `appType` STRING, `areaId` STRING, 
> `areaName` STRING, `areaType` STRING, `assetFrom` STRING, `assetId` STRING, 
> `assetInfo` STRING, `assetIp` STRING, `assetLevel` STRING, `assetName` 
> STRING, `assetPid` STRING, `assetType` STRING, `assetUse` STRING, 
> `assetVendor` STRING, `attackStage` STRING, `attackStageCode` STRING, 
> `attackType` STRING, `attackTypeName` STRING, `authSerNum` STRING, `authTime` 
> STRING, `authType` STRING, `bankSeqNum` STRING, `batchNo` STRING, 
> `blackDomain` STRING, `blackDomainDesc` STRING, `blackDomainTag` STRING, 
> `blackDstIp` STRING, `blackFile` STRING, `blackFileDesc` STRING, 
> `blackFileTag` STRING, `blackId` STRING, `blackIpTag` STRING, `blackSrcIp` 
> STRING, `blackTag` STRING, `blackTagMatchCount` STRING, `blackTagMatchDesc` 
> STRING, `blackUrl` STRING, `blackUrlDesc` STRING, `blackUrlTag` STRING, 
> `blackVulnCve` STRING, `blackVulnDesc` STRING, `blackVulnName` STRING, 
> `blackVulnTag` STRING, `branchId` STRING, `branchName` STRING, 
> `businessSystemName` STRING, `businessType` STRING, `cardId` STRING, 
> `cascadeSourceIp` STRING, `cascadeSourceName` STRING, `cebUid` STRING, 
> `certNum` STRING, `certType` STRING, `chainId` STRING, `channel` STRING, 
> `channelId` STRING, `character` STRING, `charge` STRING, `cifSeqNum` STRING, 
> `clientInfo` STRING, `clientIp` STRING, `clientMac` STRING, `clientName` 
> STRING, `clientPort` STRING, `collectTime` TIMESTAMP_LTZ(9), `collectTimeL` 
> TIMESTAMP_LTZ(9), `command` STRING, `commandLine` STRING, `commandResult` 
> STRING, `company` STRING, `companyCustomId` STRING, `companyId` STRING, 
> `completenessTag` STRING, `confidence` STRING, `confidenceLevel` STRING, 
> `consignedUser` STRING, `contractNo` STRING, `count` STRING, `couponAmount` 
> STRING, `couponId` STRING, `createTime` TIMESTAMP_LTZ(3), `createTimeL` 
> BIGINT, `createdBy` STRING, `curType` STRING, `currency` STRING, `currentBal` 
> STRING, `customLabel1` STRING, `customLabel10` STRING, `customLabel2` STRING, 
> `customLabel3` STRING, `customLabel4` STRING, `customLabel5` STRING, 
> `customLabel6` STRING, `customLabel7` STRING, `customLabel8` STRING, 
> `customLabel9` STRING, `customValue1` STRING, `customValue10` STRING, 
> `customValue2` STRING, `customValue3` STRING, `customValue4` STRING, 
> `customValue5` STRING, `customValue6` STRING, `customValue7` STRING, 
> `customValue8` STRING, `customValue9` STRING, `dataQualityTag` STRING, 
> `dataType` STRING, `dataTypeName` STRING, `dbInstance` STRING, `dbName` 
> STRING, `dbTable` STRING, `dbVersion` STRING, `dealSuggest` STRING, 
> `defVManagerId` STRING, `department` STRING, `deviceCategory` STRING, 
> `deviceId` STRING, `deviceIp` STRING, `deviceMac` STRING, `deviceName` 
> STRING, `deviceParentType` STRING, `deviceType` STRING, `deviceVersion` 
> STRING, `direction` STRING, `directionDesc` STRING, `directionOfAttackTag` 
> STRING, `domain` STRING, `dstAdminAccount` STRING, `dstAdminEmail` STRING, 
> `dstAdminFOrgId` STRING, `dstAdminId` STRING, `dstAdminMobile` STRING, 
> `dstAdminName` STRING, `dstAdminOrgId` STRING, `dstAdminOrgName` STRING, 
> `dstAdminType` STRING, `dstAsset` STRING, `dstAssetId` STRING, `dstAssetInfo` 
> STRING, `dstAssetKey` STRING, `dstAssetLevel` STRING, `dstAssetModel` STRING, 
> `dstAssetName` STRING, `dstAssetPid` STRING, `dstAssetStatus` STRING, 
> `dstAssetSubType` STRING, `dstAssetType` STRING, `dstAssetVendor` STRING, 
> `dstBizId` STRING, `dstCity` STRING, `dstCompany` STRING, `dstCountry` 
> STRING, `dstDbInstance` STRING, `dstDomainName` STRING, `dstFGroupId` STRING, 
> `dstGroupId` STRING, `dstGroupName` STRING, `dstHostName` STRING, 
> `dstIndustry` STRING, `dstIntelDesc` STRING, `dstIntelId` STRING, 
> `dstIntelType` STRING, `dstInterface` STRING, `dstIp` STRING, `dstIpL` 
> STRING, `dstLatitude` STRING, `dstLongitude` STRING, `dstMac` STRING, 
> `dstManagerIp` STRING, `dstNatIp` STRING, `dstNatPort` STRING, `dstOperator` 
> STRING, `dstOrgAdmin` STRING, `dstOrgId` STRING, `dstOrgName` STRING, 
> `dstOsId` STRING, `dstPort` STRING, `dstPost` STRING, `dstProvince` STRING, 
> `dstService` STRING, `dstSubDomainName` STRING, `dstUser` STRING, `dstZone` 
> STRING, `duration` STRING, `empNum` STRING, `endTime` TIMESTAMP_LTZ(9), 
> `engineName` STRING, `entryTime` TIMESTAMP_LTZ(9), `errorCode` STRING, 
> `eventAppendix` STRING, `eventCount` STRING, `eventId` STRING, `eventIp` 
> STRING, `eventName` STRING, `eventOneType` STRING, `eventOneTypeDesc` STRING, 
> `eventOneTypeName` STRING, `eventParentType` STRING, `eventThreeType` STRING, 
> `eventThreeTypeDesc` STRING, `eventThreeTypeName` STRING, `eventTwoType` 
> STRING, `eventTwoTypeDesc` STRING, `eventTwoTypeName` STRING, `eventType` 
> STRING, `fileHash` STRING, `fileName` STRING, `filePath` STRING, `fileSize` 
> STRING, `fileType` STRING, `flag` STRING, `flow` STRING, `flowAvg` STRING, 
> `flowDiscard` STRING, `flowDown` STRING, `flowMax` STRING, `flowNum` STRING, 
> `flowUp` STRING, `groupId` STRING, `groupName` STRING, `id` STRING, `idCard` 
> STRING, `indexTag` STRING, `infectionDstIp` STRING, `infectionDstName` 
> STRING, `infectionFile` STRING, `infectionIp` STRING, `infectionSrcIp` 
> STRING, `infectionSrcName` STRING, `installNum` STRING, `instance` STRING, 
> `interestedIp` STRING, `intranetInternetTag` STRING, `ipType` STRING, 
> `isBack` STRING, `jobTitle` STRING, `languageSign` STRING, `lastLoginTime` 
> TIMESTAMP_LTZ(9), `lastUpdBy` STRING, `lastUpdTime` TIMESTAMP_LTZ(9), 
> `latnId` STRING, `length` STRING, `location` STRING, `lockDesc` STRING, 
> `lockTime` TIMESTAMP_LTZ(9), `logStatus` STRING, `logSubType` STRING, 
> `logType` STRING, `loginTime` TIMESTAMP_LTZ(9), `loginType` STRING, 
> `loginWay` STRING, `mailAdd` STRING, `mailIn` STRING, `mailOut` STRING, 
> `mailRecipient` STRING, `mailSender` STRING, `mailSubject` STRING, 
> `mailTotal` STRING, `mailType` STRING, `mainAccount` STRING, 
> `mainAccountCreateTime` TIMESTAMP_LTZ(9), `mainAccountCreateUser` STRING, 
> `mainAccountDesc` STRING, `mainAccountId` STRING, `mainAccountInvalidTime` 
> TIMESTAMP_LTZ(9), `mainAccountLoginDateLast` STRING, 
> `mainAccountLoginFailCount` STRING, `mainAccountModifyPwdTime` 
> TIMESTAMP_LTZ(9), `mainAccountModifyTime` TIMESTAMP_LTZ(9), 
> `mainAccountStatus` STRING, `mainAccountType` STRING, `mainAccountValidTime` 
> TIMESTAMP_LTZ(9), `malwareName` STRING, `malwareSubType` STRING, 
> `malwareType` STRING, `managerId` STRING, `managerIp` STRING, `managerTypeId` 
> STRING, `menuDesc` STRING, `menuId` STRING, `menuName` STRING, `menuPid` 
> STRING, `menuStatus` STRING, `menuType` STRING, `merchantId` STRING, 
> `merchantName` STRING, `message` STRING, `method` STRING, `missingField` 
> STRING, `model` STRING, `module` STRING, `moduleId` STRING, `name` STRING, 
> `networkType` STRING, `newValue` STRING, `nextFlowNum` STRING, `object` 
> STRING, `oldIdCard` STRING, `oldValue` STRING, `openid` STRING, `operType` 
> STRING, `operTypeName` STRING, `ordSerNum` STRING, `order` STRING, `orderNo` 
> STRING, `orderType` STRING, `orgCode` STRING, `orgId` STRING, `orgName` 
> STRING, `orgNameLevel` STRING, `orgNamePath` STRING, `osId` STRING, `osName` 
> STRING, `osVersion` STRING, `parentGroupId` STRING, `parentOrgId` STRING, 
> `parentOrgName` STRING, `parentOrgNamePath` STRING, `passUpdateTime` STRING, 
> `password` STRING, `payItemId` STRING, `payItemName` STRING, `payTime` 
> TIMESTAMP_LTZ(9), `payUnitName` STRING, `payUnitType` STRING, `personId` 
> STRING, `personName` STRING, `phone` STRING, `phoneImer` STRING, `phs` 
> STRING, `policyId` STRING, `policyInfo` STRING, `policyName` STRING, 
> `position` STRING, `priority` STRING, `profession` STRING, `professionName` 
> STRING, `protocol` STRING, `provinceFromId` STRING, `provinceFromName` 
> STRING, `rate` STRING, `rawMsg` STRING, `realFee` STRING, `reason` STRING, 
> `receFee` STRING, `recvPacket` STRING, `recvSize` STRING, `refundOrderNo` 
> STRING, `refundOrderTime` TIMESTAMP_LTZ(9), `registerNum` STRING, 
> `registerRate` STRING, `rejCode` STRING, `relateAccount` STRING, 
> `relateAccountId` STRING, `relateAccountName` STRING, `remark` STRING, 
> `requestMessage` STRING, `requestNo` STRING, `requestTime` TIMESTAMP_LTZ(9), 
> `responseCode` STRING, `responseIp` STRING, `responseMessage` STRING, 
> `result` STRING, `resultCode` STRING, `resultDesc` STRING, `retain` STRING, 
> `riskLevel` STRING, `riskLevelDesc` STRING, `roleId` STRING, `roleName` 
> STRING, `ruleId` STRING, `ruleName` STRING, `ruleTjCount` STRING, 
> `safetyMargin` STRING, `sceneId` STRING, `sceneOneType` STRING, 
> `sceneThreeType` STRING, `sceneTwoType` STRING, `sendPacket` STRING, 
> `sendSize` STRING, `serialNum` STRING, `serverIp` STRING, `serverName` 
> STRING, `serverPort` STRING, `service` STRING, `serviceTime` 
> TIMESTAMP_LTZ(9), `sessionCount` BIGINT, `sessionId` STRING, `settleMethod` 
> STRING, `sex` STRING, `shareFlag` STRING, `sid` STRING, `signData` STRING, 
> `snowId` STRING, `softwareInfo` STRING, `source` STRING, `srcAdminAccount` 
> STRING, `srcAdminEmail` STRING, `srcAdminFOrgId` STRING, `srcAdminId` STRING, 
> `srcAdminMobile` STRING, `srcAdminName` STRING, `srcAdminOrgId` STRING, 
> `srcAdminOrgName` STRING, `srcAdminType` STRING, `srcAsset` STRING, 
> `srcAssetId` STRING, `srcAssetInfo` STRING, `srcAssetKey` STRING, 
> `srcAssetLevel` STRING, `srcAssetModel` STRING, `srcAssetName` STRING, 
> `srcAssetPid` STRING, `srcAssetStatus` STRING, `srcAssetSubType` STRING, 
> `srcAssetType` STRING, `srcAssetVendor` STRING, `srcBizId` STRING, `srcCity` 
> STRING, `srcCompany` STRING, `srcContnent` STRING, `srcCountry` STRING, 
> `srcDbInstance` STRING, `srcDomainName` STRING, `srcFGroupId` STRING, 
> `srcGroupId` STRING, `srcGroupName` STRING, `srcHostName` STRING, 
> `srcIndustry` STRING, `srcIntelDesc` STRING, `srcIntelId` STRING, 
> `srcIntelType` STRING, `srcInterface` STRING, `srcIp` STRING, `srcIpL` 
> STRING, `srcLatitude` STRING, `srcLongitude` STRING, `srcMac` STRING, 
> `srcManagerIp` STRING, `srcNatIp` STRING, `srcNatPort` STRING, `srcOperator` 
> STRING, `srcOrgAdmin` STRING, `srcOrgId` STRING, `srcOrgName` STRING, 
> `srcOsId` STRING, `srcPort` STRING, `srcPost` STRING, `srcProvince` STRING, 
> `srcService` STRING, `srcSubDomainName` STRING, `srcUser` STRING, `srcZone` 
> STRING, `staffCode` STRING, `staffCrm` STRING, `staffName` STRING, 
> `staffState` STRING, `startTime` TIMESTAMP_LTZ(9), `status` STRING, 
> `subAccount` STRING, `subAccountCreateTime` TIMESTAMP_LTZ(9), 
> `subAccountCreateUser` STRING, `subAccountDesc` STRING, `subAccountId` 
> STRING, `subAccountInvalidTime` TIMESTAMP_LTZ(9), `subAccountLoginDateLast` 
> STRING, `subAccountLoginFailCount` STRING, `subAccountModifyPwdTime` 
> TIMESTAMP_LTZ(9), `subAccountModifyTime` TIMESTAMP_LTZ(9), `subAccountStatus` 
> STRING, `subAccountType` STRING, `subAccountValidTime` TIMESTAMP_LTZ(9), 
> `sumAreaId` STRING, `sumManagerId` STRING, `tag` STRING, `taskId` STRING, 
> `taskName` STRING, `telephone` STRING, `telephoneType` STRING, `tenantId` 
> STRING, `tenantName` STRING, `terminalNum` STRING, `threatName` STRING, 
> `threatType` STRING, `threatTypeDesc` STRING, `transBal` STRING, 
> `transChannel` STRING, `transCode` STRING, `transId` STRING, `transName` 
> STRING, `transStatus` STRING, `transTime` TIMESTAMP_LTZ(9), `transType` 
> STRING, `type` STRING, `unitName` STRING, `updateTime` TIMESTAMP_LTZ(9), 
> `upmpQn` STRING, `upmpSerialNum` STRING, `url` STRING, `user` STRING, 
> `userGroupId` STRING, `userGroupName` STRING, `userId` STRING, `userOrgId` 
> STRING, `userOrgName` STRING, `userType` STRING, `uuId` STRING, `value` 
> STRING, `version` STRING, `voidOrderNo` STRING, `vulnId` STRING, `vulnInfo` 
> STRING, `vulnLevel` STRING, `vulnName` STRING, `vulnType` STRING, `weixinId` 
> STRING, `weixinVersion` STRING, `wpTag` STRING, `writeOffTime` 
> TIMESTAMP_LTZ(9)>*, rowtime=false, watermark=true) -> 
> Calc(select=[eventTwoType, deviceParentType, type, eventName, directionDesc, 
> srcIp, dstIp, createTime, snowId]) -> SinkConversionToTuple2 -> Sink: Print 
> to Std. Out (1/1)#0 (851b2092ae4f274d5c7be1f2ea7acaba) switched from RUNNING 
> to FAILED with failure cause: java.lang.NullPointerException
>       at SinkConversion$22.processElement(Unknown Source)
>       at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
>       at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>       at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>       at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>       at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>       at StreamExecCalc$18.processElement(Unknown Source)
>       at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
>       at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>       at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>       at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>       at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>       at 
> org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:128)
>       at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
>       at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>       at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>       at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>       at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>       at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
>       at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
>       at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>       at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>       at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>       at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>       at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
>       at 
> com.asap.demo.function.dealStreamProcessFunction.match(dealStreamProcessFunction.java:131)
>       at 
> com.asap.demo.function.dealStreamProcessFunction.processElement(dealStreamProcessFunction.java:115)
>       at 
> com.asap.demo.function.dealStreamProcessFunction.processElement(dealStreamProcessFunction.java:33)
>       at 
> org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator.processElement1(CoBroadcastWithKeyedOperator.java:125)
>       at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:213)
>       at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:178)
>       at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:291)
>       at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>       at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>       at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
>       at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:96)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
>       at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>       at java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to