Using RocksDBStateBackend and SSD to store state, application runs slower

2022-07-21 Thread vtygoss
Hi, community! I am doing some performance tests based on my scenario. 1. Environment - Flink: 1.13.5 - StateBackend: RocksDB, incremental - use case: complex SQL containing 7 joins and 2 aggregations; input 30,000,000 records, output 60,000,000 records, about 80GB. - resource: flink on
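For readers reproducing this setup, a minimal sketch of pointing RocksDB at SSD-backed local directories and enabling incremental checkpoints on Flink 1.13; the mount paths are hypothetical:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbOnSsd {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hypothetical SSD mount points on the TaskManagers.
        conf.setString("state.backend.rocksdb.localdir",
                "/mnt/ssd1/rocksdb,/mnt/ssd2/rocksdb");
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // `true` enables incremental checkpoints, as in the test setup above.
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
    }
}
```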

Re: Reply:DelegationTokenManager

2022-06-28 Thread vtygoss
working on this topic. [1] He is in the best position to answer your questions as far as I know. :-) [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-211%3A+Kerberos+delegation+token+framework On Tue, Jun 21, 2022 at 8:38 AM vtygoss wrote: Hi, flink community! I don't know much

Reply:DelegationTokenManager

2022-06-21 Thread vtygoss
Hi, flink community! I don't know many details about KDC. Can different TaskManagers hold different tokens? If so, the driver and each worker can renew their tokens individually in their respective DelegationTokenManager. Thanks for any replies. Best Regards! On 2022-06-21 13:30, vtygoss

DelegationTokenManager

2022-06-20 Thread vtygoss
Hi, flink community! I am trying to do some work on renewing the DelegationToken periodically for the DelegationTokenManager, and met some problems. 1. Implementations of DelegationTokenProvider: there seem to be only two implementations for testing defined by SPI service:
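As background for the SPI discovery mentioned above, a generic Java ServiceLoader sketch; the TokenProvider interface and class names here are hypothetical illustrations, not Flink's actual API:

```java
import java.util.ServiceLoader;

// Hypothetical SPI contract standing in for a token provider interface.
public interface TokenProvider {
    String serviceName();
}

// Implementations are discovered from files under
// META-INF/services/<fully.qualified.InterfaceName>, one class name per line.
class ProviderLoader {
    public static void main(String[] args) {
        for (TokenProvider p : ServiceLoader.load(TokenProvider.class)) {
            System.out.println("found provider: " + p.serviceName());
        }
    }
}
```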

accuracy validation of streaming pipeline

2022-05-20 Thread vtygoss
Hi community! I'm working on migrating from a full-data pipeline (with Spark) to an incremental-data pipeline (with Flink CDC), and I met a problem about accuracy validation between the Flink-based and Spark-based pipelines. For bounded data, it's simple to validate whether the two result sets are consistent or not.
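One common way to check bounded result sets is a symmetric difference in batch SQL: both EXCEPT queries must return zero rows if the outputs match. A minimal sketch, assuming both results have been loaded into hypothetical tables `result_flink` and `result_spark`:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ResultDiff {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());
        // Rows present in one result but not the other; empty output on both
        // queries means the result sets are consistent.
        tEnv.executeSql("SELECT * FROM result_flink EXCEPT SELECT * FROM result_spark").print();
        tEnv.executeSql("SELECT * FROM result_spark EXCEPT SELECT * FROM result_flink").print();
    }
}
```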

how to convert Table to DataStream / DataSet by TableEnvironment in batch mode?

2022-03-16 Thread vtygoss
Hi, community! When dealing with a retractable stream, I met a problem about converting a Table to a DataSet / DataStream in batch mode in Flink 1.13.5. Scenario and process: - 1. Database CDC to Kafka - 2. Sync data into Hive with HoodieTableFormat (Apache Hudi) - 3. Incremental processing
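In 1.13 the streaming path offers a way out for retractable results: `toChangelogStream` (batch execution mode for the DataStream/Table bridge only arrived later, around 1.14). A minimal sketch, assuming a hypothetical registered table `orders`:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RetractToStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // The aggregation makes the result updating, so toChangelogStream is
        // required; toDataStream only accepts insert-only tables.
        Table t = tEnv.sqlQuery("SELECT id, COUNT(*) AS cnt FROM orders GROUP BY id");
        DataStream<Row> changelog = tEnv.toChangelogStream(t);
        changelog.print();
        env.execute();
    }
}
```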

PyFlink 1.13 or Java Flink 1.13 + Jython + Python 2.7, which way has better performance?

2022-03-03 Thread vtygoss
Hi, community! I am working on optimizing our data processing structure from a full data pipeline to an incremental data pipeline, from PySpark with Python code to the two optional ways below: 1. PyFlink 1.13 + Python 2.7 2. Java Flink 1.13 + Jython + Python 2.7 As far as I know, the Python APIs

Re: using flink retract stream and rocksdb, too many intermediate results of values cause checkpoint too heavy to finish

2021-12-16 Thread vtygoss
ster/docs/dev/table/tuning/#local-global-aggregation On Monday, 2021-12-13 17:13, vtygoss wrote: Hi, community! I met a problem in the procedure of building a streaming production pipeline using Flink retract stream, hudi-hdfs/kafka as the storage engine, and rocksdb as the statebackend. In my scenario, -
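The linked tuning page boils down to a handful of table options. A sketch of enabling mini-batch plus two-phase (local-global) aggregation programmatically; the specific latency and size values are illustrative:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LocalGlobalAgg {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        Configuration conf = tEnv.getConfig().getConfiguration();
        // Mini-batch is a prerequisite for local-global aggregation.
        conf.setString("table.exec.mini-batch.enabled", "true");
        conf.setString("table.exec.mini-batch.allow-latency", "5 s");
        conf.setString("table.exec.mini-batch.size", "5000");
        // Split each aggregation into a local pre-aggregation and a global
        // phase, which shrinks per-key state updates on the retract stream.
        conf.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
    }
}
```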

using flink retract stream and rocksdb, too many intermediate results of values cause checkpoint too heavy to finish

2021-12-13 Thread vtygoss
Hi, community! I met a problem in the procedure of building a streaming production pipeline using Flink retract stream, hudi-hdfs/kafka as the storage engine, and rocksdb as the statebackend. In my scenario, - During a patient's hospitalization, multiple measurements of vital signs are

Which issue or commit should i merge in flink-1.13.3 for buffer debloating?

2021-12-07 Thread vtygoss
Hi community! Because of a connector limitation, I couldn't upgrade Apache Flink from version 1.13.3 to version 1.14.0. But I really need the important buffer-debloating feature of 1.14.0 for heavy checkpoints under backpressure. So which issue or commit should I merge into flink-1.13.3
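For reference when weighing the backport, the 1.14 feature is driven by two configuration options. A sketch of what enabling it looks like (these normally go into flink-conf.yaml; shown here via the Configuration API):

```java
import org.apache.flink.configuration.Configuration;

public class DebloatConfig {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Adaptive sizing of in-flight network buffers (Flink 1.14+).
        conf.setString("taskmanager.network.memory.buffer-debloat.enabled", "true");
        // Target time to consume the in-flight data; a smaller target means
        // less buffered data and lighter checkpoints under backpressure.
        conf.setString("taskmanager.network.memory.buffer-debloat.target", "1 s");
        System.out.println(conf);
    }
}
```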

Re: how to run streaming process after batch process is completed?

2021-12-01 Thread vtygoss
out the user ml in the reply on the first try)... On 2021-11-30 21:42, Alexander Preuß wrote: Hi Vtygoss, Can you explain a bit more about your ideal pipeline? Is the batch data bounded data, or could you also process it in streaming execution mode? And is the streaming data derived from the batch

how to run streaming process after batch process is completed?

2021-11-30 Thread vtygoss
Hi, community! With Flink, I want to unify batch processing and streaming processing in the data production pipeline. The batch process handles inventory data, then the streaming process handles incremental data. But I met a problem: there is no state in batch mode, and the result is incorrect if I
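One approach available at the time (a hedged sketch, not necessarily what this thread settled on) is bootstrapping the streaming job's state from the batch results with the State Processor API, then starting the streaming job from that savepoint. `Account`, `AccountBootstrapper`, the uid, and the path are all hypothetical:

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;

public class BootstrapFromBatch {
    public static class Account {
        public Integer id;
        public Long total;
        public Account() {}
        public Account(Integer id, Long total) { this.id = id; this.total = total; }
    }

    // Writes each batch record into keyed state, keyed the same way as the
    // streaming operator that will later read it.
    public static class AccountBootstrapper
            extends KeyedStateBootstrapFunction<Integer, Account> {
        ValueState<Long> state;
        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("total", Types.LONG));
        }
        @Override
        public void processElement(Account value, Context ctx) throws Exception {
            state.update(value.total);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Account> inventory = env.fromElements(new Account(1, 100L));
        BootstrapTransformation<Account> transformation = OperatorTransformation
                .bootstrapWith(inventory)
                .keyBy(acc -> acc.id)
                .transform(new AccountBootstrapper());
        Savepoint.create(new MemoryStateBackend(), 128)
                .withOperator("agg-uid", transformation) // must match the streaming operator's uid
                .write("hdfs:///savepoints/bootstrap");
        env.execute("bootstrap inventory state");
    }
}
```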

Re: How to express the datatype of sparksql collect_list(named_struct(...)) in flinksql?

2021-11-09 Thread vtygoss
e annotation for dynamic RowData structure? Thanks for your suggestions again. Best Regards! ``` def eval(@DataTypeHint("MULTISET<ROW<id INT, name STRING>>") data: JMAP[Row, Integer]): String = { if (data == null || data.size() == 0) { return "" } data.keySet().toArray().mkString("
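A sketch of the same hint in Java, with the row fields (id, name) taken from the original question below; Flink represents MULTISET<T> on the JVM side as Map<T, Integer>, where the integer is each element's multiplicity:

```java
import java.util.Map;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;

public class MultisetToString extends ScalarFunction {
    public String eval(
            @DataTypeHint("MULTISET<ROW<id INT, name STRING>>") Map<Row, Integer> data) {
        if (data == null || data.isEmpty()) {
            return "";
        }
        StringBuilder sb = new StringBuilder();
        for (Row key : data.keySet()) {
            if (sb.length() > 0) {
                sb.append(",");
            }
            sb.append(key);
        }
        return sb.toString();
    }
}
```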

How to express the datatype of sparksql collect_list(named_struct(...)) in flinksql?

2021-11-08 Thread vtygoss
Hi, flink community! I am working on migrating a data production pipeline from SparkSQL to FlinkSQL (1.12.0). And I met a problem about MULTISET<ROW<...>>. ``` Spark SQL select COLLECT_LIST(named_struct('id', id, 'name', name)) as info from table group by ...; ``` - 1. how to express and store

Re: how to delete all rows one by one in batch execution mode; shutdown cluster after all tasks finished

2021-10-22 Thread vtygoss
records you'll still have to generate the new ones, so why not generate them directly into a new place? For problem 2, yarn-cluster is the mode for a yarn session cluster, which means the cluster will remain even after the job is finished. If you want to finish the Flink job as well as the y

how to delete all rows one by one in batch execution mode; shutdown cluster after all tasks finished

2021-10-21 Thread vtygoss
Hi, community! I am working on building a data processing pipeline based on changelogs (CDC) and I met two problems. --(sql_0)--> Table A --(sql_1)--> Table B --> other tables downstream --(sql_2)--> Table C --> other tables downstream Table A is generated based on

modify the classloader of JM dynamically to handle "ADD JAR hdfs://" statement

2021-10-16 Thread vtygoss
Hi, community! I am working on building a stream processing platform using Flink 1.12.0. I met a problem while migrating SQL applications from SparkSQL/HiveSQL to FlinkSQL: how to dynamically modify the classloader of an already-launched JobManager to handle "ADD JAR

throughput reduced when mini-batch is enabled; how to concat multiset using separator

2021-09-18 Thread vtygoss
Hi, Flink community! I have two problems. 1. How to concat a multiset using a separator? In Spark SQL: concat_ws(separator, collect_set(column)). But in Flink, the result data type of the function 'collect(distinct column)' is multiset, and the corresponding class of multiset is
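For problem 1, a sketch of a separator-aware UDF over the multiset produced by collect(distinct column); the function name is made up for illustration:

```java
import java.util.Map;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;

public class ConcatMultiset extends ScalarFunction {
    // Flink passes MULTISET<STRING> as Map<String, Integer>; the value is
    // the per-element multiplicity, which a plain concat can ignore.
    public String eval(@DataTypeHint("MULTISET<STRING>") Map<String, Integer> set,
                       String separator) {
        return set == null ? "" : String.join(separator, set.keySet());
    }
}
```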

Required built-in function [plus] could not be found in any catalog.

2021-09-07 Thread vtygoss
Hi, Flink Community! I met a problem using a flink 1.12.0 standalone cluster with the hive catalog. Scene 1: - module: hive module - execute sql: select sum(1) from xxx - exception: org.apache.flink.table.api.TableException: Required built-in function [plus] could not be found in any catalog.
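If the root cause is function resolution order once the hive module is loaded, a commonly suggested fix is keeping the core module ahead of hive. A sketch, with the caveat that `useModules` needs Flink 1.13+ (on 1.12 the load order itself determines resolution) and the Hive version string here is hypothetical:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.module.hive.HiveModule;

public class ModuleOrder {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        tEnv.loadModule("hive", new HiveModule("2.3.6"));
        // Resolve built-in functions such as `plus` from core before hive.
        tEnv.useModules("core", "hive");
        tEnv.executeSql("SELECT sum(1) FROM xxx").print(); // `xxx` as in the report
    }
}
```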

how to emit a deletion event for all data when iterating production logic

2021-08-10 Thread vtygoss
Hi, Flink community! I have a problem when iterating the data production logic. e.g. data production procedure: … -> Table A (changelog stream) -> Table B (changelog stream) -> Table C (changelog stream) …. Production logic of Table B: insert into table B select * from Table A where

Flink + Kafka Dynamic Table

2021-07-05 Thread vtygoss
Hi, flink community! I have the scenario below in the medical field: - records are frequently modified and must not be lost - when a record is modified, the results previously produced from this record should also be modified. e.g. tables A, B, C: A join B and the result is table D, A join C and the result is
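A sketch of the kind of sink that fits this requirement: an upsert-kafka table keyed by the record id, so a modified source record overwrites the join result previously emitted for that key. Table and field names, topic, and broker address are all hypothetical:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UpsertKafkaJoin {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        // The PRIMARY KEY makes the connector emit upserts per record_id.
        tEnv.executeSql(
                "CREATE TABLE d (" +
                "  record_id STRING," +
                "  payload STRING," +
                "  PRIMARY KEY (record_id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'upsert-kafka'," +
                "  'topic' = 'table-d'," +
                "  'properties.bootstrap.servers' = 'broker:9092'," +
                "  'key.format' = 'json'," +
                "  'value.format' = 'json')");
        // `a` and `b` stand in for the joined source tables.
        tEnv.executeSql(
                "INSERT INTO d SELECT a.record_id, b.payload " +
                "FROM a JOIN b ON a.record_id = b.record_id");
    }
}
```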

question about concatenating an array in flink sql

2021-06-10 Thread vtygoss
Hi, I have the use case below: I want to concatenate an array using a comma separator, but got the exception "Cannot apply 'CONCAT_WS' to arguments of type 'CONCAT_WS(<...>, <...>)'. Supported form(s): 'CONCAT_WS(<...>)'". How to concatenate an array in flink sql? Please offer some advice. Regards ``` [test
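CONCAT_WS does not accept an ARRAY argument, so one workaround is a tiny scalar UDF; the name ARRAY_JOIN is made up here, not a built-in of that Flink version:

```java
import org.apache.flink.table.functions.ScalarFunction;

public class ArrayJoin extends ScalarFunction {
    // Flink maps ARRAY<STRING> to String[] by default.
    public String eval(String[] values, String separator) {
        return values == null ? "" : String.join(separator, values);
    }
}
```

Registered with `tEnv.createTemporarySystemFunction("ARRAY_JOIN", ArrayJoin.class)`, it can then be called as `ARRAY_JOIN(arr, ',')` in SQL.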

[no subject]

2021-05-19 Thread vtygoss
Hi, I have the use case below: insert bounded data into a dynamic table (upsert-kafka) using Flink 1.12 on yarn, but the yarn application is still running when the insert job is finished, and the yarn container is not released. I tried to use BatchTableEnvironment, but "Primary key and unique key are not

Question about Flink Upsert Dynamic Kafka Table unlimited expansion

2021-05-06 Thread vtygoss
Hi Community, recently I am working on building a realtime data warehouse in the medical field, using Flink and an Upsert-Kafka Dynamic Table. The historical data must not expire, so the changelog stream in Kafka expands without limit; I have met a problem with this unlimited growth of the data scale.
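The usual way to bound an upsert changelog topic without losing the latest value per key is Kafka log compaction. A sketch of enabling it with the Kafka AdminClient; broker address and topic name are hypothetical:

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class CompactChangelogTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic =
                    new ConfigResource(ConfigResource.Type.TOPIC, "table-d");
            // With cleanup.policy=compact, Kafka keeps only the most recent
            // record per key, so growth is bounded by key cardinality rather
            // than by the total number of changes.
            Map<ConfigResource, Collection<AlterConfigOp>> update = Map.of(
                    topic,
                    List.of(new AlterConfigOp(
                            new ConfigEntry("cleanup.policy", "compact"),
                            AlterConfigOp.OpType.SET)));
            admin.incrementalAlterConfigs(update).all().get();
        }
    }
}
```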

how to convert DataStream to Table

2021-04-11 Thread vtygoss
Hi All, there is a scenario where I need to process OGG log data in Kafka using Flink SQL. I can convert the OGG log stream to a DataStream and each event has a RowKind, but I have trouble converting the DataStream to a Table. For testing, I tried StreamTableEnvironment#fromDataStream and
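In Flink 1.13+ the intended API for this is `fromChangelogStream`, which interprets the RowKind of each Row, exactly what a stream decoded from OGG logs carries. A minimal sketch with made-up field values:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

public class OggToTable {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Each Row carries its changelog flag via RowKind.
        DataStream<Row> changelog = env.fromElements(
                Row.ofKind(RowKind.INSERT, "alice", 12),
                Row.ofKind(RowKind.UPDATE_BEFORE, "alice", 12),
                Row.ofKind(RowKind.UPDATE_AFTER, "alice", 100))
            .returns(Types.ROW(Types.STRING, Types.INT));

        Table t = tEnv.fromChangelogStream(changelog);
        tEnv.createTemporaryView("ogg_events", t);
        tEnv.executeSql("SELECT * FROM ogg_events").print();
    }
}
```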