Hi all,

@NiYanchun Thank you for reporting this. Yes I think we could improve
the behaviour of the JSON format.

@Jark First of all I do agree we could/should improve the
"user-friendliness" of the JSON format (and unify the behavior across
text based formats). I am not sure though if it is as simple as just
ignore the time zone here.

My suggestion would be rather to apply the logic of parsing a SQL
timestamp literal (if the expected type is of
LogicalTypeFamily.TIMESTAMP), which would actually also derive the
"stored" type of the timestamp (either WITHOUT TIMEZONE or WITH
TIMEZONE) and then apply a proper sql conversion. Therefore if the

parsed type                 |        requested type            | behaviour

WITHOUT TIMEZONE    |     WITH TIMEZONE             | store the local
timezone with the data

WITHOUT TIMEZONE    |     WITH LOCAL TIMEZONE  | do nothing in the data,
interpret the time in local timezone

WITH TIMEZONE          |     WITH LOCAL TIMEZONE   | convert the
timestamp to local timezone and drop the time zone information

WITH TIMEZONE          |     WITHOUT TIMEZONE       | drop the time zone

It might just boil down to what you said "being more lenient with
regards to parsing the time zone". Nevertheless I think this way it is a
bit better defined behaviour, especially as it has a defined behaviour
when converting between representation with or without time zone.

An implementation note. I think we should aim to base the implementation
on the DataTypes already rather than going back to the TypeInformation.

I would still try to leave the RFC 3339 compatibility mode, but maybe
for that mode it would make sense to not support any types WITHOUT
TIMEZONE? This would be enabled with a switch (disabled by default). As
I understand the RFC, making the time zone mandatory is actually a big
part of the standard as it makes time types unambiguous.

What do you think?

Ps. I cross posted this on the dev ML.



On 26/02/2020 03:45, Jark Wu wrote:
> Yes, I'm also in favor of loosen the datetime format constraint. 
> I guess most of the users don't know there is a JSON standard which
> follows RFC 3339.
> Best,
> Jark
> On Wed, 26 Feb 2020 at 10:06, NiYanchun <niyanc...@outlook.com
> <mailto:niyanc...@outlook.com>> wrote:
>     Yes, these Types definition are general. As a user/developer, I
>     would support “loosen it for usability”. If not, may add
>     some explanation about JSON.
>      Original Message 
>     *Sender:* Jark Wu<imj...@gmail.com <mailto:imj...@gmail.com>>
>     *Recipient:* Outlook<niyanc...@outlook.com
>     <mailto:niyanc...@outlook.com>>; Dawid
>     Wysakowicz<dwysakow...@apache.org <mailto:dwysakow...@apache.org>>
>     *Cc:* godfrey he<godfre...@gmail.com
>     <mailto:godfre...@gmail.com>>; Leonard Xu<xbjt...@gmail.com
>     <mailto:xbjt...@gmail.com>>; user<u...@flink.apache.org
>     <mailto:u...@flink.apache.org>>
>     *Date:* Wednesday, Feb 26, 2020 09:55
>     *Subject:* Re: TIME/TIMESTAMP parse in Flink TABLE/SQL API
>     Hi Outlook,
>     The explanation in DataTypes is correct, it is compliant to SQL
>     standard. The problem is that JsonRowDeserializationSchema only
>     support  RFC-3339. 
>     On the other hand, CsvRowDeserializationSchema supports to parse
>     "2019-07-09 02:02:00.040".
>     So the question is shall we insist on the RFC-3339 "standard"?
>     Shall we loosen it for usability? 
>     What do you think @Dawid Wysakowicz <mailto:dwysakow...@apache.org> ?
>     Best,
>     Jark
>     On Wed, 26 Feb 2020 at 09:29, Outlook <niyanc...@outlook.com
>     <mailto:niyanc...@outlook.com>> wrote:
>         Thanks Godfrey and Leonard, I tried your answers, result is OK. 
>         BTW, I think if only accept such format for a long time, the
>          TIME and TIMESTAMP methods' doc in
>         `org.apache.flink.table.api.DataTypes` may be better to update,
>         because the document now is not what the method really
>         support. For example, 
>         ```
>         /**
>         * Data type of a time WITHOUT time zone {@code TIME} with no
>         fractional seconds by default.
>         *
>         * <p>An instance consists of {@code hour:minute:second} with
>         up to second precision
>         * and values ranging from {@code 00:00:00} to {@code 23:59:59}.
>         *
>         * <p>Compared to the SQL standard, leap seconds (23:59:60 and
>         23:59:61) are not supported as the
>         * semantics are closer to {@link java.time.LocalTime}. A time
>         WITH time zone is not provided.
>         *
>         * @see #TIME(int)
>         * @see TimeType
>         */
>         public static DataType TIME() {
>         return new AtomicDataType(new TimeType());
>         }```
>         Thanks again.
>          Original Message 
>         *Sender:* Leonard Xu<xbjt...@gmail.com <mailto:xbjt...@gmail.com>>
>         *Recipient:* godfrey he<godfre...@gmail.com
>         <mailto:godfre...@gmail.com>>
>         *Cc:* Outlook<niyanc...@outlook.com
>         <mailto:niyanc...@outlook.com>>; user<u...@flink.apache.org
>         <mailto:u...@flink.apache.org>>
>         *Date:* Tuesday, Feb 25, 2020 22:56
>         *Subject:* Re: TIME/TIMESTAMP parse in Flink TABLE/SQL API
>         Hi,Outlook
>         Godfrey is right, you should follow the json format[1] when
>         you parse your json message.
>         You can use following code to produce a json data-time String.
>         ```
>         Long time = System.currentTimeMillis(); DateFormat dateFormat =  new 
> SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"); Date date = new Date(time); 
> String jsonSchemaDate = dateFormat.format(date);
>         ```
>         [1] 
> https://json-schema.org/understanding-json-schema/reference/string.html#dates-and-times
>>         在 2020年2月25日,22:15,godfrey he <godfre...@gmail.com
>>         <mailto:godfre...@gmail.com>> 写道:
>>         hi, I find that JsonRowDeserializationSchema only
>>         supports date-time with timezone according to RFC 3339. So
>>         you need add timezone to time data (like 14:02:00Z) and
>>         timestamp data(2019-07-09T02:02:00.040Z). Hope it can help you.
>>         Bests,
>>         godfrey
>>         Outlook <niyanc...@outlook.com
>>         <mailto:niyanc...@outlook.com>> 于2020年2月25日周二
>>         下午5:49写道:
>>             By the way, my flink version is 1.10.0.
>>              Original Message 
>>             *Sender:* Outlook<niyanc...@outlook.com
>>             <mailto:niyanc...@outlook.com>>
>>             *Recipient:* user<u...@flink.apache.org
>>             <mailto:u...@flink.apache.org>>
>>             *Date:* Tuesday, Feb 25, 2020 17:43
>>             *Subject:* TIME/TIMESTAMP parse in Flink TABLE/SQL API
>>             Hi all,
>>             I read json data from kafka, and print to console. When I
>>             do this, some error occurs when
>>             time/timestamp deserialization.
>>             json data in Kafka:
>>             ```
>>             {
>>             "server_date": "2019-07-09",
>>             "server_time": "14:02:00",
>>             "reqsndtime_c": "2019-07-09 02:02:00.040"
>>             }
>>             ```
>>             flink code:
>>             ```
>>             bsTableEnv.connect(
>>             new Kafka()
>>             .version("universal")
>>             .topic("xxx")
>>             .property("bootstrap.servers", "localhost:9092")
>>             .property("zookeeper.connect", "localhost:2181")
>>             .property("group.id <http://group.id/>", "g1")
>>             .startFromEarliest()
>>             ).withFormat(
>>             new Json()
>>             .failOnMissingField(false)
>>             ).withSchema(
>>             new Schema()
>>             .field("server_date", DataTypes.DATE())
>>             .field("server_time", DataTypes.TIME())
>>             .field("reqsndtime_c", DataTypes.TIMESTAMP(3))
>>             ).inAppendMode()
>>             .createTemporaryTable("xxx”);
>>             ```
>>             server_date with format  is ok, but server_time
>>             with  DataTypes.DATE() and reqsndtime_c with
>>             DataTypes.TIMESTAMP(3) cause error.  If I change them to
>>             DataTypes.STRING(), everything will be OK.
>>             Error message:
>>             ```
>>             Exception in thread "main"
>>             java.util.concurrent.ExecutionException:
>>             org.apache.flink.client.program.ProgramInvocationException:
>>             Job failed (JobID: 395d1ba3d41f92734d4ef25aa5f9b4a1)
>>             at
>> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>>             at
>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>>             at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1640)
>>             at
>> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
>>             at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>>             at
>> org.apache.flink.table.planner.delegation.StreamExecutor.execute(StreamExecutor.java:42)
>>             at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643)
>>             at cn.com.agree.Main.main(Main.java:122)
>>             Caused by:
>>             org.apache.flink.client.program.ProgramInvocationException:
>>             Job failed (JobID: 395d1ba3d41f92734d4ef25aa5f9b4a1)
>>             at
>> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
>>             at
>> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>>             at
>> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>>             at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>             at
>> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>>             at
>> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874)
>>             at akka.dispatch.OnComplete.internal(Future.scala:264)
>>             at akka.dispatch.OnComplete.internal(Future.scala:261)
>>             at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
>>             at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
>>             at
>>             scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>             at
>> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
>>             at
>> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>>             at
>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>>             at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
>>             at
>> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
>>             at
>> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
>>             at
>> scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
>>             at
>> scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
>>             at
>>             scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>             at
>> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>>             at
>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>>             at
>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>             at
>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>             at
>> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>>             at
>> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>>             at
>>             akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>>             at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>>             at
>>             akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>             at
>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>             at
>> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>             at
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>             Caused by:
>>             org.apache.flink.runtime.client.JobExecutionException:
>>             Job execution failed.
>>             at
>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
>>             at
>> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
>>             ... 31 more
>>             Caused by: org.apache.flink.runtime.JobException:
>>             Recovery is suppressed by NoRestartBackoffTimeStrategy
>>             at
>> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
>>             at
>> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
>>             at
>> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
>>             at
>> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
>>             at
>> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
>>             at
>> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
>>             at
>> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
>>             at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
>>             Method)
>>             at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>             at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>             at java.lang.reflect.Method.invoke(Method.java:498)
>>             at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
>>             at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
>>             at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>             at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>             at
>>             akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>             at
>>             akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>             at
>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>             at
>> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>             at
>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>             at
>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>             at
>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>             at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>             at
>>             akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>             at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>             at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>             at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>             at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>             at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>             ... 4 more
>>             Caused by: java.io.IOException: Failed to deserialize
>>             JSON object.
>>             at
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:133)
>>             at
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:76)
>>             at
>> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
>>             at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
>>             at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>>             at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>             at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>             at
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
>>             Caused by: java.time.format.DateTimeParseException: *Text
>>             '14:02:00' could not be parsed at index 8*
>>             at
>> java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
>>             at
>> java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
>>             at
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalTime(JsonRowDeserializationSchema.java:390)
>>             at
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>>             at
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
>>             at
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
>>             at
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>>             at
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
>>             ... 7 more
>>             Process finished with exit code 1
>>             ```
>>             reqsndtime_c with DataTypes.TIMESTAMP(3) has similar
>>             exception.  I see the doc,  DataTypes.TIME() value range
>>             is  from {@code 00:00:00} to {@code 23:59:59} ,
>>             DataTypes.TIMESTAMP value range is from {@code 0000-01-01
>>             00:00:00.000000000} to
>>             * {@code 9999-12-31 23:59:59.999999999}.  And my value is
>>             in the range, I don’t know why.  And I see this may be
>>             bug in java 8, I change jdk to 11,   
>>             error still occurs.
>>             Can someone give me some help, thanks in advance.

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to