element.getEvenTime()));
ccc0606fight...@163.com

From: L Y <531599...@qq.com.INVALID>
Date: May 23, 2023, 01:25
To: user-zh
Subject: table api rowtime
Hi,
Flink 1.14
ccc0606fight...@163.com

From: L Y <531599...@qq.com.INVALID>
Date: May 20, 2023, 01:10
To: user-zh
Subject: table api rowtime
Hi,
??mid
$("funcId"),
$("serverIp"), $("outTime"), $("handleSerialNo"), $("info"), $("funcIdDesc"),
$("eventTime").rowtime().as("et"));
//Table tableRequest = tableEnv.fromDataStream(outRequestDataStream,
//    Schema.newB
Hi everyone, here is my code:
| Table midTable = tableEnv.fromDataStream(midStream, $("funcId"),
$("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"),
$("eventTime").rowtime());
tableEnv.createTemporaryView("
I figured this out. I get this behavior because I was running the code in
a minicluster test that defaulted to batch. I switched to streaming and it
renders "*ROWTIME*".
On Fri, Nov 25, 2022 at 11:51 PM Dan Hill wrote:
> Hi. I copied the Flink code from this page. My print
Hi. I copied the Flink code from this page. My printSchema() does not
contain **ROWTIME** in the output. I'm running Flink v1.14.4.
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/data_stream_api/
public static class User {...}
DataStream dataStream
Hey Flinksters,
I was wondering if you had any ideas for how to preserve the rowtime across
an INNER equi join so that the output can be used in a temporal join.
I've attached an example based on the TemporalJoinTest where I'm creating
two views by deduplicating underlying streams (to rates_pk
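Since the attached example is not shown, here is a hedged sketch of the usual answer (all table and column names are hypothetical): a regular INNER join erases time attributes, but an interval join preserves them, so bounding the equi join by rowtime keeps the output usable in a later temporal join:

```sql
-- Hypothetical names; the rowtime bound turns the equi join into an
-- interval join, which lets o.rowtime survive as a time attribute.
CREATE TEMPORARY VIEW joined AS
SELECT o.id, o.rowtime, r.rate
FROM orders o
JOIN rates r
  ON o.currency = r.currency
 AND o.rowtime BETWEEN r.rowtime - INTERVAL '5' SECOND
                   AND r.rowtime + INTERVAL '5' SECOND;
```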
> wrote on Fri, Dec 17, 2021, 18:42:
> Question:
> java.lang.RuntimeException: RowTime field should not be null, please
> convert it to a non-null long value.
>
>
> Note: the source table has a time field, server_end_time (MySQL), that contains NULL values, but the source data cannot be modified.
> String order_sql = "create TABLE sd_service_order ("
Question:
java.lang.RuntimeException: RowTime field should not be null, please convert it
to a non-null long value.
The server_end_time field (MySQL) contains NULL values.
String order_sql = "create TABLE sd_service_order (" +
"server_end_t
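The NULL-rowtime failure above can usually be worked around in the DDL itself, without touching the source data. A hedged sketch (only `sd_service_order` and `server_end_time` come from the thread; the other column, the default value, and the connector clause are assumptions) that substitutes a placeholder for NULLs via a computed column before declaring the watermark:

```sql
CREATE TABLE sd_service_order (
  order_id STRING,
  server_end_time TIMESTAMP(3),
  -- computed column: replace NULLs so the rowtime is always non-null
  rowtime_col AS COALESCE(server_end_time, TIMESTAMP '1970-01-01 00:00:00'),
  WATERMARK FOR rowtime_col AS rowtime_col - INTERVAL '5' SECOND
) WITH (
  ...
);
```

Records that carried NULL will all land at the placeholder timestamp, so they effectively fall out of event-time windows; filtering them instead is the other common choice.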
Could you post the actual SQL so we can take a look?
yidan zhao wrote on Tue, Nov 2, 2021, 19:06:
>
> As in the subject: this didn't seem to happen on Flink 1.11 or 1.12, but it does on 1.13.
> Problem description:
> t1 is a Kafka table with an event_time column that is a rowtime attribute. I created a view v1
> via "select ..., event_time from t1". After creating it this way, queries against v1 fail,
> saying aggregate windows can only be defined on time attributes.
>
As in the subject: this didn't seem to happen on Flink 1.11 or 1.12, but it does on 1.13.
Problem description:
t1 is a Kafka table with an event_time column that is a rowtime attribute. I created a view v1
via "select ..., event_time from t1". After creating it this way, queries against v1 fail,
saying aggregate windows can only be defined on time attributes.
I'm not sure whether this is caused by the version change or by a mistake somewhere on my side.
To: user
Subject: Define rowtime on intermediate table field
Hi,
My use case involves reading raw data records from Kafka and processing
them. The records are coming from a database, where a periodic job reads
new rows, packages them into a single JSON object (as described below) and
writes the entire record to Kafka.
{
'id': 'some_id',
'key_a':
I ran into the same problem; how was it solved in the end?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
I use the same approach as you: transform the source, create a temporary view, and pass it on to the sink, using rowtime along the way. I hit the same error as you; how did you solve it in the end?
StreamExecutionEnvironment bsEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv);
DataStream
>> Is it possible that you are generating too many watermarks that need
>> to be sent to all downstream tasks?
This was basically it. I had unexpected flooding on specific keys, which
I'm guessing created intermittently hot partitions that were backpressuring
the rowtime task.
I do have another questio
Hi Aeden,
the rowtime task is actually just a simple map function that extracts
the event-time timestamp into a field of the row for the next operator.
It should not be the problem. Can you share a screenshot of your
pipeline? What is your watermarking strategy? Is it possible that you
I have a job made up of a few FlinkSQL statements using a
statement set. In my job graph viewed through the Flink UI a few of
the tasks/statements are preceded by this task
rowtime field: (#11: event_time TIME ATTRIBUTE(ROWTIME))
that has an upstream Kafka source/sink task.
Occasionally
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/time_attributes.html#event-time
On Sun, Feb 21, 2021 at 8:43 AM Aeden Jameson
wrote:
> In my job graph viewed through the Flink UI I see a task named,
>
> rowtime field: (#11: event_time TIME ATTRIBUT
In my job graph viewed through the Flink UI I see a task named,
rowtime field: (#11: event_time TIME ATTRIBUTE(ROWTIME))
that has an upstream Kafka source task. What exactly does the rowtime task do?
--
Thank you,
Aeden
Hi,Jiazhi
> When DataStream is converted to table, eventTime is converted to
> rowTime. Rowtime is 8 hours slow. How to solve this problem?
The reason is that the only data type that used to define an event time in
Table/SQL is TIMESTAMP(3), and TIMESTAMP type isn’t related t
> ...when converting to a Table, the corresponding eventTime is converted to rowtime, and the rowtime is 8 hours behind. How can this be solved?
> Thanks
> Jiazhi
Hi all
When DataStream is converted to table, eventTime is
converted to rowTime. Rowtime is 8 hours slow. How to solve this problem?
Thanks
Jiazhi
Hi,
You can try CAST(eventTime AS TIMESTAMP)
Best,
Hailong
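The 8-hour gap reported in this thread is the classic epoch-vs-wall-clock mismatch: a TIMESTAMP(3) rowtime carries no time zone and is derived from epoch millis in UTC, while a user in Asia/Shanghai (UTC+8) expects the local wall clock. A minimal sketch of just the arithmetic, using plain `java.time` (no Flink involved; the epoch value is an arbitrary example):

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class RowtimeOffsetDemo {
    // Hours between the UTC-based wall clock (what a timezone-less
    // TIMESTAMP(3) derived from epoch millis shows) and the wall clock
    // in the given zone, for the same instant.
    static long offsetHours(long epochMillis, String zone) {
        Instant instant = Instant.ofEpochMilli(epochMillis);
        LocalDateTime utcWallClock = LocalDateTime.ofInstant(instant, ZoneOffset.UTC);
        LocalDateTime localWallClock = LocalDateTime.ofInstant(instant, ZoneId.of(zone));
        return Duration.between(utcWallClock, localWallClock).toHours();
    }

    public static void main(String[] args) {
        // Asia/Shanghai is UTC+8, so the UTC-derived rowtime appears
        // 8 hours behind the local wall clock.
        System.out.println(offsetHours(1_600_000_000_000L, "Asia/Shanghai"));
    }
}
```

This is why the displayed rowtime trails local time by exactly the zone offset; casting or shifting the value, as suggested above, only changes the rendering, not the underlying instant.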
On 2020-12-19 11:14:53, "ゞ野蠻遊戲χ" wrote:
> Hi all!
>
> When I convert a DataStream to a Table with a rowtime specified, and then pass SQL containing a
> UDTF to tableEnv.sql(), the following error is thrown: Rowtime
> attributes must not be in the input rows of a regular join. As a workaround
When I convert a DataStream to a Table with a rowtime specified and run SQL with a UDTF through tableEnv.sql(), it throws: Rowtime
attributes must not be in the input rows of a regular join. As a workaround
you can cast the time attributes of input tables to TIMESTAMP
before
Hi Dan,
are you intending to use interval joins, regular joins, or a mixture of
both?
For regular joins you must ensure to cast a rowtime attribute to
timestamp as early as possible. For interval joins, you need to make
sure that the rowtime attribute is unmodified.
Currently, I see
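Timo's advice above can be sketched in SQL (all table and column names here are hypothetical, not from the thread; the point is only the CAST): for a regular join, degrade the rowtime attribute to a plain timestamp first, so that neither join input carries a time attribute:

```sql
-- Hypothetical names; cast the rowtime BEFORE the regular join.
CREATE TEMPORARY VIEW insertions_ts AS
SELECT id, CAST(rowtime AS TIMESTAMP(3)) AS ts
FROM input_insertion;

SELECT i.id, i.ts, c.click_time
FROM insertions_ts i
JOIN input_click c ON i.id = c.insertion_id;
```

For an interval join, by contrast, the rowtime column must be left unmodified so the planner can recognize it as a time attribute.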
NTERVAL HOUR)))], joinType=[inner])
: :- FlinkLogicalDataStreamTableScan(table=[[default, mydb,
input_insertion]])
: +- FlinkLogicalDataStreamTableScan(table=[[default, mydb,
input_impression]])
+- FlinkLogicalDataStreamTableScan(table=[[default, mydb,
input_click]])
Rowtime att
> "user-zh" <18868816...@163.com>
> Sent: Wed, Nov 25, 2020, 19:31
> To: "user-zh"
> Subject: Re: flink 1.11.2 rowtime and proctime streams In
" l_a INT, " +
" l_b string, " +
" l_rt timestamp(3), " +
" r_a INT, " +
" r_b string, " +
" r_pt timestamp(3) " +
") WITH ( " +
" 'connector' = 'print' " +
ble/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java#L253
>>
>> Best,
>> Hailong
>>
>> On 2020-11-16 13:48:35, "周虓岗" wrote:
>>
>> Via the Table API's "// declare an additional logical field as an event time attribute"
Via the Table API's "// declare an additional logical field as an event time attribute":
Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").rowtime());
this can propagate the event time downstream.
But if createView is used, how can this time attribute be carried forward?
If it is not carried forward, it may...
Is there any way to do this?
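One way to carry the attribute forward, sketched under the assumption that the view only projects the rowtime column and never modifies it (any expression over it degrades it to a plain timestamp):

```sql
-- A view that merely projects user_action_time keeps its rowtime
-- property for downstream window queries (names from the excerpt above).
CREATE TEMPORARY VIEW v AS
SELECT user_name, user_action_time FROM t;

SELECT user_name,
       TUMBLE_START(user_action_time, INTERVAL '1' MINUTE) AS w_start
FROM v
GROUP BY user_name, TUMBLE(user_action_time, INTERVAL '1' MINUTE);
```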
1. Only top-level fields can be declared as rowtime. To use a nested field as rowtime, first use a
computed column (supported only in DDL) to generate a top-level column.
2. The Descriptor API has many problems and is missing many features; it is not recommended. Use DDL instead. The Descriptor API will be reworked in 1.12.
Best,
Jark
On Thu, 17 Sep 2020 at 10:41, kylin wrote:
> Flink version 1.7.2
>
> The Flink Table API reads JSON data from Kafka; the JsonSchema is shown in the attached image.
> I found
Flink version 1.7.2
The Flink Table API reads JSON data from Kafka; the JsonSchema is shown in the attached image.
I found that rowtime cannot be specified from a field inside nested JSON. Could you confirm whether rowtime can only be specified from top-level fields?
tableEnv.connect(
new Kafka()
.version("0.10")
.topic(topic_in)
.property("bootstrap.servers", brokers)
.property("group.id", "
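Jark's computed-column pointer above translates, on much newer Flink versions than the 1.7.2 used in the question, to a DDL along these lines (the payload shape, field names, and watermark interval are all assumptions for illustration):

```sql
CREATE TABLE events (
  payload ROW<event_time TIMESTAMP(3), id STRING>,
  -- computed column lifts the nested field to the top level,
  -- where it can then be declared as the rowtime
  rowtime AS payload.event_time,
  WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  ...
);
```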
/r62b47ec6812ddbafed65ac79e31ca0305099893559f1e5a991dee550%40%3Cdev.flink.apache.org%3E
On 01.09.20 22:55, Satyam Shekhar wrote:
Thanks for your replies Matthias and Timo.
Converting the Table to DataStream, assigning a new Watermark & Rowtime
attribute, and converting back to Table makes sense. One challenge with
that appr
Thanks for your replies Matthias and Timo.
Converting the Table to DataStream, assigning a new Watermark & Rowtime
attribute, and converting back to Table makes sense. One challenge with
that approach is that Table to DataStream conversion could emit retractable
data stream, however, I t
Hello,
I use Flink for continuous evaluation of SQL queries on streaming data. One
of the use cases requires us to run recursive SQL queries. I am unable to
find a way to edit rowtime time attribute of the intermediate result table.
For example, let's assume that there is a table T0 with schema
Thank you very much. Following the JIRA you pointed to, I patched the source and it works now.
--
From: Benchao Li
Sent: Wed, Aug 19, 2020, 19:48
To: user-zh; 郑斌斌
Subject: Re: flink 1.11 order by rowtime error
Hi 斌斌,
It looks like you have hit a known bug [1].
[1] https://issues.apache.org/jira/browse/FLINK-16827
郑斌斌 wrote on Aug 19, 2020
> To: user-zh
> Subject: Re: flink 1.11 order by rowtime error
>
> Where is the error? I don't see it. Please paste your code; perhaps processing time was not set, or was set incorrectly.
>
--
Best,
Benchao Li
)
--
From: china_tao
Sent: Wed, Aug 19, 2020, 00:17
To: user-zh
Subject: Re: flink 1.11 order by rowtime error
Where is the error? I don't see it. Please paste your code; perhaps processing time was not set, or was set incorrectly.
I don't see the error; please paste the code. Maybe event time was not set, or was set incorrectly.
Hi:
Your image did not come through; please paste the code directly, since images are not visible to everyone.
From: 郑斌斌
Sent: 2020-08-18 17:45
To: user-zh
Subject: flink 1.11 order by rowtime error
Folks:
Hello, a question: when executing an ORDER BY SQL statement, why is the error below thrown?
SQL: select order_id, product_id FROM kafka_order order by rowtime
Thanks & Regards
Hi Jesse,
I think the type of the rowtime you declared in the source schema is
DataTypes.TIMESTAMP(); you should also use DataTypes.TIMESTAMP() in the sink schema.
Best,
Xingbo
Jesse Lord wrote on Wed, Jul 15, 2020 at 11:41 PM:
> I am trying to sink the rowtime field in pyflink 1.10. I get the following
>
I am trying to sink the rowtime field in pyflink 1.10. I get the following error
For the source schema I use
.field("rowtime", DataTypes.TIMESTAMP(2))
.rowtime(
Rowtime()
.timestamps_from_field("timestamp")
.watermark
OK, thanks for the answer.
This is probably a bug in the connect API. I would suggest using DDL for now.
Best,
Jark
On Tue, 14 Jul 2020 at 08:54, Hito Zhu wrote:
> The rowtime definition is as follows; I found the SchemaValidator#deriveFieldMapping method removes it.
> Rowtime rowtime = new Rowtime()
> .timestampsFromField("searchTime")
> .water
The rowtime definition is as follows; I found the SchemaValidator#deriveFieldMapping method removes it.
Rowtime rowtime = new Rowtime()
.timestampsFromField("searchTime")
.watermarksPeriodicBounded(5 * 1000);
In your code,
new Schema().field("searchTime", DataTypes.TIMESTAMP()).rowtime(rowtime);
can you paste the definition of that rowtime?
On Mon, 13 Jul 2020 at 20:53, Hito Zhu wrote:
> Hi Jark, the exception is as follows:
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
Hi Jark, the exception is as follows:
Exception in thread "main" org.apache.flink.table.api.ValidationException:
Field null does not exist
at
org.apache.flink.table.sources.tsextractors.TimestampExtractorUtils.lambda$mapToResolvedField$4(TimestampExtractorUtils.java:85)
at
...the program throws a "Field null does not exist" error. Is there a problem with my usage?
> I looked at https://issues.apache.org/jira/browse/FLINK-16160
> Is that issue meant to fix this problem?
>
> tableEnv.connect(kafka)
> .withSchema(
> new Schema().field("searchTime"
.withSchema(
new Schema().field("searchTime",
DataTypes.TIMESTAMP()).rowtime(rowtime)
)
.withFormat(
new Json().failOnMissingField(false)
)
.createTemporaryTable("tablename");
Hi,
> field("logictime","TIMESTAMP(3)")
The error is because this field does not exist in your original table. As I understand it, you want to
generate a new field logictime (TIMESTAMP(3)) from the field evitime (Long). This can be solved with a
computed column. The Table API does not support computed columns yet; that is under development for 1.12.
You can meet your need with DDL plus a computed column, see [1].
create table test (
acct STRING,
evitime BIGINT,
logictime as
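A possible completion of the truncated DDL above, hedged: the exact conversion function depends on the Flink version, `FROM_UNIXTIME` assumes `evitime` holds epoch seconds, and the watermark interval and connector clause are made up for illustration:

```sql
CREATE TABLE test (
  acct STRING,
  evitime BIGINT,
  -- computed column turning the epoch value into a TIMESTAMP(3)
  logictime AS TO_TIMESTAMP(FROM_UNIXTIME(evitime)),
  WATERMARK FOR logictime AS logictime - INTERVAL '5' SECOND
) WITH (
  ...
);
```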
alse).deriveSchema())
.withSchema(new
Schema().field("acct", "STRING").field("evtime",
"LONG").field("logictime","TIMESTAMP(3)").rowTime(new
Rowtime().timestampsFromField("evtime").wa
ka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>> at akka.japi.pf.UnitCaseStatement.appl
> > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> > at scala.PartialFunction$
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:22
=[SeqNo, Type, Table, ServerId,
Database, OldData, GTID, Data, Timestamp, Offset,
from_unixtime((Data.FuiUpdateTime / 1000)) AS FuiUpdateTimeSec,
(from_unixtime((Data.FuiUpdateTime / 1000)) TO_TIMESTAMP _UTF-16LE'-MM-dd
HH:mm:ss') AS event_ts]) - WatermarkAssigner(rowtime=[event_ts
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
> at SinkConversion$51.processElement(Unknown Source)
> ……
>
>
>
> Finally, looking at the code: for rowtime, BaseRowTypeInfo uses SqlTimestampSerializer while RowTypeInfo uses LongSerializer. The upstream and downstream then use different serializers (SqlTimestampSerializer upstream, LongSerializer downstream), which causes the error.
> Can this problem be avoided?
--
Best, Jingsong Lee
)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at SinkConversion$51.processElement(Unknown Source)
……
Finally, looking at the code: for rowtime, BaseRowTypeInfo uses SqlTimestampSerializer while RowTypeInfo uses LongSerializer. The upstream and downstream then use different serializers (SqlTimestampSerializer upstream, LongSerializer downstream), which causes the error.
Can this problem be avoided?
Hi 猫猫:
Defining rowtime in DDL is a newly supported feature; the documentation is still being written. [1]
You can try it from the master branch; the community is preparing the 1.10 release, and a release version will be available then.
[2] has a complete usage example, FYI.
[1] https://issues.apache.org/jira/browse/FLINK-14320
[2]
https://github.com/apache/flink/blob/2ecf7cacbe742099d78c528de962fccf13c14629/flink-table/flink-table-planner-blink/src/test
??tableEnv.sqlUpdate(ddl);??
rowtimerowtime??
??flink???
??csvkafka
sql??
CREATE
ableName)
> .startFromEarliest()
> .property("zookeeper.connect", *“*xxx")
> .property("bootstrap.servers", *“*xxx")
> .property("group.id", *“*xxx"))
> .withFormat(new Json().deriveSche
“xxx")
.property("group.id", “xxx"))
.withFormat(new Json().deriveSchema())
.withSchema(new Schema()
.field("rowtime", Types.SQL_TIMESTAMP)
.rowtime(new Rowti
Hi all, I want to use a field from Kafka messages as the rowtime attribute, but I ran into the following problems on Flink 1.9.1.
Below are the two approaches I tried; both raise errors. Has anyone hit a similar problem, and how did you solve it? Thanks!
Approach 1:
tEnv.connect(
new Kafka()
.version("universal")
.topic("flink-test-dept-1")
.startFromGroupOffsets()
Hi Zijie,
Your sqlTimestamp field probably contains null values, so an NPE is thrown when extracting the ts.
Currently the watermark assigner requires every record's ts to be non-null.
Best,
Jark
> On Oct 17, 2019, at 20:25, Zijie Lu wrote:
>
> CREATE TABLE requests(
> `rowtime` TIMESTAMP,
> `requestId` VARCHAR,
> `algoExtent` ROW(`mAdId` VARCHAR))
> with
CREATE TABLE requests(
`rowtime` TIMESTAMP,
`requestId` VARCHAR,
`algoExtent` ROW(`mAdId` VARCHAR))
with (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'test_request',
'connector.startup-mode' = 'latest-offset',
'connector.properties.0.key
And this definition works in the old planner.
On Thu, 17 Oct 2019 at 19:49, Zijie Lu wrote:
> I defined the following table using the blink planner:
> CREATE TABLE requests(
> `rowtime` TIMESTAMP,
> `requestId` VARCHAR,
> `algoExtent` ROW(`mAdId` VARCHAR))
> with (
> 'connector.type' = 'kafka',
> 'connect
I defined the following table using the blink planner:
CREATE TABLE requests(
`rowtime` TIMESTAMP,
`requestId` VARCHAR,
`algoExtent` ROW(`mAdId` VARCHAR))
with (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'test_request',
'connector.startup-mode' = 'latest-offset
y needs
> the 'top level' fields of the Avro datastructure.
> With only the top fields there you can still access nested fields with
> something like "topfield.x.y.z" in the SQL statement.
>
> What I found is that the easiest way to make this all work is to ensure
> the rowti
??KafkaTableSource??rowtime
select order_id ,last_value(timestamp) timestamp,last_value(order_status)
order_status from order group by order_id
??over window ??
over(partition by df(timestamp,'-MM-dd 00:00:00') order by update_time
range BETWEEN INTERVAL '24
fields there you can still access nested fields with
something like "topfield.x.y.z" in the SQL statement.
What I found is that the easiest way to make this all work is to ensure the
rowtime field in the structure is at the top level (which makes sense in
general) and generate the fie
Hi Niels,
if you are coming from DataStream API, all you need to do is to write a
timestamp extractor.
When you call:
tableEnv.registerDataStream("TestStream", letterStream,
"EventTime.rowtime, letter, counter");
The ".rowtime" means that the framework
Hi,
Experimenting with the StreamTableEnvironment I build something like this:
DataStream> letterStream = ...
tableEnv.registerDataStream("TestStream", letterStream, "EventTime.rowtime,
letter, counter");
Because the "EventTime" was tagged with ".rowtim
t(
new Csv().schema(Types.ROW(names, types))
)
.withSchema(
new Schema()
.field("timestamp", Types.SQL_TIMESTAMP())
.rowtime(new Rowtime()
When executing SQL I ran into the following situation:
one of the fields registered via streamTableEnvironment.registerDataStream() is over_time.rowtime.
Using the SQL: select tumble(over_time, interval '1' second), over_time from kafka_source
group by over_time, tumble(over_time, interval '1' second)
throws a ClassCastException.
If the clauses after GROUP BY are swapped: group by
|
> | 2 | 2019-05-15 10:00:00 |
> ...
>
> and I define rowtime from the server_time field:
> new Schema()
> .field("rowtime", Types.SQL_TIMESTAMP)
>.rowtime(new Rowtime().timestampsFromField("server_time"))
> .field("id"
I use FlinkSQL to process Kafka data in the following format:
| id | server_time |
| 1 | 2019-05-15 10:00:00 |
| 2 | 2019-05-15 10:00:00 |
...
and I define rowtime from the server_time field:
new Schema()
.field("rowtime", Types.SQL_TIMESTAMP)
.rowtime(n
...consumed. Even after restarting the program, new data has to be sent before the previously "missed" data is consumed, instead of resuming automatically from the last offset + 1.
SQL:
SELECT astyle, TUMBLE_START(rowtime, INTERVAL '10' SECOND) time_start,
TUMBLE_END(rowtime, INTERVAL '10' SECOND) time_end, SUM(energy) AS sum_energy,
CAST(COUNT(aid) AS INT) AS cnt, CAST(AVG(age) AS INT) AS avg_age FROM t_p
ECT astyle, TUMBLE_START(rowtime, INTERVAL '10' SECOND) time_start,
TUMBLE_END(rowtime, INTERVAL '10' SECOND) time_end, SUM(energy) AS sum_energy,
CAST(COUNT(aid) AS INT) AS cnt, CAST(AVG(age) AS INT) AS avg_age FROM t_pojo
GROUP BY TUMBLE(rowtime, INTERVAL '10' SECOND), astyle
assignTimestampsAndWater
correct me if I am wrong.
>
> Best,
>
> Dawid
>
>
> On 04/10/18 15:08, Johannes Schulte wrote:
>
> Hi,
>
> when converting a DataStream (with Watermarks) to a table like
> described here
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/
able like
> described here
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/streaming.html#event-time
>
> I wonder on how to use the rowtime in a following window operation
> _without_ explicitly specifying all field names and hence rely on case
> class type
Hi,
when converting a DataStream (with Watermarks) to a table like described
here
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/streaming.html#event-time
I wonder how to use the rowtime in a following window operation
_without_ explicitly specifying all field names
ble/sqlClient.html>,
we are trying to define rowTime from a nested JSON element,
but struggling with syntax.
JSON data format: https://pastebin.com/ByCLhEnF
YML table config: https://pastebin.com/cgEtQPDQ
Now, in above config, we want to access
*pay
windows?
Regards,
Timo
Am 16.07.18 um 14:25 schrieb Ashwin Sinha:
Hi Users,
In Flink1.5 SQL CLient
<https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sqlClient.html>,
we are trying to define rowTime from a nested JSON element, but
struggling with syntax.
JSON data
Hi Users,
In Flink1.5 SQL CLient
<https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sqlClient.html>,
we are trying to define rowTime from a nested JSON element, but struggling
with syntax.
JSON data format: https://pastebin.com/ByCLhEnF
YML table config: https://pasteb
(*), MAX(rowtime) FROM t GROUP BY user;
- SELECT user, COUNT(*), LAST_VAL(rowtime) FROM t GROUP BY user;
These queries would forward the maximum (or last) event-time timestamp
(rowtime) to the result table.
However, none of these work in the current version or upcoming version of
Flink.
We also need