[
https://issues.apache.org/jira/browse/FLINK-10119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
sean.miao updated FLINK-10119:
------------------------------
Description:
Recently, we have been using Kafka010JsonTableSource to process Kafka JSON
messages. We enabled checkpointing and an auto-restart strategy.
We found that a single message that is not valid JSON prevents the job from
being restarted successfully. This behavior does guarantee exactly-once or
at-least-once processing, but it leaves the application unavailable, which
has a significant impact on us.
the code is:
class: JsonRowDeserializationSchema
function:

@Override
public Row deserialize(byte[] message) throws IOException {
    try {
        final JsonNode root = objectMapper.readTree(message);
        return convertRow(root, (RowTypeInfo) typeInfo);
    } catch (Throwable t) {
        throw new IOException("Failed to deserialize JSON object.", t);
    }
}
I have now changed it to:

public Row deserialize(byte[] message) throws IOException {
    try {
        JsonNode root = this.objectMapper.readTree(message);
        return this.convertRow(root, (RowTypeInfo) this.typeInfo);
    } catch (Throwable t) {
        // Fall back to an empty JSON object so one bad record does not fail the job.
        JsonNode root = this.objectMapper.readTree("{}");
        return this.convertRow(root, (RowTypeInfo) this.typeInfo);
    }
}
I think malformed data is inevitable during network transmission, so could we
add a new column to the table that captures records with the wrong format, as
Spark SQL does?
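As an alternative to substituting an empty object, the skip-on-error pattern can be sketched with a deserializer that returns null for malformed input; Flink's Kafka consumer is documented to skip records for which DeserializationSchema.deserialize() returns null. The class name below is hypothetical, and integer parsing stands in for JSON parsing so the sketch needs only the JDK:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the skip-on-error pattern: return null for bad
// input instead of throwing, so one malformed record does not fail the job.
public class SkipOnErrorDeserializer {

    /** Returns the parsed value, or null when the payload is malformed. */
    public static Integer deserialize(byte[] message) {
        try {
            return Integer.parseInt(new String(message, StandardCharsets.UTF_8));
        } catch (NumberFormatException e) {
            return null; // signal "skip this record" rather than failing the job
        }
    }
}
```

A real JSON variant would apply the same try/catch around objectMapper.readTree and convertRow; the trade-off is that skipped records are silently dropped unless they are also routed somewhere (a side output or an error column) for inspection.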
was:
Enable checkpointing and use RestartStrategies.fixedDelayRestart.
Recently, we have been using Kafka010JsonTableSource to process Kafka JSON
messages. We enabled checkpointing and an auto-restart strategy.
We found that a single message that is not valid JSON prevents the job from
being restarted successfully. This behavior does guarantee exactly-once or
at-least-once processing, but it leaves the application unavailable, which
has a significant impact on us.
the code is:
class: JsonRowDeserializationSchema
function:

@Override
public Row deserialize(byte[] message) throws IOException {
    try {
        final JsonNode root = objectMapper.readTree(message);
        return convertRow(root, (RowTypeInfo) typeInfo);
    } catch (Throwable t) {
        throw new IOException("Failed to deserialize JSON object.", t);
    }
}
I have now changed it to:

public Row deserialize(byte[] message) throws IOException {
    try {
        JsonNode root = this.objectMapper.readTree(message);
        return this.convertRow(root, (RowTypeInfo) this.typeInfo);
    } catch (Throwable t) {
        // Fall back to an empty JSON object so one bad record does not fail the job.
        JsonNode root = this.objectMapper.readTree("{}");
        return this.convertRow(root, (RowTypeInfo) this.typeInfo);
    }
}
I think malformed data is inevitable during network transmission, so could we
add a new column to the table that captures records with the wrong format, as
Spark SQL does?
> JsonRowDeserializationSchema deserialize kafka message
> ------------------------------------------------------
>
> Key: FLINK-10119
> URL: https://issues.apache.org/jira/browse/FLINK-10119
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.5.1
> Environment: None
> Reporter: sean.miao
> Priority: Major
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)