Re: Re: Flink 1.9 Blink planner time field issue

2019-09-05, posted by hb
That doesn't work either:
Caused by: org.apache.flink.table.api.ValidationException: Rowtime attribute '_rowtime' is not of type SQL_TIMESTAMP.



On 2019-09-06 10:48:02, "Jark Wu" wrote:
>Probably because you declared eventTime as a timestamp type in the schema. Try declaring it as long instead:
>.field("_rowtime", Types.LONG())
>
>> On 2019-09-05 at 15:11, hb <343122...@163.com> wrote:
>> 
>> In practice, the most common time field is a Long millisecond timestamp. Is that really not supported?
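Since the rowtime attribute itself must be SQL_TIMESTAMP (hence the ValidationException above) while the 1.9 JSON format cannot parse a numeric eventTime into one, a common workaround in Flink 1.9 is to assign the epoch-millisecond timestamp on the DataStream and let the Table API derive the rowtime attribute from the record timestamps. A minimal sketch, assuming a hypothetical Event case class and leaving the Kafka consumer and JSON parsing as a placeholder:

```
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._

// Hypothetical record type standing in for the parsed Kafka JSON.
case class Event(eventTime: Long, id: Int, name: String)

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tEnv = StreamTableEnvironment.create(env, settings)

// Kafka source and JSON parsing omitted; plug in the consumer here.
val events: DataStream[Event] = ???

// Use the Long epoch-millis field for timestamps, with a 1s bounded
// out-of-orderness watermark (same bound as watermarksPeriodicBounded(1000)).
val withTimestamps = events.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[Event](Time.milliseconds(1000)) {
    override def extractTimestamp(e: Event): Long = e.eventTime
  })

// '_rowtime.rowtime appends an event-time attribute of type SQL_TIMESTAMP
// derived from the record timestamps, so no JSON timestamp parsing is involved.
val table = tEnv.fromDataStream(withTimestamps, 'id, 'name, '_rowtime.rowtime)
```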


Re: Flink 1.9 Blink planner time field issue

2019-09-05, posted by Jark Wu
Probably because you declared eventTime as a timestamp type in the schema. Try declaring it as long instead:
.field("_rowtime", Types.LONG())

> On 2019-09-05 at 15:11, hb <343122...@163.com> wrote:
> 
> In practice, the most common time field is a Long millisecond timestamp. Is that really not supported?
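For reference, the suggested change applied to the schema from the original post would read as below; note that the reply at the top of this thread reports it then fails validation with "Rowtime attribute '_rowtime' is not of type SQL_TIMESTAMP".

```
// Suggested change: declare the attribute as LONG so the JSON field is read
// as a number rather than parsed as a timestamp string.
schema
  .field("_rowtime", Types.LONG())
  .rowtime(
    new Rowtime()
      .timestampsFromField("eventTime")
      .watermarksPeriodicBounded(1000))
```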



Re: Re: Flink 1.9 Blink planner time field issue

2019-09-05, posted by hb
In practice, the most common time field is a Long millisecond timestamp. Is that really not supported?




On 2019-09-05 14:06:08, "pengcheng...@bonc.com.cn" wrote:
>Flink probably cannot convert a long eventTime input into the SQL_TIMESTAMP type.


Re: Flink 1.9 Blink planner time field issue

2019-09-05, posted by pengcheng...@bonc.com.cn
Flink probably cannot convert a long eventTime input into the SQL_TIMESTAMP type.
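The type conversion itself is trivial on the JVM, as the sketch below shows; the failure reported in this thread comes from the 1.9 JSON format, which parses SQL_TIMESTAMP fields only from timestamp strings (see the DateTimeParseException in the stack trace of the original post below).

```
// Epoch millis to SQL timestamp is a one-liner; the sticking point is the
// JSON format's parser, not the type system.
val ts = new java.sql.Timestamp(10L)
println(ts) // prints 1970-01-01 00:00:00.01 when the JVM timezone is UTC
```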

 


Flink 1.9 Blink planner time field issue

2019-09-04, posted by hb
The code defines a Kafka connector descriptor, reads JSON-formatted data from Kafka, and builds a Table. The schema declares:
.field("_rowtime", Types.SQL_TIMESTAMP())
.rowtime(
new Rowtime()
.timestampsFromField("eventTime")
.watermarksPeriodicBounded(1000))
The Kafka input {"eventTime": 10, "id":1,"name":"hb"} fails with the error below,

while the input {"eventTime": "2019-09-02T09:56:16.484Z", "id":1,"name":"hb"} produces the correct result.
Why doesn't the eventTime field accept a numeric value?


Error message:
```
Caused by: java.lang.Exception: java.io.IOException: Failed to deserialize JSON object.
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to deserialize JSON object.
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:129)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:72)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:146)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
Caused by: java.time.format.DateTimeParseException: Text '10' could not be parsed at index 0
    at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
    at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$createTimestampConverter$1dee6515$1(JsonRowDeserializationSchema.java:334)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:403)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:382)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:127)
    ... 7 more
```
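The innermost DateTimeParseException pins down the cause: the JSON format converts SQL_TIMESTAMP fields with a java.time DateTimeFormatter, which accepts an RFC 3339 timestamp string but not a bare number. A minimal illustration, using ISO_OFFSET_DATE_TIME as a stand-in for the format's internal formatter:

```
import java.time.format.DateTimeFormatter

// Parses fine -- matches the working input from the post above.
DateTimeFormatter.ISO_OFFSET_DATE_TIME.parse("2019-09-02T09:56:16.484Z")

// Throws DateTimeParseException: Text '10' could not be parsed at index 0,
// just like the stack trace for {"eventTime": 10, ...}.
DateTimeFormatter.ISO_OFFSET_DATE_TIME.parse("10")
```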




Source code:
```
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, Types}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Json, Kafka, Rowtime, Schema}
import org.apache.flink.types.Row

val env = StreamExecutionEnvironment.getExecutionEnvironment
val conf = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tEnv = StreamTableEnvironment.create(env, conf)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// Kafka 0.11 source
val kafkaIn = new Kafka()
  .version("0.11")
  .topic("hbtest111")
  .property("bootstrap.servers", "192.168.1.160:19092")
  .property("group.id", "test2")

// Derive the JSON format schema from the table schema below
val json = new Json().deriveSchema()

val schema = new Schema()
  .field("id", Types.INT())
  .field("name", Types.STRING())

// Processing-time attribute
schema.field("_proctime", Types.SQL_TIMESTAMP()).proctime()

// Event-time attribute taken from the JSON field "eventTime",
// with a 1-second bounded watermark
schema
  .field("_rowtime", Types.SQL_TIMESTAMP())
  .rowtime(
    new Rowtime()
      .timestampsFromField("eventTime")
      .watermarksPeriodicBounded(1000))

tEnv.connect(kafkaIn)
  .withFormat(json)
  .withSchema(schema)
  .inAppendMode()
  .registerTableSource("table_from_kafka")

val t = tEnv.sqlQuery("select * from table_from_kafka")
t.printSchema()

t.toRetractStream[Row].print()
tEnv.execute("")
```