答复: Upsert Kafka 的format 为什么要求是INSERT-ONLY的

2021-03-15 Thread
Hi Shengkai,


感谢回复


让我理解一下:

   在ChangelogNormalize中

  1.  Rowkind是未生效的

  2.  null表达墓碑

  3.  保存全量数据的overhead


 如果我的理解是对的,那么假设遇到了不论是+u i,还是-u,d 都会被理解为是一次insert,从而促使下游emit record?

我们现在有若干自定义的Format,如果为了适配Upsert Kafka,format需要对d,(-u) 事件发射value == null的Record吗





发件人: Shengkai Fang 
发送时间: 2021年3月15日 14:21:31
收件人: user-zh@flink.apache.org
主题: Re: Upsert Kafka 的format 为什么要求是INSERT-ONLY的

Hi.

当初的设计是基于kafka的compacted topic设计的,而compacted
topic有自身的表达changelog的语法,例如:使用value 为 null 表示tombstone
message。从这个角度出发的话,我们仅从kafka的角度去理解数据,而非从format的角度去解析数据。

这当然引入了一些问题,例如当利用upsert-kafka读取数据的时候需要维护一个state以记住读取的所有的key。

Best,
Shengkai

刘首维  于2021年3月15日周一 上午11:48写道:

> Hi all,
>
>
>
> 最近在测试Upsert Kafka,在验证的过程中,发现Validation的时候要求format的changelog-mode
> 必须是insert-only的,请问这是什么原因呢。
>
> 如果不是的话,请直接指正我,谢谢。
>
>
>
>
>
> Flink version 1.12.1
>


Upsert Kafka 的format 为什么要求是INSERT-ONLY的

2021-03-14 Thread
Hi all,



最近在测试Upsert Kafka,在验证的过程中,发现Validation的时候要求format的changelog-mode 
必须是insert-only的,请问这是什么原因呢。

如果不是的话,请直接指正我,谢谢。





Flink version 1.12.1


答复: Flink 1.11里如何parse出未解析的执行计划

2020-10-23 Thread
Hi,


我的做法如我所说,是用反射将parser拿出来的,比较hack但是很简单而且很稳妥


代码差不多就是下面这个样子


Flink version: custom version base on 1.11.x


@PostConstruct

private void setup() throws NoSuchFieldException, IllegalAccessException {
final StreamTableEnvironmentImpl env = (StreamTableEnvironmentImpl) 
support.getStreamTableEnvironment();
final Field field = 
env.getParser().getClass().getDeclaredField("calciteParserSupplier");
field.setAccessible(true);
// 普通的parser
final Supplier defaultSupplier = 
(Supplier) field.get(env.getParser());
this.defaultSupplier = defaultSupplier;
env.getConfig().setSqlDialect(SqlDialect.HIVE);
final Field field2 = 
env.getParser().getClass().getDeclaredField("calciteParserSupplier");
field2.setAccessible(true);
// hive的parser
final Supplier hiveSupplier = (Supplier) 
field.get(env.getParser());
this.hiveSupplier = hiveSupplier;
}

WARN:
这种做法带来的比较不好的点是,项目的依赖会比较多。我这边恰好是一个在一个单独的且包含Flink依赖的服务中做的这件事从而规避了这个问题。
如果要参考上面这种做法的话,注意评估依赖的问题啦~





发件人: silence 
发送时间: 2020年10月22日 12:05:38
收件人: user-zh@flink.apache.org
主题: Re: Flink 1.11里如何parse出未解析的执行计划

我简单写了一下仅供参考

import org.apache.calcite.config.Lex;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
import org.apache.flink.sql.parser.validate.FlinkSqlConformance;

/**
 * @author: silence
 * @date: 2020/10/22
 */
public class Test {
public static void main(String[] args) throws SqlParseException {
String sql = "xxx";
SqlParser.Config sqlParserConfig = SqlParser
.configBuilder()
.setParserFactory(FlinkSqlParserImpl.FACTORY)
.setConformance(FlinkSqlConformance.DEFAULT)
.setLex(Lex.JAVA)
.setIdentifierMaxLength(256)
.build();
SqlParser sqlParser = SqlParser.create(sql, sqlParserConfig);
SqlNodeList sqlNodes = sqlParser.parseStmtList();
for (SqlNode sqlNode : sqlNodes) {
//do something
}
}
}



--
Sent from: http://apache-flink.147419.n8.nabble.com/


答复: Flink 1.11里如何parse出未解析的执行计划

2020-10-19 Thread
Hi,



 我之前跟你有相同的需求,实现方式也跟你的思路基本类似, mock一个env 然后反射获取calciteParserSupplier


目前在生产环境运行良好

FYI


发件人: 马阳阳 
发送时间: 2020年10月19日 17:57:47
收件人: Flink中文邮件列表
主题: Flink 1.11里如何parse出未解析的执行计划

Flink 
1.11里的org.apache.flink.table.planner.ParserImpl的parse方法里包含了对Planner相关方法的调用,这导致在某些前置sql(例如insert
 into用到的表的create table语句)没有执行之前,这个parse方法会报错。如果只是想调用Calcite的相关的功能去parse 
sql语句,有什么办法可以做到吗?能想到的一个办法是通过反射拿到ParserImpl里面的calciteParserSupplier。想知道Flink有没有提供直接的接口或者方法去做纯的sql
 parsing。


谢谢~


Flink SQL 1.11.1 executeSql/SqlUpdate时 SQL validation的一些问题

2020-09-25 Thread
Hi all,


 今天在调试1.11 Flink 代码的时候,发现一个没太理解的现象


  考虑以下code




  bsTableEnv.executeSql("create database a")
bsTableEnv.executeSql( " CREATE TABLE  a.b "(后略))
bsTableEnv.executeSql("select * from a.b")


然后发现了以下现象:
[cid:5272e061-1d69-4e6a-b23b-fe09be09ade4]

从图中可以得知,在`DatabaseCalciteSchema` 中
我发现下面几个奇怪的点

  1.   databaseName 是 ‘'default'
  2.   getTable将 `a`作为参数传入,而不是b (a是库名,b是表名)


首先可以确定的是这个发生在validation阶段

其次我发现特意针对这块做了一次catch `TableNotExistException`的操作

请问这部分代码的用途和目的是?





答复: Flink-1.11.1 Kafka Table API BigInt 问题

2020-09-22 Thread
Hi,

试一下java的BigInteger呢


发件人: nashcen <2415370...@qq.com>
发送时间: 2020年9月22日 16:29:41
收件人: user-zh@flink.apache.org
主题: Flink-1.11.1 Kafka Table API BigInt 问题

*我的代码如下*
其中 updateTime 字段,是时间戳,用的BigInt。如果只查询其他String类型的字段,程序正常。但是加上这个字段,就会报错。

package com.athub.dcpoints.scala.connector.table.hive

import com.athub.dcpoints.java.model.KafkaDcpoints
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect,
TableEnvironment}
import org.apache.flink.table.catalog.hive.HiveCatalog
import org.apache.flink.table.descriptors._

/**
 * @author Nash Cen
 * @date 2020/9/22 10:33
 * @version 1.0
 */
object KafkaFlinkHiveScalaApp {

  def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME","hdfs")

// 1. 获取执行环境
val env: StreamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)

val settings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()

val tableEnv: TableEnvironment =
StreamTableEnvironment.create(env,settings)

// 2. 注册 KafkaSource 表
tableEnv.connect(new Kafka()
  .version("universal")
  .topic("ods_dcpoints_dev")
  .property("zookeeper.connect", "localhost:2181")
  .property("bootstrap.servers", "localhost:9092")
  //.property("group.id", "testGroup")
  .startFromEarliest()
)
  .withFormat(new Json())
  .withSchema(new Schema()
.field("assetSpecId", DataTypes.STRING())
.field("dcnum", DataTypes.STRING())
.field("monitorType", DataTypes.STRING())
.field("tagNo", DataTypes.STRING())
.field("updateTime", DataTypes.BIGINT())
.field("value", DataTypes.STRING())

  )
  .createTemporaryTable("kafka_source_table")

// 3. 查询转换
// 使用 sqlQuery
val selectKafkaTable: Table = tableEnv.sqlQuery(s"select
assetSpecId,dcnum,monitorType,tagNo,updateTime from kafka_source_table")

selectKafkaTable.toAppendStream[(String,String,String,String,BigInt)].print("selectKafka")

env.execute("KafkaFlinkHiveScalaApp")

  }

}


*运行时报错信息如下:*
[main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class
scala.math.BigInt does not contain a setter for field bigInteger
[main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class
scala.math.BigInt cannot be used as a POJO type because not all fields are
valid POJO fields, and must be processed as GenericType. Please read the
Flink documentation on "Data Types & Serialization" for details of the
effect on performance.
Exception in thread "main" org.apache.flink.table.api.TableException: A raw
type backed by type information has no serializable string representation.
It needs to be resolved into a proper raw type.
at
org.apache.flink.table.types.logical.TypeInformationRawType.asSerializableString(TypeInformationRawType.java:101)
at
org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$3.apply(TableSinkUtils.scala:95)
at
org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$3.apply(TableSinkUtils.scala:95)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)






--
Sent from: http://apache-flink.147419.n8.nabble.com/


Row和RowData的区别

2020-09-08 Thread
Hi all,

请问`org.apache.flink.types.Row`和`org.apache.flink.table.data.RowData`的区别和联系是?


答复: 关于1.11Flink SQL 全新API设计的一些问题

2020-07-22 Thread
Hi, godfrey


好的,如果可以的话,有了相关讨论的jira或者mail可以cc一下我吗,谢谢啦


发件人: godfrey he 
发送时间: 2020年7月22日 17:49:27
收件人: user-zh
抄送: Jark Wu; xbjt...@gmail.com; jingsongl...@gmail.com
主题: Re: 关于1.11Flink SQL 全新API设计的一些问题

Hi,首维

感谢给出非常详细的反馈。这个问题我们之前内部也有一些讨论,但由于缺乏一些真实场景,最后维持了当前的接口。
我们会根据你提供的场景进行后续讨论。

Best,
Godfrey

刘首维  于2020年7月22日周三 下午5:23写道:

> Hi, Jark
>
>
>
>感谢你的建议!
>
>我们这边充分相信社区在SQL/Table上的苦心孤诣,也愿意跟进Flink在新版本的变化。
>
>先看我遇到问题本身,你的建议确实可以帮助我解决问题。我想聊一下我对问题之外的一些想法
>
>```
>
>  >  2.  我们目前封装了一些自己的Sink,我们会在Sink之前增加一个process/Filter
> 用来做缓冲池/微批/数据过滤等功能
> 这个我觉得也可以封装在 SinkFunction 里面。
>
>   ```
>
>  
> 比如上述这个问题2,我们确实可以把它做到SinkFunction中,但是我个人认为这可能在设计上不够理想的。我个人在设计编排Function/算子的时候习惯于遵循”算子单一职责”的原则,这也是我为什么会拆分出多个process/filter算子编排到SinkFunction前面而非将这些功能耦合到SinkFunction去做。另一方面,没了DataStream,向新的API的迁移成本相对来说变得更高了一些~
> 又或者,我们现在还有一些特殊原因,算子编排的时候会去修改TaskChain Strategy,这个时候DataStream的灵活性是必不可少的
>
> 考虑到Flink Task都可以拆分成Source -> Transformation -> sink
> 三个阶段,那么能让用户可以对自己的作业针对(流或批)的运行模式下,可以有效灵活做一些自己的定制策略/优化/逻辑可能是会方便的~
>
>诚然,DataStream的灵活性确实会是一把双刃剑,但就像@leonard提到的,平台层和应用层的目的和开发重点可能也不太一样,对Flink
> API使用侧重点也不同。我个人还是希望可以在享受全新API设计优势同时,
>
> 可以继续使用DataStream(Transformation)的灵活性,助力Flink组件在我们组的开落地
>
>
> 再次感谢各位的回复!
>
> ________
> 发件人: Jark Wu 
> 发送时间: 2020年7月22日 16:33:45
> 收件人: user-zh
> 抄送: godfrey he; greemqq...@163.com; 刘首维
> 主题: Re: 关于1.11Flink SQL 全新API设计的一些问题
>
> Hi,首维,
>
> 非常感谢反馈。与 DataStream 解耦是 FLIP-95 的一个非常重要的设计目标,这让 sink/source 对于框架来说不再是黑盒,
> 因此将来才可以做诸如 state 兼容升级、消息顺序保障、自动并发设置等等事情。
>
> 关于你的一些需求,下面是我的建议和回复:
>
> >  1.  我们现在有某种特定的Kafka数据格式,1条Kafka数据
> 会对应转换n(n为正整数)条Row数据,我们的做法是在emitDataStream的时候增加了一个process/FlatMap阶段,用于处理这种情况,这样对用户是透明的。
> 这个理论上还属于“数据格式”的职责,所以建议做在 DeserializationSchema 上,目前 DeserializationSchema
> 支持一对多的输出。可以参考 DebeziumJsonDeserializationSchema 的实现。
>
> >  2.  我们目前封装了一些自己的Sink,我们会在Sink之前增加一个process/Filter 用来做缓冲池/微批/数据过滤等功能
> 这个我觉得也可以封装在 SinkFunction 里面。
>
> >  3.  调整或者指定Source/Sink并行度为用户指定值,我们也是在DataStream层面上去做的
> 这个社区也有计划在 FLIP-95 之上支持,会提供并发(或者分区)推断的能力。
>
> >  4.  对于一些特殊Source Sink,他们会和KeyBy操作组合(对用户透明),我们也是在DataStream层面上去做的
> 这个能在具体一点吗?目前像 SupportsPartitioning 接口,就可以指定数据在交给 sink 之前先做 group by
> partition。我感觉这个可能也可以通过引入类似的接口解决。
>
> Best,
> Jark
>
> On Wed, 22 Jul 2020 at 16:27, Leonard Xu  xbjt...@gmail.com>> wrote:
> Hi,首维, Ran
>
> 感谢分享, 我理解1.11新的API主要是想把 Table API 和 DataStream API 两套尽量拆分干净,
> 但看起来平台级的开发工作会依赖DataStream的一些预处理和用户逻辑。
> 我觉得这类需求对平台开发是合理,可以收集反馈下的, cc: godfrey
>
> 祝好
> Leonard Xu
>
>
> > 在 2020年7月22日,13:47,刘首维  liushou...@autohome.com.cn>> 写道:
> >
> > Hi JingSong,
> >
> >
> 简单介绍一下背景,我们组的一个工作就是用户在页面写Sql,我们负责将Sql处理转换成Flink作业并在我们的平台上运行,这个转换的过程依赖我们的SQL
> SDK
> >  下面我举几个我们比较常用且感觉用1.11新API不太好实现的例子
> >
> >
> >  1.  我们现在有某种特定的Kafka数据格式,1条Kafka数据
> 会对应转换n(n为正整数)条Row数据,我们的做法是在emitDataStream的时候增加了一个process/FlatMap阶段,用于处理这种情况,这样对用户是透明的。
> >  2.  我们目前封装了一些自己的Sink,我们会在Sink之前增加一个process/Filter 用来做缓冲池/微批/数据过滤等功能
> >  3.  调整或者指定Source/Sink并行度为用户指定值,我们也是在DataStream层面上去做的
> >  4.  对于一些特殊Source Sink,他们会和KeyBy操作组合(对用户透明),我们也是在DataStream层面上去做的
> >
> >
> > 如果可以的话,能让我在API层面拿到Transformation也是能满足我需求的
> >
> >
> > 
> > 发件人: Jingsong Li mailto:jingsongl...@gmail.com>>
> > 发送时间: 2020年7月22日 13:26:00
> > 收件人: user-zh
> > 抄送: imj...@gmail.com<mailto:imj...@gmail.com>
> > 主题: Re: 关于1.11Flink SQL 全新API设计的一些问题
> >
> > 可以分享下你们为啥要拿到DataStream吗?什么场景一定离不开DataStream吗?
> >
> > Best
> > Jingsong
> >
> > On Wed, Jul 22, 2020 at 12:36 PM 刘首维  liushou...@autohome.com.cn>> wrote:
> >
> >> Hi all,
> >>
> >>
> >>
> >>很高兴看到Flink 1.11的发布,FLIP95和FLIP105也成功落地~
> >>
> >>我们最近在调研基于1.11的SQL/Table API对我们旧有的SQL
> >>
> SDK进行升级。经过初步调研发现,基于`Factory`和`DynamicTable`的API,CatalogTable会被Planner直接变成Transformation,而不是跟之前一样可以拿到DataStream。比如之前的`StreamTableSource`#getDataStream,我可以直接获得DataStream对象的。这个动作/方法对于我们很重要,因为我们封装的SDK中,很多内部操作是在这个方法调用时触发/完成的。
> >>
> >>
> >>
> >>所以,如果基于新的API去开发的话,我该如何获取到DataStream,或者达成类似的效果呢(尽可能不去动执行计划的话)
> >>
> >
> >
> > --
> > Best, Jingsong Lee
>
>


答复: 关于1.11Flink SQL 全新API设计的一些问题

2020-07-22 Thread
Hi, Jark



   感谢你的建议!

   我们这边充分相信社区在SQL/Table上的苦心孤诣,也愿意跟进Flink在新版本的变化。

   先看我遇到问题本身,你的建议确实可以帮助我解决问题。我想聊一下我对问题之外的一些想法

   ```

 >  2.  我们目前封装了一些自己的Sink,我们会在Sink之前增加一个process/Filter 用来做缓冲池/微批/数据过滤等功能
这个我觉得也可以封装在 SinkFunction 里面。

  ```

 
比如上述这个问题2,我们确实可以把它做到SinkFunction中,但是我个人认为这可能在设计上不够理想的。我个人在设计编排Function/算子的时候习惯于遵循”算子单一职责”的原则,这也是我为什么会拆分出多个process/filter算子编排到SinkFunction前面而非将这些功能耦合到SinkFunction去做。另一方面,没了DataStream,向新的API的迁移成本相对来说变得更高了一些~
 又或者,我们现在还有一些特殊原因,算子编排的时候会去修改TaskChain Strategy,这个时候DataStream的灵活性是必不可少的

考虑到Flink Task都可以拆分成Source -> Transformation -> sink 
三个阶段,那么能让用户可以对自己的作业针对(流或批)的运行模式下,可以有效灵活做一些自己的定制策略/优化/逻辑可能是会方便的~

   诚然,DataStream的灵活性确实会是一把双刃剑,但就像@leonard提到的,平台层和应用层的目的和开发重点可能也不太一样,对Flink 
API使用侧重点也不同。我个人还是希望可以在享受全新API设计优势同时,

可以继续使用DataStream(Transformation)的灵活性,助力Flink组件在我们组的开落地


再次感谢各位的回复!


发件人: Jark Wu 
发送时间: 2020年7月22日 16:33:45
收件人: user-zh
抄送: godfrey he; greemqq...@163.com; 刘首维
主题: Re: 关于1.11Flink SQL 全新API设计的一些问题

Hi,首维,

非常感谢反馈。与 DataStream 解耦是 FLIP-95 的一个非常重要的设计目标,这让 sink/source 对于框架来说不再是黑盒,
因此将来才可以做诸如 state 兼容升级、消息顺序保障、自动并发设置等等事情。

关于你的一些需求,下面是我的建议和回复:

>  1.  我们现在有某种特定的Kafka数据格式,1条Kafka数据 
> 会对应转换n(n为正整数)条Row数据,我们的做法是在emitDataStream的时候增加了一个process/FlatMap阶段,用于处理这种情况,这样对用户是透明的。
这个理论上还属于“数据格式”的职责,所以建议做在 DeserializationSchema 上,目前 DeserializationSchema 
支持一对多的输出。可以参考 DebeziumJsonDeserializationSchema 的实现。

>  2.  我们目前封装了一些自己的Sink,我们会在Sink之前增加一个process/Filter 用来做缓冲池/微批/数据过滤等功能
这个我觉得也可以封装在 SinkFunction 里面。

>  3.  调整或者指定Source/Sink并行度为用户指定值,我们也是在DataStream层面上去做的
这个社区也有计划在 FLIP-95 之上支持,会提供并发(或者分区)推断的能力。

>  4.  对于一些特殊Source Sink,他们会和KeyBy操作组合(对用户透明),我们也是在DataStream层面上去做的
这个能在具体一点吗?目前像 SupportsPartitioning 接口,就可以指定数据在交给 sink 之前先做 group by 
partition。我感觉这个可能也可以通过引入类似的接口解决。

Best,
Jark

On Wed, 22 Jul 2020 at 16:27, Leonard Xu 
mailto:xbjt...@gmail.com>> wrote:
Hi,首维, Ran

感谢分享, 我理解1.11新的API主要是想把 Table API 和 DataStream API 两套尽量拆分干净, 
但看起来平台级的开发工作会依赖DataStream的一些预处理和用户逻辑。
我觉得这类需求对平台开发是合理,可以收集反馈下的, cc: godfrey

祝好
Leonard Xu


> 在 2020年7月22日,13:47,刘首维 
> mailto:liushou...@autohome.com.cn>> 写道:
>
> Hi JingSong,
>
>  简单介绍一下背景,我们组的一个工作就是用户在页面写Sql,我们负责将Sql处理转换成Flink作业并在我们的平台上运行,这个转换的过程依赖我们的SQL 
> SDK
>  下面我举几个我们比较常用且感觉用1.11新API不太好实现的例子
>
>
>  1.  我们现在有某种特定的Kafka数据格式,1条Kafka数据 
> 会对应转换n(n为正整数)条Row数据,我们的做法是在emitDataStream的时候增加了一个process/FlatMap阶段,用于处理这种情况,这样对用户是透明的。
>  2.  我们目前封装了一些自己的Sink,我们会在Sink之前增加一个process/Filter 用来做缓冲池/微批/数据过滤等功能
>  3.  调整或者指定Source/Sink并行度为用户指定值,我们也是在DataStream层面上去做的
>  4.  对于一些特殊Source Sink,他们会和KeyBy操作组合(对用户透明),我们也是在DataStream层面上去做的
>
>
> 如果可以的话,能让我在API层面拿到Transformation也是能满足我需求的
>
>
> 
> 发件人: Jingsong Li mailto:jingsongl...@gmail.com>>
> 发送时间: 2020年7月22日 13:26:00
> 收件人: user-zh
> 抄送: imj...@gmail.com<mailto:imj...@gmail.com>
> 主题: Re: 关于1.11Flink SQL 全新API设计的一些问题
>
> 可以分享下你们为啥要拿到DataStream吗?什么场景一定离不开DataStream吗?
>
> Best
> Jingsong
>
> On Wed, Jul 22, 2020 at 12:36 PM 刘首维 
> mailto:liushou...@autohome.com.cn>> wrote:
>
>> Hi all,
>>
>>
>>
>>很高兴看到Flink 1.11的发布,FLIP95和FLIP105也成功落地~
>>
>>我们最近在调研基于1.11的SQL/Table API对我们旧有的SQL
>> SDK进行升级。经过初步调研发现,基于`Factory`和`DynamicTable`的API,CatalogTable会被Planner直接变成Transformation,而不是跟之前一样可以拿到DataStream。比如之前的`StreamTableSource`#getDataStream,我可以直接获得DataStream对象的。这个动作/方法对于我们很重要,因为我们封装的SDK中,很多内部操作是在这个方法调用时触发/完成的。
>>
>>
>>
>>所以,如果基于新的API去开发的话,我该如何获取到DataStream,或者达成类似的效果呢(尽可能不去动执行计划的话)
>>
>
>
> --
> Best, Jingsong Lee



答复: 关于1.11Flink SQL 全新API设计的一些问题

2020-07-22 Thread
Hi JingSong,



感谢回复,真心期待一个理想的解决方案~


发件人: Jingsong Li 
发送时间: 2020年7月22日 13:58:51
收件人: user-zh; Jark Wu
主题: Re: 关于1.11Flink SQL 全新API设计的一些问题

Hi 首维,

非常感谢你的信息,我们试图 去掉太灵活的“DataStream”,但是也逐渐发现一些额外的需求,再考虑和评估下你的需求。

CC: @Jark Wu 

Best,
Jingsong

On Wed, Jul 22, 2020 at 1:49 PM 刘首维  wrote:

> Hi JingSong,
>
>
> 简单介绍一下背景,我们组的一个工作就是用户在页面写Sql,我们负责将Sql处理转换成Flink作业并在我们的平台上运行,这个转换的过程依赖我们的SQL
> SDK
>   下面我举几个我们比较常用且感觉用1.11新API不太好实现的例子
>
>
>   1.  我们现在有某种特定的Kafka数据格式,1条Kafka数据
> 会对应转换n(n为正整数)条Row数据,我们的做法是在emitDataStream的时候增加了一个process/FlatMap阶段,用于处理这种情况,这样对用户是透明的。
>   2.  我们目前封装了一些自己的Sink,我们会在Sink之前增加一个process/Filter 用来做缓冲池/微批/数据过滤等功能
>   3.  调整或者指定Source/Sink并行度为用户指定值,我们也是在DataStream层面上去做的
>   4.  对于一些特殊Source Sink,他们会和KeyBy操作组合(对用户透明),我们也是在DataStream层面上去做的
>
>
> 如果可以的话,能让我在API层面拿到Transformation也是能满足我需求的
>
>
> 
> 发件人: Jingsong Li 
> 发送时间: 2020年7月22日 13:26:00
> 收件人: user-zh
> 抄送: imj...@gmail.com
> 主题: Re: 关于1.11Flink SQL 全新API设计的一些问题
>
> 可以分享下你们为啥要拿到DataStream吗?什么场景一定离不开DataStream吗?
>
> Best
> Jingsong
>
> On Wed, Jul 22, 2020 at 12:36 PM 刘首维  wrote:
>
> > Hi all,
> >
> >
> >
> > 很高兴看到Flink 1.11的发布,FLIP95和FLIP105也成功落地~
> >
> > 我们最近在调研基于1.11的SQL/Table API对我们旧有的SQL
> >
> SDK进行升级。经过初步调研发现,基于`Factory`和`DynamicTable`的API,CatalogTable会被Planner直接变成Transformation,而不是跟之前一样可以拿到DataStream。比如之前的`StreamTableSource`#getDataStream,我可以直接获得DataStream对象的。这个动作/方法对于我们很重要,因为我们封装的SDK中,很多内部操作是在这个方法调用时触发/完成的。
> >
> >
> >
> > 所以,如果基于新的API去开发的话,我该如何获取到DataStream,或者达成类似的效果呢(尽可能不去动执行计划的话)
> >
>
>
> --
> Best, Jingsong Lee
>


--
Best, Jingsong Lee


答复: 关于1.11Flink SQL 全新API设计的一些问题

2020-07-21 Thread
Hi JingSong,

  简单介绍一下背景,我们组的一个工作就是用户在页面写Sql,我们负责将Sql处理转换成Flink作业并在我们的平台上运行,这个转换的过程依赖我们的SQL 
SDK
  下面我举几个我们比较常用且感觉用1.11新API不太好实现的例子


  1.  我们现在有某种特定的Kafka数据格式,1条Kafka数据 
会对应转换n(n为正整数)条Row数据,我们的做法是在emitDataStream的时候增加了一个process/FlatMap阶段,用于处理这种情况,这样对用户是透明的。
  2.  我们目前封装了一些自己的Sink,我们会在Sink之前增加一个process/Filter 用来做缓冲池/微批/数据过滤等功能
  3.  调整或者指定Source/Sink并行度为用户指定值,我们也是在DataStream层面上去做的
  4.  对于一些特殊Source Sink,他们会和KeyBy操作组合(对用户透明),我们也是在DataStream层面上去做的


如果可以的话,能让我在API层面拿到Transformation也是能满足我需求的



发件人: Jingsong Li 
发送时间: 2020年7月22日 13:26:00
收件人: user-zh
抄送: imj...@gmail.com
主题: Re: 关于1.11Flink SQL 全新API设计的一些问题

可以分享下你们为啥要拿到DataStream吗?什么场景一定离不开DataStream吗?

Best
Jingsong

On Wed, Jul 22, 2020 at 12:36 PM 刘首维  wrote:

> Hi all,
>
>
>
> 很高兴看到Flink 1.11的发布,FLIP95和FLIP105也成功落地~
>
> 我们最近在调研基于1.11的SQL/Table API对我们旧有的SQL
> SDK进行升级。经过初步调研发现,基于`Factory`和`DynamicTable`的API,CatalogTable会被Planner直接变成Transformation,而不是跟之前一样可以拿到DataStream。比如之前的`StreamTableSource`#getDataStream,我可以直接获得DataStream对象的。这个动作/方法对于我们很重要,因为我们封装的SDK中,很多内部操作是在这个方法调用时触发/完成的。
>
>
>
> 所以,如果基于新的API去开发的话,我该如何获取到DataStream,或者达成类似的效果呢(尽可能不去动执行计划的话)
>


--
Best, Jingsong Lee


关于1.11Flink SQL 全新API设计的一些问题

2020-07-21 Thread
Hi all,



很高兴看到Flink 1.11的发布,FLIP95和FLIP105也成功落地~

我们最近在调研基于1.11的SQL/Table API对我们旧有的SQL 
SDK进行升级。经过初步调研发现,基于`Factory`和`DynamicTable`的API,CatalogTable会被Planner直接变成Transformation,而不是跟之前一样可以拿到DataStream。比如之前的`StreamTableSource`#getDataStream,我可以直接获得DataStream对象的。这个动作/方法对于我们很重要,因为我们封装的SDK中,很多内部操作是在这个方法调用时触发/完成的。



所以,如果基于新的API去开发的话,我该如何获取到DataStream,或者达成类似的效果呢(尽可能不去动执行计划的话)


答复: Flink catalog的几个疑问

2020-07-21 Thread
hi all, 我在想如果社区提供一个unified metastore 
server是不是会解决这个问题,然后写一个(一系列)catalog和这个metastore对应


发件人: Jark Wu 
发送时间: 2020年7月22日 11:22:56
收件人: user-zh
主题: Re: Flink catalog的几个疑问

非常欢迎贡献开源一个轻量的 catalog 实现 :)

On Wed, 22 Jul 2020 at 10:53, Jingsong Li  wrote:

> Hi,
>
> HiveCatalog就是官方唯一的可以保存所有表的持久化Catalog,包括kafka,jdbc,hbase等等connectors。
>
> > 后续有可能转正为flink 默认的catalog实现吗?
>
> 目前不太可能,你看,Flink连Hadoop的依赖都没有打进来。Hive的依赖更不会默认打进来。 依赖都没有,也不会成为默认的。
>
> > hive catalog是不支持大小写敏感的
>
> 是的,就像Godfrey说的,特别是JDBC对接的某些大小写敏感的db,这可能导致字段名对应不了。
>
> Best,
> Jingsong
>
> On Wed, Jul 22, 2020 at 10:39 AM godfrey he  wrote:
>
> > hi Xingxing,
> >
> > 1. Flink 提供了一套catalog的接口,提提供了几个内置的实现:in-memory catalog, hive catalog,
> > postgres catalog,
> > 可以根据自己的需求选择。也可以实现自定义的catalog。参考 [1]
> > 2. hive catalog 主要是对接 hive,方便读取现有的hive catalog的meta信息。当然也可以往hive
> > catalog写新的meta。
> > 是否会转为默认catalog,据我所知,目前没有。
> > 3. 一般没什么问题。在和其他区分大小写的db对接的时候,可能有问题。
> >
> > Best,
> > Godfrey
> >
> > dixingxin...@163.com  于2020年7月21日周二 下午11:30写道:
> >
> > > Hi Flink社区:
> > > 有几个疑问希望社区小伙伴们帮忙解答一下:
> > >
> > >
> >
> 1.个人感觉Flink很有必要提供一个官方的catalog,用来支持各种connector,比如:kafka,jdbc,hbase等等connector。不知道社区有没有这个打算,目前没有看到对应的flip
> > > 2.社区对hive catalog的定位是什么,后续有可能转正为flink 默认的catalog实现吗?
> > > 3.hive catalog是不支持大小写敏感的(字段名都是小写),这个后续会带来哪些问题?想征集下大家的意见避免我们以后踩大坑。
> > >
> > >
> > >
> > >
> > > Best,
> > > Xingxing Di
> > >
> >
>
>
> --
> Best, Jingsong Lee
>


Committing offsets to Kafka takes longer than the checkpoint interval.

2020-04-28 Thread
Hi all,



今天发现有一个作业日志中连续打印下面这个报警

"Committing offsets to Kafka takes longer than the checkpoint interval. 
Skipping commit of previous offsets because newer complete checkpoint offsets 
are available. This does not compromise Flink's checkpoint integrity."


导致作业卡住无法继续消费Kafka topic


请问这个场景如何排查比较好


答复: 人为生成retract记录

2020-04-25 Thread

Hi,

我们这边做了人为生成retract记录的尝试,也是用在了binlog上,结果上还是可以的但是改造成本还是比较高的,需要自己添加对应的关系算子和优化规则。此外,这样做(有可能)会干扰执行计划的优化,期待FLIP105和95的落地!



发件人: lec ssmi 
发送时间: 2020年4月26日 10:07:48
收件人: flink-user-cn
主题: 人为生成retract记录

Hi:
假设我现在将上游retract后的结果写入到kafka,然后下游程序消费kafka去做聚合操作。
因为需要利用聚合算子能够自动处理retract的特性,所以需要将kafka的结果封装成带有不同header的row,即组装为INSERT
DELETE UPDATE类型的数据。
有什么办法可以解决吗?
如果将上下游程序合在一起 ,是没问题的,现在的难点就是拆分。
   谢谢。


答复: 关于Flink SQL UDAF中调用createAccumulator的调用问题

2020-04-21 Thread
Hi benchao,

  感谢你的回复,完美解答了我的疑惑。然后我在看代码做实验的时候又派生了两个想法/问题:

1. 将accu作为一个单独的field在GroupAggFunction中,然后在snapshotState的时候才去向state更新
2. 为accu设置一个0/初始/空/幺元或者类似概念的状态,这样我们就不必去销毁acc而是去只是将其置回


  或者说时区在设计这个部分的时候,有什么其他的考量吗

发件人: Benchao Li 
发送时间: 2020年4月21日 18:28:09
收件人: 刘首维
抄送: user-zh
主题: Re: 关于Flink SQL UDAF中调用createAccumulator的调用问题

Hi 首维,

这里说的是同一个key会创建一次,所以你看到一个AggFunction会创建多次更这个并不冲突,因为一个AggFunction属于一个subtask维度的,一个subtask处理不止一个key的。

你的第二个问题:
> 我个人理解Flink Streaming SQL其实实现上物化视图的增量更新算法很像(这块不确定思考的正确性),那么在嵌套group 
> by或者接join后是不是会有更新放大(amplify)问题,同时由于这个问题导致中间状态暴露出来
这个的确是存在这个问题。比如两个group by嵌套使用,第一个group by由于会更新结果,所以会retract之前的结果。第二个group 
by收到retract就会先撤销当前的结果,然后再发送撤销之后的结果;然后后面又会来一条正常的append数据,这条数据导致再次retract一次,和append一次。

> 还有在agg over group by的时候是不是createAccumulator的生命周期会像窗口聚合的时候生命周期会改变吗。
这个没有本质区别,正常情况下一个key下的agg是会一直存在的。除非是你配置了state 
retention时间,那么对应的key的state如果过了retention时间没有访问就会被清理。

刘首维 mailto:liushou...@autohome.com.cn>> 
于2020年4月21日周二 下午5:59写道:

Hi benchao,


非常感谢你的回复,我说一下我产生疑问的原因吧。首先我的朴素思路是createAccumlator针对一个group 
by的一个key应该被创建一次,可是我做实验的时候(在create 
acc的时候打印日志),我发现日志打印了不止一次,而且我确认是同一线程的同一AggFunction对象打印的。而且我也翻看了代码可以看到accumulator确实是以一个state方式存储的,每次都会先尝试从state取,就更不应该出现多次打印createAccmulator日志的问题了。


  为了方便你帮我分析,我来补充一下环境和场景:


   版本: 1.7.2/1.9.1

  场景 : group by 嵌套, 常规聚合



  我个人理解Flink Streaming SQL其实实现上物化视图的增量更新算法很像(这块不确定思考的正确性),那么在嵌套group 
by或者接join后是不是会有更新放大(amplify)问题,同时由于这个问题导致中间状态暴露出来。还有在agg over group 
by的时候是不是createAccumulator的生命周期会像窗口聚合的时候生命周期会改变吗。


再次感谢你的回复

best regards


发件人: Benchao Li mailto:libenc...@gmail.com>>
发送时间: 2020年4月21日 17:45:54
收件人: user-zh
主题: Re: 关于Flink SQL UDAF中调用createAccumulator的调用问题

Hi 首维,

这是个很好的问题。

> 这个方法的调用时机是什么呢,会被调用几次呢?
这个调用的时机是每个key的第一条数据来的时候,会创建一个accumulator。创建的次数大约是key的数量。
当然这里说的是regular groupby;
如果是window group by的话,就是每个window都会做上面的这个事情。

> 一个accumulator的生命周期是怎么样的?
如果是window group by的话,那它的生命周期就是跟window是一样的。
如果是regular groupby的话,可以认为是全局的。除非有一条数据retract掉了当前的结果之后,等于被聚合的
数据是0条的时候,也会销毁。

> 一个accumulator会被反复的序列化反序列化吗?
这个问题非常好。它是否序列化跟你用的state backend有关系。
如果是rocksdb的话,是会每次更新都会存一次state。如果不是rocksdb,它就是一个内存里的数据结构,只有在
checkpoint的时候序列化。
当然这个在regular groupby的情况下,开启mini batch会减少每次都需要更新state的情况。

刘首维 mailto:liushou...@autohome.com.cn>> 
于2020年4月21日周二 下午5:37写道:

> Hi all,
>
>
>
>最近有几个疑问没能很好地理解清楚:
>
>
>
>我们都知道,UDAF中的有createAccumulator这个方法,那么:
>
> 这个方法的调用时机是什么呢,会被调用几次呢?
>
> 一个accumulator的生命周期是怎么样的?
>
> 一个accumulator会被反复的序列化反序列化吗?
>
>
>  麻烦了解相关细节的社区的同学们帮忙解答一下~
>
> 先谢谢啦
>


--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com<mailto:libenc...@gmail.com>; 
libenc...@pku.edu.cn<mailto:libenc...@pku.edu.cn>


--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com<mailto:libenc...@gmail.com>; 
libenc...@pku.edu.cn<mailto:libenc...@pku.edu.cn>


答复: 关于Flink SQL UDAF中调用createAccumulator的调用问题

2020-04-21 Thread
Hi benchao,


非常感谢你的回复,我说一下我产生疑问的原因吧。首先我的朴素思路是createAccumlator针对一个group 
by的一个key应该被创建一次,可是我做实验的时候(在create 
acc的时候打印日志),我发现日志打印了不止一次,而且我确认是同一线程的同一AggFunction对象打印的。而且我也翻看了代码可以看到accumulator确实是以一个state方式存储的,每次都会先尝试从state取,就更不应该出现多次打印createAccmulator日志的问题了。


  为了方便你帮我分析,我来补充一下环境和场景:


   版本: 1.7.2/1.9.1

  场景 : group by 嵌套, 常规聚合



  我个人理解Flink Streaming SQL其实实现上物化视图的增量更新算法很像(这块不确定思考的正确性),那么在嵌套group 
by或者接join后是不是会有更新放大(amplify)问题,同时由于这个问题导致中间状态暴露出来。还有在agg over group 
by的时候是不是createAccumulator的生命周期会像窗口聚合的时候生命周期会改变吗。


再次感谢你的回复

best regards


发件人: Benchao Li 
发送时间: 2020年4月21日 17:45:54
收件人: user-zh
主题: Re: 关于Flink SQL UDAF中调用createAccumulator的调用问题

Hi 首维,

这是个很好的问题。

> 这个方法的调用时机是什么呢,会被调用几次呢?
这个调用的时机是每个key的第一条数据来的时候,会创建一个accumulator。创建的次数大约是key的数量。
当然这里说的是regular groupby;
如果是window group by的话,就是每个window都会做上面的这个事情。

> 一个accumulator的生命周期是怎么样的?
如果是window group by的话,那它的生命周期就是跟window是一样的。
如果是regular groupby的话,可以认为是全局的。除非有一条数据retract掉了当前的结果之后,等于被聚合的
数据是0条的时候,也会销毁。

> 一个accumulator会被反复的序列化反序列化吗?
这个问题非常好。它是否序列化跟你用的state backend有关系。
如果是rocksdb的话,是会每次更新都会存一次state。如果不是rocksdb,它就是一个内存里的数据结构,只有在
checkpoint的时候序列化。
当然这个在regular groupby的情况下,开启mini batch会减少每次都需要更新state的情况。

刘首维  于2020年4月21日周二 下午5:37写道:

> Hi all,
>
>
>
>最近有几个疑问没能很好地理解清楚:
>
>
>
>我们都知道,UDAF中的有createAccumulator这个方法,那么:
>
> 这个方法的调用时机是什么呢,会被调用几次呢?
>
> 一个accumulator的生命周期是怎么样的?
>
> 一个accumulator会被反复的序列化反序列化吗?
>
>
>  麻烦了解相关细节的社区的同学们帮忙解答一下~
>
> 先谢谢啦
>


--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


关于Flink SQL UDAF中调用createAccumulator的调用问题

2020-04-21 Thread
Hi all,



   最近有几个疑问没能很好地理解清楚:



   我们都知道,UDAF中的有createAccumulator这个方法,那么:

这个方法的调用时机是什么呢,会被调用几次呢?

一个accumulator的生命周期是怎么样的?

一个accumulator会被反复的序列化反序列化吗?


 麻烦了解相关细节的社区的同学们帮忙解答一下~

先谢谢啦