Re: Lateral join not finding correlate variable

2020-11-18 Thread godfrey he
Hi Dylan, could you tell us which Flink version you hit the problem with? I tested the above query on master and got the plan with no errors. Here is my test case: @Test def testLateralJoin(): Unit = { util.addTableSource[(String, String, String, String, String)]("table1", 'id, 'attr1,

Re: Lateral join not finding correlate variable

2020-11-18 Thread godfrey he
ableFunction[Row] { def eval(str: String, separator: String = ";"): Unit = { if (str != null) { str.split(separator).foreach(s => collect(Row.of(s.trim( } } } Removing the later

Re: Lateral join not finding correlate variable

2020-11-19 Thread godfrey he
""") streamTableEnv.createTemporaryView("view2", q2) val q3 = streamTableEnv.sqlQuery(""" SELECT w.attr1, p.attr3 FROM view1 w LEFT JOIN LA

Re: Table Environment for Remote Execution

2020-06-03 Thread godfrey he
Hi Satyam, for Blink batch mode only TableEnvironment can be used, and TableEnvironment does not take a StreamExecutionEnvironment as an argument; instead, a StreamExecutionEnvironment instance is created internally. Back to your requirement: you can build your table program as a user jar and submit the jo

Re: How to use Hbase Connector Sink

2020-06-11 Thread godfrey he
Hi, you should make sure the types of the selected fields and the types of the sink table are the same, otherwise you will get the above exception. You can change `active_ratio*25 score` to a row type, like this: insert into circle_weight select rowkey, ROW(info) from ( select concat_ws('_',circleName,
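The fix described here can be sketched as a hedged example (table and column names are invented for illustration, not taken from the original thread): an HBase sink declares each column family as a ROW type, so the SELECT must wrap the scalar fields into a matching ROW:

```sql
-- Illustrative HBase sink: the 'info' column family is a ROW type,
-- so the query must produce a ROW with the same field types.
CREATE TABLE circle_weight (
  rowkey STRING,
  info ROW<score DOUBLE>
) WITH (
  'connector' = 'hbase-1.4',           -- assumed connector/version
  'table-name' = 'circle_weight',
  'zookeeper.quorum' = 'localhost:2181'
);

INSERT INTO circle_weight
SELECT rowkey, ROW(score)              -- wrap the scalar into the family ROW
FROM (
  SELECT concat_ws('_', circleName, userId) AS rowkey,
         active_ratio * 25 AS score
  FROM source_table
);
```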

Re: pyflink数据查询

2020-06-15 Thread godfrey he
Hi Jack, Jincheng: Flink 1.11 supports collecting a select result directly to the client, for example: CloseableIterator<Row> it = tEnv.executeSql("select ...").collect(); while (it.hasNext()) { it.next(); } However, PyFlink has not introduced a collect() interface yet (to be completed later? @jincheng). Also, the 1.11 implementation of TableResult#collect has incomplete support for streaming queries (only append-only queries are supported); master already supports them fully. You can refer to j

Re: Blink Planner Retracting Streams

2020-06-16 Thread godfrey he
Hi John, you can use Tuple2[Boolean, Row] to replace CRow; the StreamTableEnvironment#toRetractStream method returns DataStream[(Boolean, T)]. The code looks like: tEnv.toRetractStream[Row](table).map(new MapFunction[(Boolean, Row), R] { override def map(value: (Boolean, Row)): R = ...

Re: [ANNOUNCE] Apache Flink 1.11.0 released

2020-07-08 Thread godfrey he
Congratulations! Thanks Zhijiang and Piotr for the great work, and thanks to everyone for their contributions! Best, Godfrey. Benchao Li wrote on Wed, Jul 8, 2020 at 12:39 PM: > Congratulations! Thanks Zhijiang & Piotr for the great work as release > managers. > > Rui Li wrote on Wed, Jul 8, 2020 at 11:38 AM: > >> Congratulat

Re: Table API jobs migration to Flink 1.11

2020-07-10 Thread godfrey he
Hi Flavio, only the old planner supports BatchTableEnvironment (which can convert to/from DataSet); the Blink planner in batch mode only supports TableEnvironment, because the Blink planner converts batch queries to Transformations (corresponding to DataStream) instead of DataSet. One approach is you

Re: Table API jobs migration to Flink 1.11

2020-07-12 Thread godfrey he
to test the migration.. then >> I could try to implement the new Table Source interface >> >> On Fri, Jul 10, 2020 at 3:38 PM godfrey he wrote: >> >>> hi Flavio, >>> Only old planner supports BatchTableEnvironment (which can convert >>> t

Re: Parquet format in Flink 1.11

2020-07-15 Thread godfrey he
Hi Flavio, the Parquet format supports configuration from ParquetOutputFormat. Please refer to [1] for details. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/

Re: Flink 1.11 Sql client environment yaml

2020-07-18 Thread godfrey he
Hi, GenericInMemoryCatalog does not support settings now; you can refer to [1] for details on the supported catalogs and to [2] for details on the supported types. "Kafka schema registry for schema" is under discussion [3], which may be ready in 1.12. The SQL client supports DDL to create a table wi
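As a hedged sketch of the DDL route mentioned at the end (connector options follow the Flink 1.11 Kafka connector; topic and field names are illustrative):

```sql
-- Illustrative: registering a Kafka-backed table via DDL in the SQL client,
-- instead of declaring it in the environment YAML.
CREATE TABLE user_events (
  user_id BIGINT,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);
```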

Re: Flink SQL - Join Lookup Table

2020-07-20 Thread godfrey he
Hi Kelly, as the exception message says: currently you must cast the time attribute to a regular TIMESTAMP type before you can do a regular join, because the time attribute would be out of order after a regular join, and window aggregates based on that time attribute would then no longer be possible. We can improve it tha
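A hedged sketch of the workaround the exception suggests (table names invented): casting the time attribute to a plain TIMESTAMP lets the regular join proceed, at the cost of losing the attribute for later window aggregates:

```sql
-- Illustrative: cast the event-time attribute to a regular TIMESTAMP(3)
-- so the stream can take part in a regular (non-windowed) join.
SELECT
  CAST(o.order_time AS TIMESTAMP(3)) AS order_ts,  -- no longer a time attribute
  o.order_id,
  r.rate
FROM orders AS o
JOIN rates AS r
  ON o.currency = r.currency;
```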

Re: Flink SQL - Join Lookup Table

2020-07-20 Thread godfrey he
JIRA: https://issues.apache.org/jira/browse/FLINK-18651 godfrey he wrote on Tue, Jul 21, 2020 at 9:46 AM: > hi Kelly, > As the exception message mentioned: currently, we must cast the time > attribute to regular TIMESTAMP type, > then we can do regular join. Because time attribute will be

Re: [DISCUSS] add a '--filename' parameter for sql-client

2020-07-28 Thread godfrey he
Hi Jun, currently the SQL client supports the -u option, like this: ./bin/sql-client.sh embedded -u "insert_statement". There is already a JIRA [1] that wants to support an -f option. [1] https://issues.apache.org/jira/browse/FLINK-12828 Best, Godfrey. Jun Zhang wrote on Wed, Jul 29, 2020 at 9:22 AM: > I want to

Re: [DISCUSS] add a '--filename' parameter for sql-client

2020-07-28 Thread godfrey he
this is not suitable for > executing sql files > > godfrey he wrote on Wed, Jul 29, 2020 at 9:56 AM: > >> hi Jun, >> >> Currently, sql client has supported -u option, just like: >> ./bin/sql-client.sh embedded -u "insert_statement". >> >> There is alread

Re: Unexpected unnamed sink in SQL job

2020-08-04 Thread godfrey he
I think we can assign a meaningful name to the sink Transformation, like the other Transformations, in StreamExecLegacySink/BatchExecLegacySink. Paul Lam wrote on Tue, Aug 4, 2020 at 5:25 PM: > Hi Jingsong, > > Thanks for your input. Now I understand the design. > > I think in my case the StreamingFileCommitter is not chai

Re: Submit Flink 1.11 job from java

2020-08-06 Thread godfrey he
Hi Flavio, maybe you can try the env.executeAsync method, which just submits the job and returns a JobClient. Best, Godfrey. Flavio Pompermaier wrote on Thu, Aug 6, 2020 at 9:45 PM: > Hi to all, > in my current job server I submit jobs to the cluster setting up an SSH > session with the JobManager host and running

Re: GroupBy with count on a joint table only let met write using toRetractStream

2020-08-11 Thread godfrey he
Hi Faye, 1) In your SQL, different events are for different groups, so it seems hard to extract a global Filter into the DataStream. 2) AFAIK, you can just drop the retract messages (those whose flag is false) and then convert the retract stream to an append stream. The downstream job needs to de-duplicate the records

Re: Format for timestamp type in Flink SQL

2020-08-18 Thread godfrey he
Hi Youngwoo, > 1. TIMESTAMP WITH LOCAL TIME ZONE Currently, the SQL client uses legacy types for the collect sink, which means `TIMESTAMP WITH LOCAL TIME ZONE` is not supported. You can refer to [1] for the supported types, and there is a PR [2] to fix this. > 2. TIMESTAMP(3) WITH LOCAL TIME ZONE I
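For reference, a hedged DDL sketch of how the two types are written in Flink SQL (the `datagen` connector is only an assumption for illustration):

```sql
-- Illustrative: TIMESTAMP(3) is a wall-clock timestamp; adding
-- WITH LOCAL TIME ZONE gives it instant (session-time-zone) semantics.
CREATE TABLE events (
  id BIGINT,
  ts_wall TIMESTAMP(3),
  ts_ltz  TIMESTAMP(3) WITH LOCAL TIME ZONE
) WITH (
  'connector' = 'datagen'
);
```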

Re: Flink Table SQL, Kafka, partitions and unnecessary shuffling

2020-09-17 Thread godfrey he
Hi Dan, what kind of join [1] are you using? Currently, only temporal joins and joins with a table function do not reshuffle the input data in the Table API and SQL; other joins always reshuffle the input data based on the join keys. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table

Re: Null result cannot be used for atomic types

2020-01-09 Thread godfrey he
Hi sunfulin, which Flink version are you using? Best, godfrey. sunfulin wrote on Fri, Jan 10, 2020 at 1:50 PM: > Hi, I am running a Flink app while reading Kafka records with JSON format. > And the connect code is like the following: > > > tableEnv.connect( > > new Kafka() > > .version(ka

Re: some basic questions

2020-01-18 Thread godfrey he
Hi kant, > 1) The documentation says full outer join is supported, however the below code just exits with value 1 and no error message. If you have converted the Table to a DataStream, please execute it with the StreamExecutionEnvironment (call env.execute("simple job")). > 2) If I am using a blink planner sh

Re: some basic questions

2020-01-18 Thread godfrey he
job") but I still get the same error message. >> >> Kant >> >> On Sat, Jan 18, 2020 at 6:26 PM godfrey he wrote: >> >>> hi kant, >>> >>> > 1) The Documentation says full outer join is supported however the >>> be

Re: [ANNOUNCE] Apache Flink 1.10.0 released

2020-02-12 Thread godfrey he
Congrats to everyone involved! Thanks, Yu & Gary. Best, godfrey. Yu Li wrote on Thu, Feb 13, 2020 at 12:57 PM: > Hi Kristoff, > > Thanks for the question. > > About Java 11 support, please allow me to quote from our release note [1]: > > Lastly, note that the connectors for Cassandra, Hive, HBase, and Kafka >

Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer

2020-02-20 Thread godfrey he
Congrats Jingsong! Well deserved. Best, godfrey. Jeff Zhang wrote on Fri, Feb 21, 2020 at 11:49 AM: > Congratulations, Jingsong. You deserve it > > wenlong.lwl wrote on Fri, Feb 21, 2020 at 11:43 AM: > >> Congrats Jingsong! >> >> On Fri, 21 Feb 2020 at 11:41, Dian Fu wrote: >> >> > Congrats Jingsong! >> > >> > > On Feb 21, 2020

Re: TIME/TIMESTAMP parse in Flink TABLE/SQL API

2020-02-25 Thread godfrey he
Hi, I find that JsonRowDeserializationSchema only supports date-time with a timezone, according to RFC 3339. So you need to add a timezone to the time data (like 14:02:00Z) and the timestamp data (2019-07-09T02:02:00.040Z). Hope this helps. Bests, godfrey. Outlook wrote on Tue, Feb 25, 2020 at 5:49 PM: > By the way, my fl
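A hedged sketch of what the accepted values look like (the thread predates the DDL-based `json` format, so take the connector options as an assumption; field names are invented):

```sql
-- Illustrative: with the JSON format, TIME/TIMESTAMP fields must arrive in
-- RFC 3339 form, i.e. with an explicit timezone designator such as 'Z'.
CREATE TABLE json_source (
  event_time TIME(0),       -- accepts e.g. "14:02:00Z"
  event_ts   TIMESTAMP(3)   -- accepts e.g. "2019-07-09T02:02:00.040Z"
) WITH (
  'connector' = 'kafka',    -- assumed source; any JSON-format source is alike
  'topic' = 'events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);
```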

Re: Does Flink 1.9 support create or replace views syntax in raw sql?

2020-02-26 Thread godfrey he
Hi Kant, if you want to store the catalog data in a local filesystem/HDFS, you can implement a user-defined catalog (you just need to implement the Catalog interface). Bests, Godfrey. kant kodali wrote on Wed, Feb 26, 2020 at 12:28 PM: > Hi Jingsong, > > Can I store it in Local Filesystem/HDFS? > > Thanks! > > On Mon, J

Re: Does Flink 1.9 support create or replace views syntax in raw sql?

2020-02-27 Thread godfrey he
w again and safely read the data from >> where it left off? >> >> Thanks! >> >> On Wed, Feb 26, 2020 at 6:47 AM godfrey he wrote: >> >>> Hi Kant, if you want to store the catalog data in a local >>> Filesystem/HDFS, you can implement a user defined

Re: Do I need to use Table API or DataStream API to construct sources?

2020-03-01 Thread godfrey he
Hi kant, > Also why do I need to convert to DataStream to print the rows of a table? Why not have a print method in the Table itself? Flink 1.10 introduces a utility class named TableUtils to convert a Table to a List; this utility class is mainly used for demonstration or testing and is only applic

Re: Do I need to use Table API or DataStream API to construct sources?

2020-03-01 Thread godfrey he
lementation "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}" } // make compileOnly dependencies available for tests: sourceSets { main.compileClasspath += configurations.flinkShadowJar main.runtimeClasspath += configurations.flinkS

Re: Multiple SQL Optimization

2020-04-10 Thread godfrey he
Hi forideal, currently the Blink planner with TableEnvironment supports multiple-sinks optimization, which tries its best to reuse common sub-graphs. Best, Godfrey. forideal wrote on Fri, Apr 10, 2020 at 4:31 PM: > Hello > > There are 3 SQLs all querying the same table, but the generated DAG is > 3 independent t

Re: Setting different idleStateRetentionTime for different queries executed in the same TableEnvironment in Flink 1.10

2020-04-13 Thread godfrey he
Hi Jiahui, query hints are a way to do fine-grained configuration. Just out of curiosity: is it a strong requirement that users can configure a different IDLE_STATE_RETENTION_TIME for each operator? Best, Godfrey. Jiahui Jiang wrote on Tue, Apr 14, 2020 at 2:07 AM: > Also for some more context, we are building a

Re: Setting different idleStateRetentionTime for different queries executed in the same TableEnvironment in Flink 1.10

2020-04-14 Thread godfrey he
' is deprecated, relying on the > fact that TableConfig is read during toDataStream feels like relying on an > implementation detail that just happens to work, and there is no guarantee > that it will keep working in the future versions... > > Thanks! > --

Re: Registering UDAF in blink batch app

2020-04-14 Thread godfrey he
Hi Dmytro, currently TableEnvironment does not support registering AggregateFunction and TableFunction, because the type extractor has not been unified for Java and Scala. One approach is to use "TableEnvironment#createFunction", which will register the UDF to the catalog. I find "createTemporarySystemFun

Re: Setting different idleStateRetentionTime for different queries executed in the same TableEnvironment in Flink 1.10

2020-04-14 Thread godfrey he
)? Or at > least add some Java docs so that I won't worry about the behavior under the > hood suddenly changing? > 2. What do we think about supporting query configuration using Hints to be > a first-class supported Flink feature? > > Thank you so much 😊 > --

Re: Flink SQL Gateway

2020-04-16 Thread godfrey he
Hi Flavio, we proposed FLIP-91 [1] to support a SQL Gateway at the beginning of this year. After a long discussion, we reached an agreement that, as a first step, the SQL Gateway will be an eco-system project under Ververica [2], which could help the SQL Gateway move forward faster. Now we have almost finished the first version of development,

Re: Flink SQL Gateway

2020-04-16 Thread godfrey he
Hi Flavio, that's great~ Best, Godfrey. Flavio Pompermaier wrote on Thu, Apr 16, 2020 at 5:01 PM: > Great, I'm very interested in trying it out! > Maybe we can also help with the development because we need something like > that. > Thanks a lot for the pointers > > On Thu, Apr 16,

Re: Flink SQL Gateway

2020-04-16 Thread godfrey he
, catalogs in our use case are not known at configuration time.. > is there a way to permit registering a JDBC catalog (for example when I > want to connect to a Postgres database)? > What if I want to add SHOW TRIGGERS? Do you think it could be interesting? > > On Thu, Apr 16, 2

Re: instance number of user defined function

2020-04-16 Thread godfrey he
Hi, a UDTF will be wrapped into an operator, and an operator instance will be executed by a slot (one parallel instance/thread). About operators, tasks, and slots, you can refer to [1] for more details. A TM (a JVM process) may have multiple slots, which means a JVM process may have multiple UDTF instances. It's bet

Re: Flink SQL Gateway

2020-04-16 Thread godfrey he
to Flink as much as possible (since then in the batch jobs we >> use Flink and we don't like to do the same work twice..). >> In this way we can contribute to Flink if something is missing in the SQL >> Gateway. However I don't know how to extend the existing stuff (for exa

Re: Schema with TypeInformation or DataType

2020-04-16 Thread godfrey he
Hi tison, > 1. Will TypeInformation be deprecated and will we use DataType as the type system everywhere? AFAIK, the runtime will still support TypeInformation, while the table module supports DataType. > 2. Schema in the Table API currently supports only TypeInformation to register a field; shall we support the DataTy

Re: Joining table with row attribute against an enrichment table

2020-04-20 Thread godfrey he
Hi Gyula, can you convert the regular join to a lookup join (temporal join) [1]? Then you can use a window aggregate. > I understand that the problem is that we cannot join with the Hive table and still maintain the watermark/event time column. But why is this? A regular join can't maintain the tim
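The suggested rewrite can be sketched as follows (a hedged example with invented table names; the probe side keeps its time attribute, so window aggregates remain possible downstream):

```sql
-- Illustrative lookup (temporal) join: the enrichment table is looked up
-- at the stream's processing time instead of being regularly joined.
SELECT
  o.order_id,
  o.order_time,                -- still a time attribute after this join
  c.country
FROM orders AS o
JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c
  ON o.customer_id = c.id;
```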

[ANNOUNCE] Apache Flink 1.11.4 released

2021-08-10 Thread godfrey he
The Apache Flink community is very happy to announce the release of Apache Flink 1.11.4, which is the fourth bugfix release for the Apache Flink 1.11 series. Apache Flink® is an open-source stream processing framework for distributed, high-performing, always-available, and accurate data streamin

Re: Missing image tag in apache/flink repository ?

2022-11-15 Thread godfrey he
Thanks for reporting this, I will resolve it ASAP. Best, Godfrey. Alon Halimi via user wrote on Tue, Nov 15, 2022 at 16:46: > > Hello :) > > > > It seems the tag “apache/flink:1.16.0-scala_2.12” is missing – I get the > following error: > > > > failed to pull and unpack image "docker.io/apache/flink:1.16.0-s