Hi Dawid,

I agree with you. If we want to loosen the format constraint, the
important piece is the conversion matrix.

The conversion matrix you listed makes sense to me. From my understanding,
there should be 6 combinations.
We can add WITHOUT TIMEZONE => WITHOUT TIMEZONE and WITH TIMEZONE => WITH
TIMEZONE to make the matrix complete.
Once the community reaches an agreement on this, we should write it down in
the documentation and follow the matrix in all text-based formats.
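
To make the matrix concrete, here is a rough java.time sketch of the six
cells. The class and method names are hypothetical (this is not existing
Flink API), and it assumes that WITH LOCAL TIMEZONE is carried as an Instant
and that the "local" zone is passed in explicitly:

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneId;

// Hypothetical helper (not part of Flink): one method per cell of the matrix.
final class TimestampConversions {
    private TimestampConversions() {}

    // WITHOUT TIMEZONE => WITHOUT TIMEZONE: nothing to convert.
    static LocalDateTime localToLocal(LocalDateTime parsed) {
        return parsed;
    }

    // WITH TIMEZONE => WITH TIMEZONE: nothing to convert.
    static OffsetDateTime zonedToZoned(OffsetDateTime parsed) {
        return parsed;
    }

    // WITHOUT TIMEZONE => WITH TIMEZONE: store the local zone with the data.
    static OffsetDateTime localToZoned(LocalDateTime parsed, ZoneId localZone) {
        return parsed.atZone(localZone).toOffsetDateTime();
    }

    // WITHOUT TIMEZONE => WITH LOCAL TIMEZONE: keep the wall-clock fields,
    // interpret them in the local zone (assumption: Instant is the carrier).
    static Instant localToLocalZoned(LocalDateTime parsed, ZoneId localZone) {
        return parsed.atZone(localZone).toInstant();
    }

    // WITH TIMEZONE => WITH LOCAL TIMEZONE: normalize to a point in time,
    // dropping the zone information.
    static Instant zonedToLocalZoned(OffsetDateTime parsed) {
        return parsed.toInstant();
    }

    // WITH TIMEZONE => WITHOUT TIMEZONE: drop the zone information.
    static LocalDateTime zonedToLocal(OffsetDateTime parsed) {
        return parsed.toLocalDateTime();
    }
}
```

Whether the "local" zone should come from the session configuration or the
JVM default is left open here; the sketch only shows that each cell maps to a
single, well-defined conversion.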

Regarding the RFC 3339 compatibility mode switch, it also sounds good to
me.
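
For illustration only, such a switch could look roughly like the sketch
below. The class name and the boolean flag are made up for this example (they
are not Flink options), and ISO_OFFSET_DATE_TIME only approximates RFC 3339;
the point is just that the strict mode requires an offset while the lenient
mode also accepts a SQL-style literal without one:

```java
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAccessor;

// Hypothetical parser (not part of Flink) with an RFC 3339 compatibility switch.
final class TimestampTextParser {
    private final boolean rfc3339Strict;

    TimestampTextParser(boolean rfc3339Strict) {
        this.rfc3339Strict = rfc3339Strict;
    }

    // Returns an OffsetDateTime when the text carries an offset,
    // otherwise a LocalDateTime (lenient mode only).
    TemporalAccessor parse(String text) {
        if (rfc3339Strict) {
            // 'T' separator and offset are mandatory, e.g. 2019-07-09T02:02:00.040Z.
            return OffsetDateTime.parse(text, DateTimeFormatter.ISO_OFFSET_DATE_TIME);
        }
        // Lenient mode: accept "2019-07-09 02:02:00.040" by normalizing the
        // single space between date and time to 'T'.
        String normalized = (text.length() > 10 && text.charAt(10) == ' ')
                ? text.substring(0, 10) + 'T' + text.substring(11)
                : text;
        // ISO_DATE_TIME treats the offset as optional; parseBest picks the
        // richest type that the parsed fields can produce.
        return DateTimeFormatter.ISO_DATE_TIME.parseBest(
                normalized, OffsetDateTime::from, LocalDateTime::from);
    }
}
```

The returned type then tells us the "parsed type" (WITH or WITHOUT TIMEZONE)
that the conversion matrix starts from.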

Best,
Jark

On Wed, 26 Feb 2020 at 17:44, Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

> Hi all,
>
> @NiYanchun Thank you for reporting this. Yes I think we could improve the
> behaviour of the JSON format.
>
> @Jark First of all I do agree we could/should improve the
> "user-friendliness" of the JSON format (and unify the behavior across text
> based formats). I am not sure though if it is as simple as just ignoring the
> time zone here.
>
> My suggestion would be rather to apply the logic of parsing a SQL
> timestamp literal (if the expected type is of LogicalTypeFamily.TIMESTAMP),
> which would actually also derive the "stored" type of the timestamp (either
> WITHOUT TIMEZONE or WITH TIMEZONE) and then apply a proper sql conversion.
> Therefore, depending on the parsed and the requested type:
>
> parsed type      | requested type      | behaviour
> -----------------+---------------------+-----------------------------------------------------------------------------
> WITHOUT TIMEZONE | WITH TIMEZONE       | store the local timezone with the data
> WITHOUT TIMEZONE | WITH LOCAL TIMEZONE | do nothing in the data, interpret the time in local timezone
> WITH TIMEZONE    | WITH LOCAL TIMEZONE | convert the timestamp to local timezone and drop the time zone information
> WITH TIMEZONE    | WITHOUT TIMEZONE    | drop the time zone information
>
> It might just boil down to what you said "being more lenient with regards
> to parsing the time zone". Nevertheless I think this way it is a bit better
> defined behaviour, especially as it has a defined behaviour when converting
> between representations with and without a time zone.
>
> An implementation note. I think we should aim to base the implementation
> on the DataTypes already rather than going back to the TypeInformation.
>
> I would still try to leave the RFC 3339 compatibility mode, but maybe for
> that mode it would make sense to not support any types WITHOUT TIMEZONE?
> This would be enabled with a switch (disabled by default). As I understand
> the RFC, making the time zone mandatory is actually a big part of the
> standard as it makes time types unambiguous.
>
> What do you think?
>
> PS: I cross-posted this on the dev ML.
>
> Best,
>
> Dawid
>
>
> On 26/02/2020 03:45, Jark Wu wrote:
>
> Yes, I'm also in favor of loosening the datetime format constraint.
> I guess most of the users don't know there is a JSON standard which
> follows RFC 3339.
>
> Best,
> Jark
>
> On Wed, 26 Feb 2020 at 10:06, NiYanchun <niyanc...@outlook.com> wrote:
>
>> Yes, these type definitions are general. As a user/developer, I would
>> support "loosen it for usability". If not, we could add some explanation
>> about JSON.
>>
>>
>>
>>  Original Message
>> *Sender:* Jark Wu<imj...@gmail.com>
>> *Recipient:* Outlook<niyanc...@outlook.com>; Dawid Wysakowicz<
>> dwysakow...@apache.org>
>> *Cc:* godfrey he<godfre...@gmail.com>; Leonard Xu<xbjt...@gmail.com>;
>> user<u...@flink.apache.org>
>> *Date:* Wednesday, Feb 26, 2020 09:55
>> *Subject:* Re: TIME/TIMESTAMP parse in Flink TABLE/SQL API
>>
>> Hi Outlook,
>>
>> The explanation in DataTypes is correct; it is compliant with the SQL standard.
>> The problem is that JsonRowDeserializationSchema only supports RFC 3339.
>> On the other hand, CsvRowDeserializationSchema can parse
>> "2019-07-09 02:02:00.040".
>>
>> So the question is: shall we insist on the RFC 3339 "standard", or shall we
>> loosen it for usability?
>> What do you think @Dawid Wysakowicz <dwysakow...@apache.org> ?
>>
>> Best,
>> Jark
>>
>> On Wed, 26 Feb 2020 at 09:29, Outlook <niyanc...@outlook.com> wrote:
>>
>>> Thanks Godfrey and Leonard, I tried your answers and the result is OK.
>>>
>>>
>>> BTW, if only this format is going to be accepted for a long time, I think
>>> the docs of the TIME and TIMESTAMP methods in
>>> `org.apache.flink.table.api.DataTypes` should be updated, because the
>>> documentation currently does not match what the methods really support. For
>>> example,
>>>
>>>
>>> ```
>>> /**
>>>  * Data type of a time WITHOUT time zone {@code TIME} with no fractional seconds by default.
>>>  *
>>>  * <p>An instance consists of {@code hour:minute:second} with up to second precision
>>>  * and values ranging from {@code 00:00:00} to {@code 23:59:59}.
>>>  *
>>>  * <p>Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported as the
>>>  * semantics are closer to {@link java.time.LocalTime}. A time WITH time zone is not provided.
>>>  *
>>>  * @see #TIME(int)
>>>  * @see TimeType
>>>  */
>>> public static DataType TIME() {
>>>     return new AtomicDataType(new TimeType());
>>> }
>>> ```
>>>
>>>
>>> Thanks again.
>>>
>>>  Original Message
>>> *Sender:* Leonard Xu<xbjt...@gmail.com>
>>> *Recipient:* godfrey he<godfre...@gmail.com>
>>> *Cc:* Outlook<niyanc...@outlook.com>; user<u...@flink.apache.org>
>>> *Date:* Tuesday, Feb 25, 2020 22:56
>>> *Subject:* Re: TIME/TIMESTAMP parse in Flink TABLE/SQL API
>>>
>>> Hi Outlook,
>>> Godfrey is right, you should follow the JSON format [1] when you parse
>>> your JSON message.
>>> You can use the following code to produce a JSON date-time string.
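>>> ```
>>> // Requires java.text.DateFormat, java.text.SimpleDateFormat, java.util.Date, java.util.TimeZone.
>>> long time = System.currentTimeMillis();
>>> DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
>>> // 'Z' is a literal in this pattern, so format in UTC to keep the suffix correct.
>>> dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
>>> Date date = new Date(time);
>>> String jsonSchemaDate = dateFormat.format(date);
>>> ```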
>>> [1]
>>> https://json-schema.org/understanding-json-schema/reference/string.html#dates-and-times
>>>
>>> On Feb 25, 2020, at 22:15, godfrey he <godfre...@gmail.com> wrote:
>>>
>>> Hi, I found that JsonRowDeserializationSchema only supports date-time
>>> with a time zone, according to RFC 3339. So you need to add a time zone to
>>> the time data (like 14:02:00Z) and the timestamp data
>>> (2019-07-09T02:02:00.040Z). Hope this helps.
>>>
>>> Bests,
>>> godfrey
>>>
>>> On Tue, Feb 25, 2020 at 5:49 PM, Outlook <niyanc...@outlook.com> wrote:
>>>
>>>> By the way, my Flink version is 1.10.0.
>>>>
>>>>  Original Message
>>>> *Sender:* Outlook<niyanc...@outlook.com>
>>>> *Recipient:* user<u...@flink.apache.org>
>>>> *Date:* Tuesday, Feb 25, 2020 17:43
>>>> *Subject:* TIME/TIMESTAMP parse in Flink TABLE/SQL API
>>>>
>>>> Hi all,
>>>>
>>>> I read JSON data from Kafka and print it to the console. When I do this, an
>>>> error occurs during time/timestamp deserialization.
>>>>
>>>> json data in Kafka:
>>>>
>>>> ```
>>>> {
>>>> "server_date": "2019-07-09",
>>>> "server_time": "14:02:00",
>>>> "reqsndtime_c": "2019-07-09 02:02:00.040"
>>>> }
>>>> ```
>>>>
>>>> flink code:
>>>>
>>>> ```
>>>> bsTableEnv.connect(
>>>>         new Kafka()
>>>>             .version("universal")
>>>>             .topic("xxx")
>>>>             .property("bootstrap.servers", "localhost:9092")
>>>>             .property("zookeeper.connect", "localhost:2181")
>>>>             .property("group.id", "g1")
>>>>             .startFromEarliest()
>>>>     ).withFormat(
>>>>         new Json()
>>>>             .failOnMissingField(false)
>>>>     ).withSchema(
>>>>         new Schema()
>>>>             .field("server_date", DataTypes.DATE())
>>>>             .field("server_time", DataTypes.TIME())
>>>>             .field("reqsndtime_c", DataTypes.TIMESTAMP(3))
>>>>     ).inAppendMode()
>>>>     .createTemporaryTable("xxx");
>>>> ```
>>>>
>>>>
>>>> server_date with DataTypes.DATE() is OK, but server_time with DataTypes.TIME()
>>>> and reqsndtime_c with DataTypes.TIMESTAMP(3) cause errors. If I change them
>>>> to DataTypes.STRING(), everything is OK.
>>>>
>>>> Error message:
>>>> ```
>>>> Exception in thread "main" java.util.concurrent.ExecutionException:
>>>> org.apache.flink.client.program.ProgramInvocationException: Job failed
>>>> (JobID: 395d1ba3d41f92734d4ef25aa5f9b4a1)
>>>> at
>>>> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>>>> at
>>>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>>>> at
>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1640)
>>>> at
>>>> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
>>>> at
>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>>>> at
>>>> org.apache.flink.table.planner.delegation.StreamExecutor.execute(StreamExecutor.java:42)
>>>> at
>>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643)
>>>> at cn.com.agree.Main.main(Main.java:122)
>>>> Caused by: org.apache.flink.client.program.ProgramInvocationException:
>>>> Job failed (JobID: 395d1ba3d41f92734d4ef25aa5f9b4a1)
>>>> at
>>>> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
>>>> at
>>>> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>>>> at
>>>> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>>>> at
>>>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>>> at
>>>> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>>>> at
>>>> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874)
>>>> at akka.dispatch.OnComplete.internal(Future.scala:264)
>>>> at akka.dispatch.OnComplete.internal(Future.scala:261)
>>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
>>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
>>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>>> at
>>>> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
>>>> at
>>>> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>>>> at
>>>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>>>> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
>>>> at
>>>> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
>>>> at
>>>> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
>>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
>>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
>>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>>> at
>>>> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>>>> at
>>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>>>> at
>>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>>> at
>>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>>> at
>>>> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>>>> at
>>>> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>>>> at
>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>>>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>> at
>>>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>> at
>>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>>>> execution failed.
>>>> at
>>>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
>>>> at
>>>> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
>>>> ... 31 more
>>>> Caused by: org.apache.flink.runtime.JobException: Recovery is
>>>> suppressed by NoRestartBackoffTimeStrategy
>>>> at
>>>> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
>>>> at
>>>> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
>>>> at
>>>> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
>>>> at
>>>> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
>>>> at
>>>> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
>>>> at
>>>> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
>>>> at
>>>> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> at
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>> at
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>> at
>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
>>>> at
>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
>>>> at
>>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>> at
>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>> ... 4 more
>>>> Caused by: java.io.IOException: Failed to deserialize JSON object.
>>>> at
>>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:133)
>>>> at
>>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:76)
>>>> at
>>>> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
>>>> at
>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
>>>> at
>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>>>> at
>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>>> at
>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
>>>> Caused by: java.time.format.DateTimeParseException: *Text '14:02:00'
>>>> could not be parsed at index 8*
>>>> at
>>>> java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
>>>> at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
>>>> at
>>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalTime(JsonRowDeserializationSchema.java:390)
>>>> at
>>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>>>> at
>>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
>>>> at
>>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
>>>> at
>>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>>>> at
>>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
>>>> ... 7 more
>>>>
>>>> Process finished with exit code 1
>>>> ```
>>>>
>>>> reqsndtime_c with DataTypes.TIMESTAMP(3) throws a similar exception. According
>>>> to the docs, the DataTypes.TIME() value range is from {@code 00:00:00} to {@code
>>>> 23:59:59}, and the DataTypes.TIMESTAMP value range is from {@code 0000-01-01
>>>> 00:00:00.000000000} to {@code 9999-12-31 23:59:59.999999999}. My values are
>>>> within these ranges, so I don't know why parsing fails. I also read that this
>>>> might be a bug in Java 8, so I changed the JDK to 11, but the error still occurs.
>>>>
>>>> Can someone give me some help? Thanks in advance.
>>>>
>>>
>>>
