Re: Re: Error when starting from a savepoint after upgrading Flink to 1.12.0

2021-05-19 Thread 王炳焱
Hi, thank you very much for your reply. I have also tried the state-processor-api, but there is no way to obtain each operator's UID from the job graph that SQL generates, so the state-processor-api cannot get at the original state information either, and I cannot operate on the state. If there is a better solution, please reply to this thread. Thanks. On 2021-05-20 10:46:22, "Yun Tang" wrote: >Hi > >BaseRowSerializer was already renamed to RowDataSerializer in Flink 1.11, so even using the >state-processor-API

Re: Guidance for Integration Tests with External Technologies

2021-05-19 Thread Yun Gao
Hi Rion, Do you mean you are running the tests directly in an IDE like IntelliJ IDEA for "multiple tests run in sequence"? If a test succeeds when run separately but fails when run in sequence, then it seems the other tests are still interfering with the failing one. For

Re: Re: Questions Flink DataStream in BATCH execution mode scalability advice

2021-05-19 Thread Yun Gao
Hi Marco, I think Flink does not need 500GB for the source; the source should be able to read from S3 in a streaming fashion (namely, open the file, create an input stream and fetch data as required). But it might indeed need disk space for intermediate data between operators and the sort

Re: Error when starting from a savepoint after upgrading Flink to 1.12.0

2021-05-19 Thread Yun Tang
Hi, BaseRowSerializer was renamed to RowDataSerializer in Flink 1.11, so even the state-processor-API cannot handle a class that no longer exists. One rather involved workaround is to copy the BaseRowSerializer class out without changing its package, and then use the state-processor-API to forcibly convert the related classes to RowDataSerializer. But considering that your job graph is generated from SQL, the state-processor-API is oriented more toward data stream

Unsubscribe

2021-05-19 Thread zander0...@163.com
Unsubscribe. 周德虎 Phone: 15021351770 Email: zander0...@163.com

Re: Flink 1.13.0: when connecting to a database with Flink SQL, are multiple schemas supported, i.e. table names of the form schema.name?

2021-05-19 Thread Shengkai Fang
Do you want to use a regular expression to match tables in the database? Is 'org.users' a regular expression? Best, Shengkai Asahi Lee <978466...@qq.com> wrote on Wed, May 19, 2021 at 2:01 PM: > hi! > Has flink jdbc considered supporting table queries based on schema? For example, the following table-name usage: > CREATE TABLE MyUserTable ( id BIGINT, name STRING, age INT, status > BOOLEAN, PRIMARY KEY (id) NOT ENFORCED ) WITH

Re: two questions about flink stream processing: kafka sources and TimerService

2021-05-19 Thread Jin Yi
thanks ingo! i'll look at moving to rolling my own operator and using ConnectedStreams.transform with it. On Tue, May 18, 2021 at 3:18 AM Ingo Bürk wrote: > Hi Jin, > > 1) As far as I know the order is only guaranteed for events from the same > partition. If you want events across partitions

Re: Prometheus Reporter Enhancement

2021-05-19 Thread Mason Chen
Are there any plans to rework some of the metric name formulations (getMetricIdentifier or getLogicalScope)? Currently, the label keys and/or label values are concatenated in the metric name and the information is redundant and makes the metric names longer. Would it make sense to remove the

Parallelism in Production: Best Practices

2021-05-19 Thread Yaroslav Tkachenko
Hi everyone, I'd love to learn more about how different companies approach specifying Flink parallelism. I'm specifically interested in real, production workloads. I can see a few common patterns: - Rely on default parallelism, scale by changing parallelism for the whole pipeline. I guess it

Re: Questions Flink DataStream in BATCH execution mode scalability advice

2021-05-19 Thread Marco Villalobos
> On May 19, 2021, at 7:26 AM, Yun Gao wrote: > > Hi Marco, > > For the remaining issues, > > 1. For the aggregation, the 500GB of files are not required to fit into > memory. > Roughly speaking, for keyed().window().reduce(), the input records would > first be > sorted according to

Re: Questions Flink DataStream in BATCH execution mode scalability advice

2021-05-19 Thread Yun Gao
Hi Marco, For the remaining issues: 1. For the aggregation, the 500GB of files are not required to fit into memory. Roughly speaking, for keyed().window().reduce(), the input records would first be sorted according to the key (time_series.name) via external sorts, which only consumes a fix
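The external-sort approach Yun describes (sort the keyed input under a fixed memory budget, spilling to disk) can be illustrated with a small, self-contained sketch. This is plain Python, not Flink's actual sorter: fixed-size chunks are sorted in RAM, each sorted run is spilled to a temp file, and the runs are then streamed through a k-way merge.

```python
import heapq
import itertools
import tempfile

def external_sort(lines, chunk_size=2):
    """Sort an iterable of strings that may not fit in memory:
    sort fixed-size chunks in RAM, spill each sorted run to a temp
    file, then k-way merge the runs with heapq.merge."""
    runs = []
    it = iter(lines)
    while True:
        chunk = list(itertools.islice(it, chunk_size))
        if not chunk:
            break
        chunk.sort()
        run = tempfile.TemporaryFile(mode="w+")
        run.writelines(s + "\n" for s in chunk)
        run.seek(0)
        runs.append(run)
    # The merge buffers only one record per run, i.e. bounded memory.
    return [s.rstrip("\n") for s in heapq.merge(*runs)]
```

For example, `external_sort(["d", "a", "c", "b", "e"])` yields the fully sorted list while never holding more than `chunk_size` unsorted records in memory; the merge phase is what Flink's sort-based BATCH aggregation does at much larger scale.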

[no subject]

2021-05-19 Thread Wenyi Xu

Re: Unable to deserialize Avro data using Pyflink

2021-05-19 Thread Zerah J
Thanks Dian. It worked for me. Regards, Zerah On Wed, May 19, 2021, 5:14 PM Dian Fu wrote: > Hi Zerah, > > You could try to replace > ``` > value_schema = avro.schema.parse() > ``` > > with the following code: > ``` > JSchemaParser = get_gateway().jvm.org.apache.avro.Schema.Parser >

[Statefun] Truncated Messages in Python workers

2021-05-19 Thread Jan Brusch
Hi, recently we started seeing the following faulty behaviour in the Flink Stateful Functions HTTP communication towards external Python workers. This only occurs when the system is under heavy load. The Java application will send HTTP messages to an external Python function but the

Re: Unable to deserialize Avro data using Pyflink

2021-05-19 Thread Dian Fu
Hi Zerah, You could try to replace

```
value_schema = avro.schema.parse()
```

with the following code:

```
JSchemaParser = get_gateway().jvm.org.apache.avro.Schema.Parser
value_schema = JSchemaParser().parse(value_schema_str)
```

The reason is that `value_schema = avro.schema.parse()`

Question about the queryable-state.proxy.ports configuration

2021-05-19 Thread cao.j
[Body garbled by mis-encoding; the recoverable fragments ask about setting queryable-state.proxy.ports to 0 on the taskManager and about the resulting proxy ports.]

Re:

2021-05-19 Thread Jake
Hi, vtygoss You can check out the official demo[1]

```
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val settings = EnvironmentSettings
  .newInstance()
  //.inStreamingMode()
  .inBatchMode()
  .build()

val tEnv = TableEnvironment.create(settings)
```

[no subject]

2021-05-19 Thread vtygoss
Hi, I have the below use case: insert bounded data into a dynamic table (upsert-kafka) using Flink 1.12 on YARN. But the YARN application is still running when the insert job finishes, and the YARN container is not released. I tried to use BatchTableEnvironment, but “Primary key and unique key are not

Re: Re: Re: Garbled Chinese characters when Flink SQL writes to MySQL

2021-05-19 Thread Michael Ran
Check the character encoding of the database columns. On 2021-05-18 18:19:31, "casel.chen" wrote: >My URL connection string already uses useUnicode=true&characterEncoding=UTF-8, but the output is still garbled. >On 2021-05-18 17:21:12, "王炳焱" <15307491...@163.com> wrote:

Re: Access Row fields by attribute name rather than by index in PyFlink TableFunction

2021-05-19 Thread Sumeet Malhotra
Thanks Xingbo! The workaround will probably work for now, at least it avoids having to refer to index values in the rest of the function. Cheers, Sumeet On Wed, May 19, 2021 at 3:02 PM Xingbo Huang wrote: > Hi Sumeet, > > Due to the limitation of the original PyFlink serializers design, there
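For readers following along, the kind of workaround being discussed (avoiding raw index lookups until PyFlink's serializers expose attribute names) can be sketched in plain Python. This is a hypothetical sketch, not Xingbo's actual code (which is truncated above); the field names are assumptions:

```python
# Hypothetical schema order of the Row handed to the UDTF.
FIELDS = ["id", "tags"]
IDX = {name: pos for pos, name in enumerate(FIELDS)}

def split(x):
    """Still reads positionally under the hood, but the body
    refers to fields by name via the IDX lookup table."""
    for s in x[IDX["tags"]].split(","):
        yield x[IDX["id"]], s

print(list(split((7, "a,b"))))  # [(7, 'a'), (7, 'b')]
```

The single `IDX` mapping keeps the index arithmetic in one place, so the rest of the function body stays readable even before release-1.14's serializer rework lands.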

Re: Access Row fields by attribute name rather than by index in PyFlink TableFunction

2021-05-19 Thread Xingbo Huang
Hi Sumeet, Due to the limitation of the original PyFlink serializers design, there is no way to pass attribute names to Row in row-based operations. In release-1.14, I am reconstructing the implementations of serializers[1]. After completion, accessing attribute names of `Row` in row-based

Re: Unable to deserialize Avro data using Pyflink

2021-05-19 Thread Zerah J
Hi Dian, The type of value_schema is <class 'avro.schema.RecordSchema'>. I have only a JSON schema string and the schema registry URL. Please find the snippet below:

```
import avro.schema

value_schema_str = """
{
  "namespace": "com.nextgen.customer",
  "type": "record",
  "name":
```

Re: When will Flink support reading and writing ACID Hive tables?

2021-05-19 Thread Rui Li
Hello, Flink currently has no plans to support Hive ACID tables. The current hive connector code cannot guarantee ACID semantics, so even if you remove the “Reading or writing ACID table %s is not supported” check, you will not get the expected result. Have you considered migrating the ACID tables into a data lake? For example, Iceberg has a corresponding migration tool [1]. [1] https://iceberg.apache.org/spark-procedures/#table-migration On Wed, May 19, 2021 at 1:16 PM youngysh wrote: > hi > >

Access Row fields by attribute name rather than by index in PyFlink TableFunction

2021-05-19 Thread Sumeet Malhotra
Hi, According to the documentation for PyFlink Table row-based operations [1], typical usage is as follows:

```
@udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
def split(x: Row) -> Row:
    for s in x[1].split(","):
        yield x[0], s

table.flat_map(split)
```

Is there any way that row

Re: Unable to deserialize Avro data using Pyflink

2021-05-19 Thread Dian Fu
Hi Zerah, What’s the type of value_schema? It should be a Java object of type Schema. From the exception, it seems that it’s a class instead of an object. Is that true? Regards, Dian > On May 19, 2021, at 3:41 PM, Zerah J wrote: > > Hi Dian, > > Thanks for your suggestion. > > I tried to invoke

Re: Root Exception can not be shown on Web UI in Flink 1.13.0

2021-05-19 Thread Matthias Pohl
Hi Gary, Not sure whether you've seen my question in the Jira issue: May you be able to share the overall JobManager/TaskManager logs with us? That would help us understand the context a bit more on why no TaskManagerLocation was set. Let's move any further correspondence into FLINK-22688 [1]

Re: Unable to deserialize Avro data using Pyflink

2021-05-19 Thread Zerah J
Hi Dian, Thanks for your suggestion. I tried to invoke ConfluentRegistryAvroDeserializationSchema.forGeneric method from Python. But it's not working. Kindly check the code snippet below : class MyAvroRowDeserializationSchema(DeserializationSchema): def __init__(self, record_class: str =

Re: DataStream Batch Execution Mode and large files.

2021-05-19 Thread Marco Villalobos
Thank you very much. You've been very helpful. Since my intermediate results are large, I suspect that io.tmp.dirs must literally be on the local file system. Thus, since I use EMR, I'll need to configure EBS to support more data. On Tue, May 18, 2021 at 11:08 PM Yun Gao wrote: > Hi Marco, > >

flink all events getting dropped as late

2021-05-19 Thread Debraj Manna
Crossposting from Stack Overflow. My Flink pipeline looks like below:

```
WatermarkStrategy watermarkStrategy = WatermarkStrategy
        .forBoundedOutOfOrderness(Duration.ofSeconds(900))
```
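The "all events dropped as late" symptom can be reproduced with a toy model of how a bounded-out-of-orderness watermark advances. This is a simplification of Flink's semantics (real Flink only drops an event once its whole window, plus allowed lateness, is behind the watermark), but it shows the usual culprit: one event carrying a far-future timestamp, e.g. in the wrong unit, drags the watermark ahead so every subsequent event looks late.

```python
BOUND_MS = 900_000  # forBoundedOutOfOrderness(Duration.ofSeconds(900))

def classify(timestamps_ms):
    """Return (on_time, late). The watermark is
    (max timestamp seen so far) - bound - 1 ms, per the
    bounded-out-of-orderness strategy."""
    watermark = float("-inf")
    max_ts = float("-inf")
    on_time, late = [], []
    for ts in timestamps_ms:
        (late if ts <= watermark else on_time).append(ts)
        max_ts = max(max_ts, ts)
        watermark = max_ts - BOUND_MS - 1
    return on_time, late

# One timestamp mistakenly far in the future (e.g. microseconds
# instead of milliseconds) advances the watermark past everything else.
print(classify([10_000_000_000, 1_000_000, 2_000_000]))
# ([10000000000], [1000000, 2000000])
```

So a common thing to check first is whether the timestamp extractor returns epoch milliseconds, not seconds or microseconds.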

Re: DataStream Batch Execution Mode and large files.

2021-05-19 Thread Yun Gao
Hi Marco, With BATCH mode, all the ALL_TO_ALL edges would be marked as blocking and would use intermediate files to transfer data. Flink now supports hash shuffle and sort shuffle for blocking edges [1]; both of them store the intermediate files in the directories configured by io.tmp.dirs [2].
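Yun's pointers correspond to flink-conf.yaml entries along these lines (a sketch: the directory paths are placeholders, and the sort-shuffle key assumes Flink 1.12+):

```yaml
# Spill directories for blocking-shuffle intermediate files;
# list several local disks to spread the I/O.
io.tmp.dirs: /mnt/disk1/tmp,/mnt/disk2/tmp

# Use sort(-merge) shuffle instead of hash shuffle for blocking
# edges whose parallelism is at least this value.
taskmanager.network.sort-shuffle.min-parallelism: 1
```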

Re: DataStream API Batch Execution Mode restarting...

2021-05-19 Thread Marco Villalobos
Thank you. I used the default restart strategy. I'll change that. On Tue, May 18, 2021 at 11:02 PM Yun Gao wrote: > Hi Marco, > > Have you configured the restart strategy ? if the restart-strategy [1] is > configuration > into some strategies other than none, Flink should be able to restart

Questions Flink DataStream in BATCH execution mode scalability advice

2021-05-19 Thread Marco Villalobos
Questions Flink DataStream in BATCH execution mode scalability advice. Here is the problem that I am trying to solve. Input is an S3 bucket directory with about 500 GB of data across many files. The instance that I am running on only has 50GB of EBS storage. The nature of this data is time

Re: DataStream API Batch Execution Mode restarting...

2021-05-19 Thread Yun Gao
Hi Marco, Have you configured the restart strategy? If the restart-strategy [1] is configured to some strategy other than none, Flink should be able to restart the job automatically on failover. The restart strategy could also be configured via
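For reference, the fixed-delay variant of the restart strategy Yun mentions looks like this in flink-conf.yaml (the attempt count and delay here are illustrative values, not recommendations):

```yaml
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
```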

Flink 1.13.0: when connecting to a database with Flink SQL, are multiple schemas supported, i.e. table names of the form schema.name?

2021-05-19 Thread Asahi Lee
hi! Has flink jdbc considered supporting table queries based on schema? For example, the following table-name usage: CREATE TABLE MyUserTable ( id BIGINT, name STRING, age INT, status BOOLEAN, PRIMARY KEY (id) NOT ENFORCED ) WITH ('connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/mydatabase', 'table-name' =