Hi,

I'm afraid I don't see another solution than touching the Flink code for
this and adding a try catch block around the timestamp conversion.

It would be great if you could create a Jira issue reporting this problem.
IMO, we should have a configuration switch (either per Table or query) to
either
* fail on dirty data
* "log and drop" dirty rows
* (send dirty data to a side output)

Best, Fabian

Am Mi., 15. Mai 2019 um 16:50 Uhr schrieb maidangdang <maidangdan...@126.com
>:

> I use FlinkSQL to process Kafka data in the following format:
> |  id |  server_time |
> |  1  | 2019-05-15 10:00:00 |
> |  2  | 2019-05-15 10:00:00 |
> .......
>
> and I define rowtime from the  server_time field:
> new Schema()
>     .field("rowtime", Types.SQL_TIMESTAMP)
>        .rowtime(new Rowtime().timestampsFromField("server_time"))
>     .field("id", Types.String)
>     .field("server_time", Types.String)
>
> when dirty data arrives, such as :
> |  id   |  server_time |
> |  99  | 11.22.33.44  |
>
> My FlinkSQL job fails with exception:
> java.lang.NumberFormatException: For input string: "11.22.33.44"
> at
> java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
> at java.lang.Integer.parseInt(Integer.java:580)
> at java.lang.Integer.parseInt(Integer.java:615)
> at
> org.apache.calcite.avatica.util.DateTimeUtils.dateStringToUnixDate(DateTimeUtils.java:625)
> at
> org.apache.calcite.avatica.util.DateTimeUtils.timestampStringToUnixDate(DateTimeUtils.java:715)
> at DataStreamSourceConversion$288.processElement(Unknown Source)
> at
> org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70)
> at
> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
> at
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:187)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:152)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:665)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> at java.lang.Thread.run(Thread.java:748)
>
> Because my flink job use EXACTLY_ONCE, so the job is re-executed from the
> last checkpoint, consumes dirty data again, fails again, and keeps looping
> like this.I would like to ask if there are any good ways to solve this
> situation?
>
> The Flink version I used was flink-1.7.2
>
>
>
>

Reply via email to