[
https://issues.apache.org/jira/browse/BAHIR-303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17532644#comment-17532644
]
chen commented on BAHIR-303:
----------------------------
Hi Boto.
The field record_time is not null , because I debug at the code
_*partialRow.addObject(this.columnNames[i], field);*_ when the field is null
, and then rerun the code *_field = this.getField(input, i);_* the field
change not null , that's weird .
> AbstractSingleOperationMapper.createOperations method error with
> IllegalArgumentException
> -----------------------------------------------------------------------------------------
>
> Key: BAHIR-303
> URL: https://issues.apache.org/jira/browse/BAHIR-303
> Project: Bahir
> Issue Type: Bug
> Components: Flink Streaming Connectors
> Affects Versions: Flink-1.0
> Environment: centos7
> flink1.13
> Reporter: chen
> Priority: Critical
> Fix For: Not Applicable
>
> Attachments: error.log
>
>
> *when I use flink write data (from kafka and hive) to kudu , sometime (not
> every row) happens some error like :*
> java.lang.IllegalArgumentException: record_time cannot be set to null
> at org.apache.kudu.client.PartialRow.setNull(PartialRow.java:986)
> at org.apache.kudu.client.PartialRow.setNull(PartialRow.java:968)
> at org.apache.kudu.client.PartialRow.addObject(PartialRow.java:1077)
> at org.apache.kudu.client.PartialRow.addObject(PartialRow.java:1042)
> at
> org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper.createOperations(AbstractSingleOperationMapper.java:103)
> at
> org.apache.flink.connectors.kudu.connector.writer.KuduWriter.write(KuduWriter.java:97)
>
> *Add code to AbstractSingleOperationMapper class can fix it , I don't know
> why ? Please see this promble , thx .*
> The code like :
> @Override
> public List<Operation> createOperations(T input, KuduTable table) {
> Optional<Operation> operationOpt = this.createBaseOperation(input,
> table);
> if (!operationOpt.isPresent())
> { return Collections.emptyList(); }
> else {
> Operation operation = (Operation)operationOpt.get();
> PartialRow partialRow = operation.getRow();
> for(int i = 0; i < this.columnNames.length; ++i) {
> Object field = this.getField(input, i);
> if (field instanceof LazyBinaryFormat)
> { field = ((LazyBinaryFormat)field).getJavaObject();
> }
> if (field instanceof TimestampData)
> { field = ((TimestampData)field).toTimestamp();
> }
> {color:#ff0000} *if (field == null &&
> !partialRow.getSchema().getColumn(this.columnNames[i]).isNullable()){*{color}
> {color:#ff8b00} +_*String tmp = input.toString(); //
> this code let field not null*_+ {color}
>
> {color:#ff0000} *field = this.getField(input, i);*{color}
> {color:#ff0000} *if (field instanceof LazyBinaryFormat)
> {*{color}
> {color:#ff0000} *field =
> ((LazyBinaryFormat)field).getJavaObject();*{color}
> {color:#ff0000} *}*{color}
> {color:#ff0000} *if (field instanceof TimestampData)
> {*{color}
> {color:#ff0000} *field =
> ((TimestampData)field).toTimestamp();*{color}
> {color:#ff0000} *}*{color}
> {color:#ff0000} *}*{color}
> partialRow.addObject(this.columnNames[i], field);
> }
> return Collections.singletonList(operation);
> }
> }
> _*Attachment include the full error msg .*_
--
This message was sent by Atlassian Jira
(v8.20.7#820007)