Re: Missing image tag in apache/flink repository ?

2022-11-15 Thread godfrey he
Thanks for reporting this, I will resolve it ASAP.

Best,
Godfrey

Alon Halimi via user  于2022年11月15日周二 16:46写道:
>
> Hello :)
>
>
>
> It seems the tag “apache/flink:1.16.0-scala_2.12” is missing – I get the 
> following error:
>
>
>
> failed to pull and unpack image "docker.io/apache/flink:1.16.0-scala_2.12"
>
>
>
>
>
> note that:
>
> (1) apache/flink:1.16-scala_2.12 (i.e. without the .0 version suffix) does exist
>
> (2) flink:1.16.0-scala_2.12 (without the apache/ prefix) does exist
>
>
>
> Thanks in advance
>
>
>
> Alon Halimi
>


[ANNOUNCE] Apache Flink 1.11.4 released

2021-08-10 Thread godfrey he
The Apache Flink community is very happy to announce the release of Apache
Flink 1.11.4, which is the fourth bugfix release for the Apache Flink 1.11
series.



Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.



The release is available for download at:

https://flink.apache.org/downloads.html



Please check out the release blog post for an overview of the improvements
for this bugfix release:

https://flink.apache.org/news/2021/08/09/release-1.11.4.html



The full release notes are available in Jira:

https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12349404



We would like to thank all contributors of the Apache Flink community who
made this release possible!



Regards,

Godfrey


Re: Lateral join not finding correlate variable

2020-11-19 Thread godfrey he
Hi Dylan,

I have reproduced your issue based on your code.
Currently, Flink does not support such a nested correlate query pattern.
I have created an issue to track this [1].
Thanks for reporting it and for your help.

[1] https://issues.apache.org/jira/browse/FLINK-20255

Best,
Godfrey

Dylan Forciea  于2020年11月19日周四 下午12:10写道:

> Godfrey,
>
>
>
> I confirmed that in Flink 1.11.2 and in 1.12-SNAPSHOT I get the stack
> trace running exactly this code:
>
>
>
> import org.apache.flink.api.scala._
>
> import org.apache.flink.core.fs.FileSystem.WriteMode
>
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>
> import org.apache.flink.table.api._
>
> import org.apache.flink.table.api.bridge.scala._
>
> import org.apache.flink.types.Row
>
> import org.apache.flink.table.annotation.FunctionHint
>
> import org.apache.flink.table.annotation.DataTypeHint
>
> import org.apache.flink.table.functions.TableFunction
>
>
>
>
>
> @FunctionHint(output = new DataTypeHint("ROW"))
>
> class SplitStringToRows extends TableFunction[Row] {
>
>   def eval(str: String, separator: String = ";"): Unit = {
>
> if (str != null) {
>
>   str.split(separator).foreach(s => collect(Row.of(s.trim())))
>
> }
>
>   }
>
> }
>
> object Job {
>
>
>
>   def main(args: Array[String]): Unit = {
>
> val settings = EnvironmentSettings
> .newInstance().useBlinkPlanner().inStreamingMode().build()
>
> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
>
> val streamTableEnv = StreamTableEnvironment.create(streamEnv,
> settings)
>
>
>
> streamTableEnv.createTemporarySystemFunction(
>
>   "SplitStringToRows",
>
>   classOf[SplitStringToRows]
>
> ) // Class defined in previous email
>
>
>
> streamTableEnv.executeSql(
>
>   """
>
>   CREATE TABLE table1 (
>
> id_source BIGINT PRIMARY KEY,
>
> attr1_source STRING,
>
> attr2 STRING
>
>   ) WITH (
>
>'connector' = 'jdbc',
>
>'url' = 'jdbc:postgresql://host.domain.com/db1?ssl=true',
>
>'table-name' = '',
>
>'username' = '',
>
>'password' = '',
>
>'scan.fetch-size' = '500',
>
>'scan.auto-commit' = 'false')
>
> """)
>
>
>
> streamTableEnv.executeSql(
>
>   """
>
>   CREATE TABLE table2 (
>
> attr1_source STRING,
>
> attr2 STRING,
>
> attr3 DECIMAL,
>
> attr4 DATE
>
>   ) WITH (
>
>'connector' = 'jdbc',
>
>'url' = 'jdbc:postgresql://host.domain.com/db1?ssl=true',
>
>'table-name' = '',
>
>'username' = '',
>
>'password' = '',
>
>'scan.fetch-size' = '500',
>
>'scan.auto-commit' = 'false')
>
> """)
>
>
>
> val q1 = streamTableEnv.sqlQuery("""
>
>   SELECT
>
> id_source AS id,
>
> attr1_source AS attr1,
>
> attr2
>
>   FROM table1
>
> """)
>
> streamTableEnv.createTemporaryView("view1", q1)
>
>
>
> val q2 = streamTableEnv.sqlQuery(
>
>   """
>
> SELECT
>
>   a.attr1 AS attr1,
>
>   attr2,
>
>   attr3,
>
>   attr4
>
> FROM table2 p, LATERAL TABLE(SplitStringToRows(p.attr1_source,
> ';')) AS a(attr1)
>
> """)
>
> streamTableEnv.createTemporaryView("view2", q2)
>
>
>
> val q3 = streamTableEnv.sqlQuery("""
>
> SELECT
>
>   w.attr1,
>
>   p.attr3
>
> FROM view1 w
>
> LEFT JOIN LATERAL (
>
>   SELECT
>
> attr1,
>
> attr3
>
>   FROM (
>
> SELECT
>
>   attr1,
>
>   attr3,
>
>   ROW_NUMBER() OVER (
>
> PARTITION BY attr1
>
> ORDER BY
>
>   attr4 DESC NULLS LAST,
>
>   w.attr2 = attr2 DESC NULLS LAST
>
>   ) AS row_num
>
>   FR

Re: Lateral join not finding correlate variable

2020-11-18 Thread godfrey he
Dylan,

Thanks for your feedback. If the planner throws the
"unexpected correlate variable $cor2 in the plan" exception,
there is a high probability that FlinkDecorrelateProgram has a bug
or that the query pattern is not supported yet. I tried using the JDBC
connector as the input tables,
but I still could not reproduce the exception. Could you provide your full
code, including the DDL, the query, etc.?
Thanks so much.

Best,
Godfrey



Dylan Forciea  于2020年11月18日周三 下午10:09写道:

> Godfrey,
>
>
>
> I was using Flink 1.11.2, but I just tried switching to 1.12-SNAPSHOT and
> am still having the same issue. Note that I am using the JDBC Connector for
> the input tables, and table1 and table2 are actually created from queries
> on those connector tables and not directly.
>
>
>
> Since you indicated what I did should work, I played around a bit more,
> and determined it’s something inside of the table2 query that is triggering
> the error. The id field there is generated by a table function. Removing
> that piece made the plan start working. Table 2 is formulated as follows:
>
>
>
> SELECT
>
>   T.id,
>
>   attr2,
>   attr3,
>
>   attr4
>
> FROM table3 t3, LATERAL TABLE(SplitStringToRows(t3.id, ';')) AS T(id)
>
>
>
> Where SplitStringToRows is defined as:
>
>
>
> @FunctionHint(output = new DataTypeHint("ROW"))
>
> class SplitStringToRows extends TableFunction[Row] {
>
>
>
>   def eval(str: String, separator: String = ";"): Unit = {
>
> if (str != null) {
>
>   str.split(separator).foreach(s => collect(Row.of(s.trim())))
>
> }
>
>   }
>
> }
>
>
>
> Removing the lateral table bit in that first table made the original query
> plan work correctly.
>
>
>
> I greatly appreciate your assistance!
>
>
>
> Regards,
>
> Dylan Forciea
>
>
>
> From: godfrey he
> Date: Wednesday, November 18, 2020 at 7:33 AM
> To: Dylan Forciea
> Cc: "user@flink.apache.org"
> Subject: Re: Lateral join not finding correlate variable
>
>
>
> Hi Dylan,
>
>
>
> Could you provide which Flink version you find out the problem with?
>
> I test the above query on master, and I get the plan, no errors occur.
>
> Here is my test case:
>
> @Test
> def testLateralJoin(): Unit = {
>   *util*.addTableSource[(String, String, String, String, String)]("table1", 
> 'id, 'attr1, 'attr2, 'attr3, 'attr4)
>   *util*.addTableSource[(String, String, String, String, String)]("table2", 
> 'id, 'attr1, 'attr2, 'attr3, 'attr4)
>   val query =
> """
>   |SELECT
>   |  t1.id,
>   |  t1.attr1,
>   |  t2.attr2
>   |FROM table1 t1
>   |LEFT JOIN LATERAL (
>   |  SELECT
>   |id,
>   |attr2
>   |  FROM (
>   |SELECT
>   |  id,
>   |  attr2,
>   |  ROW_NUMBER() OVER (
>   |PARTITION BY id
>   |ORDER BY
>   |  attr3 DESC,
>   |  t1.attr4 = attr4 DESC
>   |  ) AS row_num
>   |FROM table2)
>   |WHERE row_num = 1) t2
>   |ON t1.id = t2.id
>   |""".stripMargin
>   *util*.verifyPlan(query)
> }
>
> Best,
>
> Godfrey
>
>
>
> Dylan Forciea  于2020年11月18日周三 上午7:44写道:
>
> This may be due to not understanding  lateral joins in Flink – perhaps you
> can only do so on temporal variables – but I figured I’d ask since the
> error message isn’t intuitive.
>
>
>
> I am trying to do a combination of a lateral join and a top N query. Part
> of my ordering is based upon whether the a value in the left side of the
> query matches up. I’m trying to do this in the general form of:
>
>
>
> SELECT
>
>   t1.id,
>
>   t1.attr1,
>
>   t2.attr2
>
> FROM table1 t1
>
> LEFT JOIN LATERAL (
>
>   SELECT
>
> id,
>
> attr2
>
>   FROM (
>
> SELECT
>
>   id,
>
>   attr2,
>
>   ROW_NUMBER() OVER (
>
> PARTITION BY id
> ORDER BY
>
>   attr3 DESC,
>
>   t1.attr4 = attr4 DESC
>
>   ) AS row_num
>
> FROM table2
>
> WHERE row_num = 1) t2
>
> ON (t1.id = t2.id)
>
>
>
> I am getting an error that looks like:
>
>
>
> Exception in thread "main" org.apache.flink.table.api.TableException:
> unexpected correlate variable $cor2 in the plan
>
>  at
> org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCo

Re: Lateral join not finding correlate variable

2020-11-18 Thread godfrey he
Hi Dylan,

Could you tell us which Flink version you are seeing this problem with?
I tested the above query on master and got the plan; no errors occurred.
Here is my test case:

@Test
def testLateralJoin(): Unit = {
  util.addTableSource[(String, String, String, String,
String)]("table1", 'id, 'attr1, 'attr2, 'attr3, 'attr4)
  util.addTableSource[(String, String, String, String,
String)]("table2", 'id, 'attr1, 'attr2, 'attr3, 'attr4)
  val query =
"""
  |SELECT
  |  t1.id,
  |  t1.attr1,
  |  t2.attr2
  |FROM table1 t1
  |LEFT JOIN LATERAL (
  |  SELECT
  |id,
  |attr2
  |  FROM (
  |SELECT
  |  id,
  |  attr2,
  |  ROW_NUMBER() OVER (
  |PARTITION BY id
  |ORDER BY
  |  attr3 DESC,
  |  t1.attr4 = attr4 DESC
  |  ) AS row_num
  |FROM table2)
  |WHERE row_num = 1) t2
  |ON t1.id = t2.id
  |""".stripMargin
  util.verifyPlan(query)
}

Best,
Godfrey

Dylan Forciea  于2020年11月18日周三 上午7:44写道:

> This may be due to not understanding  lateral joins in Flink – perhaps you
> can only do so on temporal variables – but I figured I’d ask since the
> error message isn’t intuitive.
>
>
>
> I am trying to do a combination of a lateral join and a top N query. Part
> of my ordering is based upon whether the a value in the left side of the
> query matches up. I’m trying to do this in the general form of:
>
>
>
> SELECT
>
>   t1.id,
>
>   t1.attr1,
>
>   t2.attr2
>
> FROM table1 t1
>
> LEFT JOIN LATERAL (
>
>   SELECT
>
> id,
>
> attr2
>
>   FROM (
>
> SELECT
>
>   id,
>
>   attr2,
>
>   ROW_NUMBER() OVER (
>
> PARTITION BY id
> ORDER BY
>
>   attr3 DESC,
>
>   t1.attr4 = attr4 DESC
>
>   ) AS row_num
>
> FROM table2
>
> WHERE row_num = 1) t2
>
> ON (t1.id = t2.id)
>
>
>
> I am getting an error that looks like:
>
>
>
> Exception in thread "main" org.apache.flink.table.api.TableException:
> unexpected correlate variable $cor2 in the plan
>
>  at
> org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:58)
>
>  at
> org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42)
>
>  at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
>
>  at
> scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187)
>
>  at
> scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185)
>
>  at scala.collection.Iterator.foreach(Iterator.scala:943)
>
>  at scala.collection.Iterator.foreach$(Iterator.scala:943)
>
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>
>  at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>
>  at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>
>  at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>
>  at
> scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189)
>
>  at
> scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184)
>
>  at
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>
>  at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
>
>  at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
>
>  at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
>
>  at
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>
>  at
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:294)
>
>  at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
>
>  at
> org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:178)
>
>  at
> org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:113)
>
>  at
> org.apache.flink.table.api.bridge.scala.TableConversions.toRetractStream(TableConversions.scala:97)
>
>  at io.oseberg.flink.well.ok.Job$.main(Job.scala:57)
>
>  at io.oseberg.flink.well.ok.Job.main(Job.scala)
>
>
>
> The only other thing I can think of doing is creating a Table Aggregate
> function to pull this off. But, I wanted to check to make sure I wasn’t
> doing something wrong in the above first, or if there is something I’m not
> thinking of doing.
>
>
>
> Regards,
>
> Dylan Forciea
>


Re: Flink Table SQL, Kafka, partitions and unnecessary shuffling

2020-09-17 Thread godfrey he
Hi Dan,

What kind of join [1] are you using? Currently, only temporal joins and
joins with a table function do not reshuffle the input data in the Table API
and SQL; other joins always reshuffle the input data based on the join keys.
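For reference, a rough and untested sketch of a processing-time temporal table
function join (one of the join types mentioned above); the table and column
names (Orders, RatesHistory, o_proctime, r_proctime, r_currency) are made up:

import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;

public class TemporalJoinSketch {
  public static Table enrich(StreamTableEnvironment tEnv) {
    // Turn the (already registered) history table into a temporal table
    // function, versioned by processing time and keyed by currency.
    Table ratesHistory = tEnv.from("RatesHistory");
    TemporalTableFunction rates =
        ratesHistory.createTemporalTableFunction($("r_proctime"), $("r_currency"));
    tEnv.registerFunction("Rates", rates);

    // Lateral join against the table function: the probe side (Orders) keeps
    // its partitioning instead of being reshuffled on the join key.
    return tEnv.sqlQuery(
        "SELECT o.amount * r.rate AS converted_amount "
            + "FROM Orders AS o, LATERAL TABLE (Rates(o.o_proctime)) AS r "
            + "WHERE o.currency = r.r_currency");
  }
}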

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#joins

Best,
Godfrey


Dan Hill  于2020年9月17日周四 上午3:44写道:

> Hi Dawid!
>
> I see.  Yea, this would break my job after I move away from the prototype.
>
> How do other Flink devs avoid unnecessary reshuffles when sourcing data
> from Kafka?  Is the Table API early or not used often?
>
>
>
>
> On Wed, Sep 16, 2020 at 12:31 PM Dawid Wysakowicz 
> wrote:
>
>> Hi Dan,
>>
>> I am afraid there is no mechanism to do that purely in the Table API yet.
>> Or I am not aware of one. If the reinterpretAsKeyedStream works for you,
>> you could use this approach and convert a DataStream (with the
>> reinterpretAsKeyedStream applied) to a Table[1] and then continue with the
>> Table API.
>>
>> On the topic of reinterpretAsKeyedStream, I wanted to stress out one
>> thing. I'd like to bring your attention to this warning:
>>
>> *WARNING*: The re-interpreted data stream *MUST* already be
>> pre-partitioned in *EXACTLY* the same way Flink’s keyBy would partition
>> the data in a shuffle w.r.t. key-group assignment.
>>
>> I think it is not trivial(or even not possible?) to achieve unless both
>> the producer and the consumer are Flink jobs with the same parallelism.
>>
>> Best,
>>
>> Dawid
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#convert-a-datastream-or-dataset-into-a-table
>> On 16/09/2020 18:22, Dan Hill wrote:
>>
>> Hi Piotr!  Yes, that's what I'm using with DataStream.  It works well in
>> my prototype.
>>
>> On Wed, Sep 16, 2020 at 8:58 AM Piotr Nowojski 
>> wrote:
>>
>>> Hi,
>>>
>>> Have you seen "Reinterpreting a pre-partitioned data stream as keyed
>>> stream" feature? [1] However I'm not sure if and how can it be integrated
>>> with the Table API. Maybe someone more familiar with the Table API can help
>>> with that?
>>>
>>> Piotrek
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
>>>
>>> śr., 16 wrz 2020 o 05:35 Dan Hill  napisał(a):
>>>
 How do I avoid unnecessary reshuffles when using Kafka as input?  My
 keys in Kafka are ~userId.  The first few stages do joins that are usually
 (userId, someOtherKeyId).  It makes sense for these joins to stay on the
 same machine and avoid unnecessary shuffling.

 What's the best way to avoid unnecessary shuffling when using Table SQL
 interface?  I see PARTITION BY on TABLE.  I'm not sure how to specify the
 keys for Kafka.







Re: Format for timestamp type in Flink SQL

2020-08-18 Thread godfrey he
Hi Youngwoo,

> 1. TIMESTAMP WITH LOCAL TIME ZONE
Currently, the SQL client uses legacy types for the collect sink, which means
`TIMESTAMP WITH LOCAL TIME ZONE` is not supported.
You can refer to [1] for the supported types; there is a PR [2] to fix this.

>2. TIMESTAMP(3) WITH LOCAL TIME ZONE
I could not reproduce the exception.

> 3. TIMESTAMP WITH TIME ZONE and TIMESTAMP(3) WITH TIME ZONE
The SQL parser does not support them yet.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sqlClient.html
[2] https://github.com/apache/flink/pull/12872

Best,
Godfrey

Youngwoo Kim (김영우)  于2020年8月16日周日 上午1:23写道:

> Hi Benchao,
>
> I include ['json.timestamp-format.standard' = 'ISO-8601'] to table's DDL
> but it does not work with slightly different errors:
>
> 1. TIMESTAMP WITH LOCAL TIME ZONE
>
> Exception in thread "main"
> org.apache.flink.table.client.SqlClientException: Unexpected exception.
> This is a bug. Please consider filing an issue.
>
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
>
> Caused by: org.apache.flink.table.api.TableException: Unsupported
> conversion from data type 'TIMESTAMP(6) WITH LOCAL TIME ZONE' (conversion
> class: java.time.Instant) to type information. Only data types that
> originated from type information fully support a reverse conversion.
>
> at
> org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(LegacyTypeInfoDataTypeConverter.java:259)
>
> at
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>
> at
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
>
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>
> at
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
>
> at
> java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
>
> at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
>
> at
> org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.convertToRowTypeInfo(LegacyTypeInfoDataTypeConverter.java:329)
>
> at
> org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(LegacyTypeInfoDataTypeConverter.java:237)
>
> at
> org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo(TypeConversions.java:49)
>
> at org.apache.flink.table.api.TableSchema.toRowType(TableSchema.java:271)
>
> at
> org.apache.flink.table.client.gateway.local.result.CollectStreamResult.(CollectStreamResult.java:71)
>
> at
> org.apache.flink.table.client.gateway.local.result.MaterializedCollectStreamResult.(MaterializedCollectStreamResult.java:101)
>
> at
> org.apache.flink.table.client.gateway.local.result.MaterializedCollectStreamResult.(MaterializedCollectStreamResult.java:129)
>
> at
> org.apache.flink.table.client.gateway.local.ResultStore.createResult(ResultStore.java:83)
>
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryInternal(LocalExecutor.java:608)
>
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:465)
>
> at
> org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:555)
>
> at
> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:311)
>
> at java.util.Optional.ifPresent(Optional.java:159)
>
> at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:212)
>
> at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142)
>
> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:114)
>
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
>
>
> 2. TIMESTAMP(3) WITH LOCAL TIME ZONE
>
>
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.UnsupportedOperationException: Unsupported type: TIMESTAMP(3)
> WITH LOCAL TIME ZONE
>
>
> 3. TIMESTAMP WITH TIME ZONE and TIMESTAMP(3) WITH TIME ZONE
>
>
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.sql.parser.impl.ParseException: Encountered "TIME" at
> line 3, column 32.
> Was expecting:
> "LOCAL" ...
>
>
>
> It looks like the timestamp 'yyyy-MM-ddTHH:mm:ss.SSSZ' is not supported in
> both 'SQL' and 'ISO-8601' format standard.
>
> Just curious, what is the default format for timestamp type with a time
> zone?
>
>
> Thanks,
>
> Youngwoo
>
>
> On Sat, Aug 15, 2020 at 8:16 PM Benchao Li  wrote:
>
>> Hi Youngwoo,
>>
>> What version of Flink and Json Format are you using?
>> From 1.11, we introduced  `json.timestamp-format.standard` to declare the
>> timestamp format.
>> You can try `timestamp with local zone` data type with `ISO-8601`
>> timestamp format.
>>
>> Youngwoo Kim (김영우)  于2020年8月15日周六 下午12:12写道:
>>
>>> Hi,
>>>
>>> I'm trying to create a table using Flink SQL to query from a Kafka
>>> topic. Messages from Kafka look like following:
>>>
>>> (snip)
>>> "svc_mgmt_num":"7749b6a7e17127d43431e21b94f4eb0c116..."
>>> "log_

Re: GroupBy with count on a joint table only let met write using toRetractStream

2020-08-11 Thread godfrey he
Hi Faye,

1) In your SQL, different events belong to different groups, so it seems hard to
extract a global filter into the DataStream.
2) AFAIK, you can just drop the retract messages (those whose flag is false) and
then convert the retract stream to an append stream.
The downstream job then needs to deduplicate the records, as described in [1].
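For example, a rough and untested sketch of dropping the retract messages
(package names per Flink 1.11; the downstream deduplication is not shown):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RetractToAppend {
  public static DataStream<Row> toAppendOnly(StreamTableEnvironment tEnv, Table table) {
    return tEnv.toRetractStream(table, Row.class)
        .filter(msg -> msg.f0)  // keep accumulate messages, drop retractions (flag == false)
        .map(msg -> msg.f1)     // unwrap the payload Row
        .returns(Row.class);    // help the type extraction for the lambda
  }
}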

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication

Best,
Godfrey

Faye Pressly  于2020年8月8日周六 上午3:30写道:

> Sorry just notice I made a typo in the last table (clickAdvertId != null
> instead of clickCount !=null)
>
> Table allImpressionTable = impressionsTable
>   .leftOuterJoin(clicksTable, "clickImpId = impImpressionId &&
> clickMinute = impMinute")
>   .groupBy("impAdvertId, impVariationName, impMinute")
>   .select("impAdvertId, impVariationName, clickAdvertId.count as
> clickCount, impMinute")
>.where("clickAdvertId != null");
>
> --
> *From:* Faye Pressly
> *Sent:* Friday, August 7, 2020 9:28 PM
> *To:* user@flink.apache.org 
> *Subject:* GroupBy with count on a joint table only let met write using
> toRetractStream
>
> Hello,
>
> I have a steam of events (coming from a Kinesis Stream) of this form:
>
> impressionId | advertid | variationName | eventType | eventTime
>
> The end goal is to output back on a Kinesis Stream the count of event of
> type 'impression' and the count of events of type 'click'
>
> however, I need to drop (or ignore) event of type clicks that don't have a
> matching impressionId with an event of type 'impression' (So basically I
> need to discard click event that don't have an impression)
>
> This is how tackled my solution:
>
> // Convert the stream into a table
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
> Table eventsTable = tEnv.fromDataStream(eventsStream, "impressionId,
> advertId, variationName, eventType, eventTime.rowtime");
> tEnv.registerTable("Events", eventsTable);
>
> // Create a table with only event of type clicks
> Table clicksTable = eventsTable
>   .where("eventType = 'click'")
>
> .window(Slide.over("24.hour").every("1.minute").on("eventTime").as("minuteWindow"))
>   .groupBy("impressionId, advertId, variationName, minuteWindow")
>   .select("impressionId as clickImpId, creativeId as clickAdvertId,
> variationName as clickVariationName, minuteWindow.rowTime as clickMinute");
>
> // Create a table with only event of type impression
> Table impressionsTable = eventsTable
>   .where("eventType = 'impression'")
>
> .window(Slide.over("24.hour").every("1.minute").on("eventTime").as("minuteWindow"))
>   .groupBy("impressionId, advertId, variationName, minuteWindow")
>   .select("impressionId as impImpressionId, advertId as impAdvertId,
> variationName as impVariationName, eventTime, minuteWindow.rowTime as
> impMinute");
>
> // left join the impression with the clicks using the impressionId as well
> as the temporal field
> //and then group by to generate a count of all the click that have a
> matching impression (aka row where clickAdvertId is not null)
> Table allImpressionTable = impressionsTable
>   .leftOuterJoin(clicksTable, "clickImpId = impImpressionId &&
> clickMinute = impMinute")
>   .groupBy("impAdvertId, impVariationName, impMinute")
>   .select("impAdvertId, impVariationName, clickAdvertId.count as
> clickCount, impMinute")
>.where("clickCount != null");
> [ same logic to count impressions]
>
> Now to debug and to see if the counts are correct I usually use "
> tEnv.toAppendStream(allImpressionTable, Result.class).print()" and I'm
> able to use that new created stream to send it back on a kinesis Stream
>
> However I have an error saying that I cannot use toAppendStream and that
> instead I have to use toRetractStream. It indeed works and I can see the
> counts in the output are correct however I don't understand how I can use
> the result contained in this new stream because it has multiple rows with
> "true"/"false" and the correct count is usuall the last entry with the
> "true" key.
>
> I have multiple question:
>
> 1) I'm very new with Flink and I would like to know if my approach to
> filter-out un-matching events is the correct one ? (stream -> table and
> joins -> stream)
> Is there a much easier way of doing this ? Is it perhaps possible to
> filter all these events directly in the DataStream?
>
>
> 2) How do I use the retractStream? How do use it in order to send the
> final counts to a sink and not the entirety of the "true/False"
> insert/Delete rows?
>
>
> Thank you!
>


Re: Submit Flink 1.11 job from java

2020-08-06 Thread godfrey he
hi Flavio,
Maybe you can try the env.executeAsync() method,
which just submits the job and returns a JobClient.
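A rough, untested sketch (the exact JobClient signatures may differ slightly
between versions):

import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SubmitAsync {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // ... build the pipeline here ...

    JobClient client = env.executeAsync("my-job");  // returns once the job is submitted
    System.out.println("Submitted job " + client.getJobID());

    // In 1.11, getJobExecutionResult takes the user class loader.
    client.getJobExecutionResult(Thread.currentThread().getContextClassLoader())
        .whenComplete((result, error) -> {
          // runs after the job finishes, unlike code placed after a blocking env.execute()
        });
  }
}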

Best,
Godfrey

Flavio Pompermaier  于2020年8月6日周四 下午9:45写道:

> Hi to all,
> in my current job server I submit jobs to the cluster setting up an SSH
> session with the JobManager host and running the bin/flink run command
> remotely (since the jar is put in the flink-web-upload directory).
> Unfortunately, this approach makes very difficult to caputre all exceptions
> that a job submission could arise
> Is there a better way to invoke the execution of a main class contained in
> a jar file uploaded on the Job Manager? Ideally I could invoke the Flink
> REST API but the problem is that I need to call some code after
> env.execute() and that's not possible if I use them..every java code after
> env.execute() is discarded, while this does not happen if I use the CLI
> client.
>
> I know that there was some client refactoring in Flink 1.11 but I didn't
> find a solution to this problem yet.
>
> Thanks in advance for any help,
> Flavio
>


Re: Unexpected unnamed sink in SQL job

2020-08-04 Thread godfrey he
I think we should assign a meaningful name to the sink Transformation,
like the other Transformations, in StreamExecLegacySink/BatchExecLegacySink.

Paul Lam  于2020年8月4日周二 下午5:25写道:

> Hi Jingsong,
>
> Thanks for your input. Now I understand the design.
>
> I think in my case the StreamingFileCommitter is not chained because its
> upstream operator is not parallelism 1.
>
> BTW, it’d be better if it has a more meaningful operator name.
>
> Best,
> Paul Lam
>
> 2020年8月4日 17:11,Jingsong Li  写道:
>
> StreamingFileCommitter
>
>
>


Re: [DISCUSS] add a '--filename' parameter for sql-client

2020-07-28 Thread godfrey he
Yes, the PR still needs to be improved.
In most cases there is more than one statement in the SQL file,
so the -f option should support multiple statements.
However, a related PR [1] has not been completed yet.

[1] https://github.com/apache/flink/pull/8738

Best,
Godfrey

Jun Zhang  于2020年7月29日周三 上午10:17写道:

> hi,godfrey:
> Thanks for your reply
>
> 1. I have seen the -u parameter, but my sql file may not only include
> 'insert into select ', but also SET, DDL, etc.
>
> 2. I may not have noticed this issue. I took a look at this issue. I think
> this issue may have some problems. For example, he finally called the
> CliClient.callCommand method.
> But I think that many options in callCommand are not completely suitable
> for sql files, such as HELP, CLEAR, SELECT, etc. The select operation opens
> a window to display the results, obviously this is not suitable for
> executing sql files
>
> godfrey he  于2020年7月29日周三 上午9:56写道:
>
>> hi Jun,
>>
>> Currently, sql client has supported -u option, just like:
>>  ./bin/sql-client.sh embedded -u "insert_statement".
>>
>> There is already a JIRA [1] that wants to support -f option
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-12828
>>
>> Best,
>> Godfrey
>>
>> Jun Zhang  于2020年7月29日周三 上午9:22写道:
>>
>>> I want to execute some flink sql batch jobs regularly, such as 'insert
>>> into
>>> select .', but I can't find a suitable method so far, so reference
>>>  hive, I changed the source code and add a  '--filename'  parameter  so
>>> that we can execute a sql file.
>>>
>>> like this:
>>>
>>> /home/flink/bin/sql-client.sh embedded -f flink.sql
>>>
>>> what about any ideas or plans for this feature community?
>>>
>>


Re: [DISCUSS] add a '--filename' parameter for sql-client

2020-07-28 Thread godfrey he
hi Jun,

Currently, the SQL client already supports the -u option, e.g.:
 ./bin/sql-client.sh embedded -u "insert_statement".

There is already a JIRA [1] for supporting the -f option.

[1] https://issues.apache.org/jira/browse/FLINK-12828

Best,
Godfrey

Jun Zhang  于2020年7月29日周三 上午9:22写道:

> I want to execute some flink sql batch jobs regularly, such as 'insert into
> select .', but I can't find a suitable method so far, so reference
>  hive, I changed the source code and add a  '--filename'  parameter  so
> that we can execute a sql file.
>
> like this:
>
> /home/flink/bin/sql-client.sh embedded -f flink.sql
>
> what about any ideas or plans for this feature community?
>


Re: Flink SQL - Join Lookup Table

2020-07-20 Thread godfrey he
JIRA: https://issues.apache.org/jira/browse/FLINK-18651

godfrey he  于2020年7月21日周二 上午9:46写道:

> hi  Kelly,
> As the exception message mentioned: currently, we must cast the time
> attribute to regular TIMESTAMP type,
> then we can do regular join. Because time attribute will be out-of-order
> after regular join,
> and then we can't do window aggregate based on the time attribute.
>
> We can improve it that the planner implicitly casts the time attribute to
> regular TIMESTAMP type,
> and throws exception there is an operator (after join) depended on time
> attribute, like window aggregate.
>
> I will create a JIRA to trace this.
>
> Best,
> Godfrey
>
> Kelly Smith  于2020年7月21日周二 上午6:38写道:
>
>> Hi folks,
>>
>>
>>
>> I have a question Flink SQL. What I want to do is this:
>>
>>
>>
>>- Join a simple lookup table (a few rows) to a stream of data to
>>enrich the stream by adding a column from the lookup table.
>>
>>
>>
>>
>>
>> For example, a simple lookup table:
>>
>>
>>
>> *CREATE TABLE *LookupTable (
>> *`computeClass`  *STRING,
>> *`multiplier`*
>> *FLOAT *) *WITH *(
>> *'connector' *= *'filesystem'*,
>> *'path' *= *'fpu-multipliers.csv'*,
>> *'format' *=
>> *'csv' *)
>>
>>
>>
>>
>>
>> And I’ve got a Kafka connector table with rowtime semantics that has a
>> `computeClass` field. I simply want to join (in a streaming fashion) the
>> `multiplier` field above.
>>
>>
>>
>>
>> *SELECT*`timestamp`,
>>
>> // ...
>> ks.computeClass,
>> lt.`multiplier`
>> *FROM *KafkaStream ks
>>
>> JOIN LookupTable lt ON ks.computeClass = lt.computeClass
>>
>>
>>
>> Doing a simple join like that gives me this error:
>>
>>
>>
>> “org.apache.flink.table.api.TableException: Rowtime attributes must not
>> be in the input rows of a regular join. As a workaround you can cast the
>> time attributes of input tables to TIMESTAMP before.”
>>
>>
>>
>> Which leads me to believe that I should use an Interval Join instead, but
>> that doesn’t seem to be appropriate since my table is static and has no
>> concept of time. Basically, I want to hold the entire lookup table in
>> memory, and simply enrich the Kafka stream (which need not be held in
>> memory).
>>
>>
>>
>> Any ideas on how to accomplish what I’m trying to do?
>>
>>
>>
>> Thanks!
>>
>> Kelly
>>
>


Re: Flink SQL - Join Lookup Table

2020-07-20 Thread godfrey he
hi  Kelly,
As the exception message says: currently, we must cast the time attribute to a
regular TIMESTAMP type before we can do a regular join. This is because the
time attribute can become out of order after a regular join,
so we can no longer do a window aggregate based on the time attribute afterwards.
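For example, a rough and untested sketch of the workaround applied to your
query (column and table names are taken from your mail):

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class CastWorkaround {
  public static Table enrich(TableEnvironment tEnv) {
    // Cast the rowtime attribute to a plain TIMESTAMP before the regular join;
    // afterwards `ts` is an ordinary column and no event-time window can use it.
    return tEnv.sqlQuery(
        "SELECT "
            + "  CAST(ks.`timestamp` AS TIMESTAMP(3)) AS ts, "
            + "  ks.computeClass, "
            + "  lt.`multiplier` "
            + "FROM KafkaStream ks "
            + "JOIN LookupTable lt ON ks.computeClass = lt.computeClass");
  }
}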

We could improve this so that the planner implicitly casts the time attribute to
a regular TIMESTAMP type and only throws an exception if an operator after the
join (such as a window aggregate) depends on the time attribute.

I will create a JIRA to trace this.

Best,
Godfrey

Kelly Smith  于2020年7月21日周二 上午6:38写道:

> Hi folks,
>
>
>
> I have a question Flink SQL. What I want to do is this:
>
>
>
>- Join a simple lookup table (a few rows) to a stream of data to
>enrich the stream by adding a column from the lookup table.
>
>
>
>
>
> For example, a simple lookup table:
>
>
>
> *CREATE TABLE *LookupTable (
> *`computeClass`  *STRING,
> *`multiplier`*
> *FLOAT *) *WITH *(
> *'connector' *= *'filesystem'*,
> *'path' *= *'fpu-multipliers.csv'*,
> *'format' *=
> *'csv' *)
>
>
>
>
>
> And I’ve got a Kafka connector table with rowtime semantics that has a
> `computeClass` field. I simply want to join (in a streaming fashion) the
> `multiplier` field above.
>
>
>
>
> *SELECT*`timestamp`,
>
> // ...
> ks.computeClass,
> lt.`multiplier`
> *FROM *KafkaStream ks
>
> JOIN LookupTable lt ON ks.computeClass = lt.computeClass
>
>
>
> Doing a simple join like that gives me this error:
>
>
>
> “org.apache.flink.table.api.TableException: Rowtime attributes must not be
> in the input rows of a regular join. As a workaround you can cast the time
> attributes of input tables to TIMESTAMP before.”
>
>
>
> Which leads me to believe that I should use an Interval Join instead, but
> that doesn’t seem to be appropriate since my table is static and has no
> concept of time. Basically, I want to hold the entire lookup table in
> memory, and simply enrich the Kafka stream (which need not be held in
> memory).
>
>
>
> Any ideas on how to accomplish what I’m trying to do?
>
>
>
> Thanks!
>
> Kelly
>


Re: Flink 1.11 Sql client environment yaml

2020-07-18 Thread godfrey he
hi

GenericInMemoryCatalog does not support any settings at the moment;
you can refer to [1] for details on the supported catalogs
and to [2] for details on the supported types.

Reading the schema from the Kafka schema registry is under discussion [3]
and may be ready in 1.12.

The SQL client supports DDL to create a table with the JSON format [4];
you can use the ROW type to define nested JSON,
for example:

create table my_table (
  f varchar,
  nest_column row<
a varchar,
b int,
c int
  >
) with (
...
)

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/catalogs.html#catalogs
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html
[3] https://issues.apache.org/jira/browse/FLINK-16048
[4]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html#how-to-create-a-table-with-json-format

Best,
Godfrey


Lian Jiang  于2020年7月18日周六 上午6:28写道:

> Hi,
>
> I am experimenting Flink SQL by following
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sqlClient.html.
> I want to set up an environment yaml to query Kafka data (json in avro
> format). Where can I find the information below?
>
> 1. use GenericInMemoryCatalog (e.g. type, settings)
> 2. use Kafka schema registry for schema. The example hard code the schema
> in env yaml.
> 3. other than UDF, is there a way to easily query a deeply nested json in
> Flink SQL?
>
> Appreciate your help!
>
> Regards
> Lian
>
>
>
>
>


Re: Parquet format in Flink 1.11

2020-07-15 Thread godfrey he
hi Flavio,

The Parquet format supports configuration options from ParquetOutputFormat;
please refer to [1] for details.
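A rough, untested sketch; the 'parquet.*' keys below are the usual
ParquetOutputFormat/Hadoop keys, so please double-check them against [1]:

import org.apache.flink.table.api.TableEnvironment;

public class ParquetSinkDdl {
  public static void register(TableEnvironment tEnv) {
    // Options prefixed with 'parquet.' are forwarded to the ParquetOutputFormat
    // configuration by the filesystem connector.
    tEnv.executeSql(
        "CREATE TABLE parquet_sink ("
            + "  id BIGINT,"
            + "  name STRING"
            + ") WITH ("
            + "  'connector' = 'filesystem',"
            + "  'path' = 'file:///tmp/parquet-out',"
            + "  'format' = 'parquet',"
            + "  'parquet.compression' = 'SNAPPY',"
            + "  'parquet.enable.dictionary' = 'true',"
            + "  'parquet.block.size' = '134217728'"
            + ")");
  }
}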

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/parquet.html#format-options

Best,
Godfrey



Flavio Pompermaier  于2020年7月15日周三 下午8:44写道:

> Hi to all,
> in my current code I use the legacy Hadoop Output format to write my
> Parquet files.
> I wanted to use the new Parquet format of Flink 1.11 but I can't find how
> to migrate the following properties:
>
> ParquetOutputFormat.setBlockSize(job, parquetBlockSize);
> ParquetOutputFormat.setEnableDictionary(job, true);
> ParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
>
> Is there a way to set those configs?
> And if not, is there a way to handle them without modifying the source of
> the flink connector (i.e. extending some class)?
>
> Best,
> Flavio
>


Re: Table API jobs migration to Flink 1.11

2020-07-12 Thread godfrey he
hi Flavio,

`BatchTableSource` can only be used with the old planner.
If you want to use the Blink planner to run a batch job,
your table source should implement `StreamTableSource`
and its `isBounded` method should return true.
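A rough, untested sketch of what such a bounded source could look like,
reusing an existing InputFormat (all names here are placeholders):

import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

public class BoundedRowTableSource implements StreamTableSource<Row> {

  private final InputFormat<Row, ?> inputFormat;
  private final TableSchema schema;

  public BoundedRowTableSource(InputFormat<Row, ?> inputFormat, TableSchema schema) {
    this.inputFormat = inputFormat;
    this.schema = schema;
  }

  @Override
  public boolean isBounded() {
    return true;  // tells the Blink planner this source is bounded (batch)
  }

  @Override
  public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
    // Reuse the legacy InputFormat to produce the bounded stream.
    return execEnv.createInput(inputFormat, schema.toRowType());
  }

  @Override
  public TableSchema getTableSchema() {
    return schema;
  }

  @Override
  public DataType getProducedDataType() {
    return schema.toRowDataType();
  }
}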

Best,
Godfrey



Flavio Pompermaier  于2020年7月10日周五 下午10:32写道:

> Is it correct to do something like this?
>
> TableSource myTableSource = new BatchTableSource() {
>   @Override
>   public TableSchema getTableSchema() {
> return new TableSchema(dsFields, ft);
>   }
>   @Override
>   public DataSet getDataSet(ExecutionEnvironment execEnv) {
> return execEnv.createInput(myInputformat);
>   }
> };
>
> On Fri, Jul 10, 2020 at 3:54 PM Flavio Pompermaier 
> wrote:
>
>> How can you reuse InputFormat to write a TableSource? I think that at
>> least initially this could be the simplest way to test the migration..then
>> I could try yo implement the new Table Source interface
>>
>> On Fri, Jul 10, 2020 at 3:38 PM godfrey he  wrote:
>>
>>> hi Flavio,
>>> Only old planner supports BatchTableEnvironment (which can convert
>>> to/from DataSet),
>>> while Blink planner in batch mode only support TableEnvironment. Because
>>> Blink planner
>>> convert the batch queries to Transformation (corresponding to
>>> DataStream), instead of DataSet.
>>>
>>> one approach is you can migrate them to TableSource instead (InputFormat
>>> can be reused),
>>> but TableSource will be deprecated later. you can try new table source[1]
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html
>>>
>>> Best,
>>> Godfrey
>>>
>>> Flavio Pompermaier  于2020年7月10日周五 下午8:54写道:
>>>
>>>> Thanks but I still can't understand how to migrate my legacy code. The
>>>> main problem is that I can't create a BatchTableEnv anymore so I can't
>>>> call createInput.
>>>>
>>>> Is there a way to reuse InputFormats? Should I migrate them to
>>>> TableSource instead?
>>>>
>>>> public static void main(String[] args) throws Exception {
>>>> ExecutionEnvironment env =
>>>> ExecutionEnvironment.getExecutionEnvironment();
>>>> BatchTableEnvironment btEnv =
>>>> TableEnvironment.getTableEnvironment(env);
>>>> MyInputFormat myInputformat =  new MyInputFormat(dsFields,
>>>> ft).finish();
>>>> DataSet rows = env.createInput(myInputformat);
>>>> Table table = btEnv.fromDataSet(rows, String.join(",", dsFields));
>>>> CsvTableSink outSink = new CsvTableSink("file:/tmp/test.tsv", "\t",
>>>> 1, WriteMode.OVERWRITE);
>>>> btEnv.registerTableSink("out", dsFields, ft, outSink);
>>>> btEnv.insertInto(table, "out", btEnv.queryConfig());
>>>> env.execute();
>>>>   }
>>>>
>>>> On Fri, Jul 10, 2020 at 11:56 AM Dawid Wysakowicz <
>>>> dwysakow...@apache.org> wrote:
>>>>
>>>>> You should be good with using the TableEnvironment. The
>>>>> StreamTableEnvironment is needed only if you want to convert to
>>>>> DataStream. We do not support converting batch Table programs to
>>>>> DataStream yet.
>>>>>
>>>>> A following code should work:
>>>>>
>>>>> EnvironmentSettings settings =
>>>>> EnvironmentSettings.newInstance().inBatchMode().build();
>>>>>
>>>>> TableEnvironment.create(settings);
>>>>>
>>>>> Best,
>>>>>
>>>>> Dawid
>>>>>
>>>>> On 10/07/2020 11:48, Flavio Pompermaier wrote:
>>>>> > Hi to all,
>>>>> > I was trying to update my legacy code to Flink 1.11. Before I was
>>>>> > using a BatchTableEnv and now I've tried to use the following:
>>>>> >
>>>>> > EnvironmentSettings settings =
>>>>> > EnvironmentSettings.newInstance().inBatchMode().build();
>>>>> >
>>>>> > Unfortunately in the StreamTableEnvironmentImpl code there's :
>>>>> >
>>>>> > if (!settings.isStreamingMode()) {
>>>>> > throw new TableException(
>>>>> > "StreamTableEnvironment can not run in batch mode for now, please use
>>>>> > TableEnvironment.");
>>>>> > }
>>>>> >
>>>>> > What should I do here?
>>>>> >
>>>>> > Thanks in advance,
>>>>> > Flavio
>>>>>
>>>>>
>>>>


Re: Table API jobs migration to Flink 1.11

2020-07-10 Thread godfrey he
hi Flavio,
Only the old planner supports BatchTableEnvironment (which can convert
to/from DataSet), while the Blink planner in batch mode only supports
TableEnvironment. This is because the Blink planner translates batch queries
to Transformations (corresponding to DataStream) instead of DataSet.

One approach is to migrate them to a TableSource instead (the InputFormat
can be reused), but TableSource will be deprecated later, so you can also try
the new table source interface [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html

Best,
Godfrey

Flavio Pompermaier  于2020年7月10日周五 下午8:54写道:

> Thanks but I still can't understand how to migrate my legacy code. The
> main problem is that I can't create a BatchTableEnv anymore so I can't
> call createInput.
>
> Is there a way to reuse InputFormats? Should I migrate them to TableSource
> instead?
>
> public static void main(String[] args) throws Exception {
> ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> BatchTableEnvironment btEnv =
> TableEnvironment.getTableEnvironment(env);
> MyInputFormat myInputformat =  new MyInputFormat(dsFields,
> ft).finish();
> DataSet rows = env.createInput(myInputformat);
> Table table = btEnv.fromDataSet(rows, String.join(",", dsFields));
> CsvTableSink outSink = new CsvTableSink("file:/tmp/test.tsv", "\t", 1,
> WriteMode.OVERWRITE);
> btEnv.registerTableSink("out", dsFields, ft, outSink);
> btEnv.insertInto(table, "out", btEnv.queryConfig());
> env.execute();
>   }
>
> On Fri, Jul 10, 2020 at 11:56 AM Dawid Wysakowicz 
> wrote:
>
>> You should be good with using the TableEnvironment. The
>> StreamTableEnvironment is needed only if you want to convert to
>> DataStream. We do not support converting batch Table programs to
>> DataStream yet.
>>
>> A following code should work:
>>
>> EnvironmentSettings settings =
>> EnvironmentSettings.newInstance().inBatchMode().build();
>>
>> TableEnvironment.create(settings);
>>
>> Best,
>>
>> Dawid
>>
>> On 10/07/2020 11:48, Flavio Pompermaier wrote:
>> > Hi to all,
>> > I was trying to update my legacy code to Flink 1.11. Before I was
>> > using a BatchTableEnv and now I've tried to use the following:
>> >
>> > EnvironmentSettings settings =
>> > EnvironmentSettings.newInstance().inBatchMode().build();
>> >
>> > Unfortunately in the StreamTableEnvironmentImpl code there's :
>> >
>> > if (!settings.isStreamingMode()) {
>> > throw new TableException(
>> > "StreamTableEnvironment can not run in batch mode for now, please use
>> > TableEnvironment.");
>> > }
>> >
>> > What should I do here?
>> >
>> > Thanks in advance,
>> > Flavio
>>
>>
>
> --
> Flavio Pompermaier
> Development Department
>
> OKKAM S.r.l.
> Tel. +(39) 0461 041809
>


Re: [ANNOUNCE] Apache Flink 1.11.0 released

2020-07-08 Thread godfrey he
Congratulations!

Thanks Zhijiang and Piotr for the great work, and thanks everyone for their
contribution!

Best,
Godfrey

Benchao Li  于2020年7月8日周三 下午12:39写道:

> Congratulations!  Thanks Zhijiang & Piotr for the great work as release
> managers.
>
> Rui Li  于2020年7月8日周三 上午11:38写道:
>
>> Congratulations! Thanks Zhijiang & Piotr for the hard work.
>>
>> On Tue, Jul 7, 2020 at 10:06 PM Zhijiang 
>> wrote:
>>
>>> The Apache Flink community is very happy to announce the release of
>>> Apache Flink 1.11.0, which is the latest major release.
>>>
>>> Apache Flink® is an open-source stream processing framework for distributed,
>>> high-performing, always-available, and accurate data streaming
>>> applications.
>>>
>>> The release is available for download at:
>>> https://flink.apache.org/downloads.html
>>>
>>> Please check out the release blog post for an overview of the
>>> improvements for this new major release:
>>> https://flink.apache.org/news/2020/07/06/release-1.11.0.html
>>>
>>> The full release notes are available in Jira:
>>>
>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12346364
>>>
>>> We would like to thank all contributors of the Apache Flink community
>>> who made this release possible!
>>>
>>> Cheers,
>>> Piotr & Zhijiang
>>>
>>
>>
>> --
>> Best regards!
>> Rui Li
>>
>
>
> --
>
> Best,
> Benchao Li
>


Re: Blink Planner Retracting Streams

2020-06-16 Thread godfrey he
hi John,

You can use Tuple2[Boolean, Row] to replace CRow; the
StreamTableEnvironment#toRetractStream method returns DataStream[(Boolean, T)].

the code looks like:

tEnv.toRetractStream[Row](table).map(new MapFunction[(Boolean, Row), R] {
  override def map(value: (Boolean, Row)): R = ...
})

Bests,
Godfrey

John Mathews  于2020年6月17日周三 下午12:13写道:

> Hello,
>
> I am working on migrating from the flink table-planner to the new blink
> one, and one problem I am running into is that it doesn't seem like Blink
> has a concept of a CRow, unlike the original table-planner.
>
> I am therefore struggling to figure out how to properly convert a
> retracting stream to a SingleOutputStreamOperator when using just the Blink
> planner libraries.
>
> E.g. in the old planner I could do something like this:
> SingleOutputStreamOperator stream =
> tableEnvironment.toRetractStream(table, typeInfo)
> .map(value -> new CRow(value.f1, value.f0);
>
> but without the CRow, I'm not sure how to accomplish this.
>
> Any suggestions?
>
> Thanks!
> John
>
>
>


Re: pyflink data query

2020-06-15 Thread godfrey he
Hi Jack, Jincheng,

Flink 1.11 supports collecting the result of a SELECT directly to the client, for example:
CloseableIterator<Row> it = tEnv.executeSql("select ...").collect();
while (it.hasNext()) {
    it.next(); // process the row
}

However, pyflink has not exposed the collect() interface yet. (To be added later? @jincheng)

Note that in 1.11 the TableResult#collect implementation only partially supports
streaming queries (append-only queries only); master already supports them fully.

You can also follow Jincheng's suggestion (or look at the TableResult#collect
implementation) and implement your own sink.

Best,
Godfrey



jincheng sun wrote on Mon, Jun 15, 2020 at 4:14 PM:

> Hi Jack,
>
> > With pyflink, I query and aggregate data from a source via SQL. Instead of
> > writing the result to a sink, I want to use it directly as the result, so
> > that I can build a web API to query it directly, without reading it from a
> > sink.
>
> As I understand it, "use it directly as the result" plus "query it via a web
> API" already implies a "sink" action; it is just that this "sink" is
> implemented in that particular way. For your scenario:
> 1. If you want to push the result directly to a web page without persisting
> it, you can implement a custom WebSocket sink.
> 2. If you do not want to push to a web page but rather pull the result via
> queries, then please describe what "use it directly as the result" means:
> how do you want to consume the result? My understanding is that it has to be
> persisted, and persistence is essentially a sink. Flink supports many kinds
> of sinks, e.g. databases, file systems, message queues, and so on. You can
> refer to the official documentation:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html
>
> If the reply above does not solve your problem, feel free to follow up at
> any time.
>
> Best,
> Jincheng
>
>
>
> Jeff Zhang wrote on Tue, Jun 9, 2020 at 5:39 PM:
>
>> You can use Zeppelin's z.show to query the job result. Here is a
>> getting-started tutorial for PyFlink on Zeppelin:
>> https://www.bilibili.com/video/BV1Te411W73b?p=20
>> You can also join the DingTalk group 30022475 for discussion.
>>
>>
>>
>> jack wrote on Tue, Jun 9, 2020 at 5:28 PM:
>>
>>> A question, please:
>>> Description: with pyflink, I query and aggregate data from a source via SQL.
>>> Instead of writing the output to a sink, I would like to use it directly as
>>> the result, so that I can build a web API to query it directly without
>>> reading it from a sink.
>>>
>>> Can Flink support this kind of usage?
>>> Thanks
>>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>


Re: How to use Hbase Connector Sink

2020-06-11 Thread godfrey he
hi,

You should make sure the types of the selected fields and the types of the sink
table are the same, otherwise you will get the above exception. You can wrap the
`active_ratio*25` score into a ROW type, like:

insert into circle_weight select rowkey, ROW(score) as info from (
  select concat_ws('_', circleName, dt) as rowkey, active_ratio*25 as score
  from tb) t;


Best,
Godfrey

op <520075...@qq.com> 于2020年6月11日周四 下午3:31写道:

> hi
> flink1.10,wen i want to sink data to hbase table like this:
>
>  bstEnv.sqlUpdate("""CREATE TABLE circle_weight (
>rowkey String,
>info ROW<score DOUBLE>
>  ) WITH (
>'connector.type' = 'hbase',
>'connector.version' = '1.4.3',
>'connector.table-name' = 'ms:test_circle_info',
>'connector.zookeeper.quorum' = 'localhost:2181',
>'connector.zookeeper.znode.parent' =
> '/hbase-secure',
>'connector.write.buffer-flush.max-size' =
> '10mb',
>'connector.write.buffer-flush.max-rows' =
> '1000',
>'connector.write.buffer-flush.interval' = '2s'
>  )""")
>
> bstEnv.sqlUpdate(
>   """
> |insert into circle_weight
> |select
> |concat_ws('_',circleName,dt) rowkey,
> |active_ratio*25 score
> |from tb""")
>
> but i get following exceptions,can anybody tell me what is wrong?
>
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Field types of query result and registered TableSink
> default_catalog.default_database.circle_weight do not match.
> Query schema: [rowkey: STRING, score: DOUBLE]
> Sink schema: [rowkey: STRING, info: ROW<`score` DOUBLE>]
> at
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:96)
> at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:198)
> at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:190)
> at scala.Option.map(Option.scala:146)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190)
> at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
> at
> com.souhu.msns.huyou.test.table.sql.CircleWeightRank$.main(CircleWeightRank.scala:170)
> at
> com.souhu.msns.huyou.test.table.sql.CircleWeightRank.main(CircleWeightRank.scala)
>


Re: Table Environment for Remote Execution

2020-06-03 Thread godfrey he
Hi Satyam,

For Blink batch mode, only TableEnvironment can be used,
and TableEnvironment does not take a StreamExecutionEnvironment as an argument;
instead, a StreamExecutionEnvironment instance is created internally.

Back to your requirement: you can package your table program as a user jar
and submit the job to the remote environment through the Flink CLI [1].
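A rough, untested sketch (host, class and jar names are placeholders):

// Packaged as a user jar and submitted with the CLI, e.g.:
//   ./bin/flink run -m <jobmanager-host>:8081 -c com.example.BatchTableJob my-job.jar
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BatchTableJob {
  public static void main(String[] args) {
    EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inBatchMode()
        .build();
    TableEnvironment tEnv = TableEnvironment.create(settings);
    // ... register tables and run queries, e.g. tEnv.executeSql("INSERT INTO ...") ...
  }
}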

Bests,
Godfrey

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html



Satyam Shekhar  于2020年6月3日周三 下午2:59写道:

> Hello,
>
> I am running into a very basic problem while working with Table API. I
> wish to create a TableEnvironment connected to a remote environment that
> uses Blink planner in batch mode. Examples and documentation I have come
> across so far recommend the following pattern to create such an environment
> -
>
> var settings = EnvironmentSettings.newInstance()
>   .useBlinkPlanner()
>   .inBatchMode()
>   .build();
> var tEnv = TableEnvironment.create(settings);
>
> The above configuration, however, does not connect to a remote
> environment. Tracing code in TableEnvironment.java, I see the following
> method in BlinkExecutorFactory.java that appears to relevant -
>
> Executor create(Map, StreamExecutionEnvironment);
>
> However, it seems to be only accessible through the Scala bridge. I can't
> seem to find a way to instantiate a TableEnvironment that takes
> StreamExecutionEnvironment as an argument. How do I achieve that?
>
> Regards,
> Satyam
>


Re: Joining table with row attribute against an enrichment table

2020-04-20 Thread godfrey he
Hi Gyual,

Can you convert the regular join into a lookup join (temporal join) [1]?
Then you can still use a window aggregate afterwards.

>  I understand that the problem is that we cannot join with the Hive table
> and still maintain the watermark/event time column. But why is this?
A regular join can't keep the time attribute increasing (one
record may be joined with a very old record),
which means the watermark is no longer guaranteed to increase either.
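A very rough, untested sketch of the shape of such a query; it assumes the
Kafka table exposes a processing-time attribute `proctime` and that the Hive
table (called `ItemPrices` here) can serve as the lookup side, which you would
need to verify for your connector versions:

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class LookupJoinSketch {
  public static Table enrich(TableEnvironment tEnv) {
    // The probe side keeps its rowtime attribute, so a window aggregate
    // on `timestamp` is still possible after the join.
    return tEnv.sqlQuery(
        "SELECT k.`timestamp`, k.item, k.quantity * p.price AS total_price "
            + "FROM Kafka AS k "
            + "JOIN ItemPrices FOR SYSTEM_TIME AS OF k.proctime AS p "
            + "ON k.item = p.item");
  }
}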

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#join-with-a-temporal-table

Best,
Godfrey

Gyula Fóra  于2020年4月20日周一 下午4:46写道:

> Hi All!
>
> We hit a the following problem with SQL and trying to understand if there
> is a valid workaround.
>
> We have 2 tables:
>
> *Kafka*
> timestamp (ROWTIME)
> item
> quantity
>
> *Hive*
> item
> price
>
> So we basically have incoming (ts, id, quantity) and we want to join it
> with the hive table to get the total price (price * quantity) got the
> current item.
>
> After this we want to create window aggregate on quantity*price windowed
> on timestamp (event time attribute).
>
> In any way we formulate this query we hit the following error:
> org.apache.flink.table.api.TableException: Rowtime attributes must not be
> in the input rows of a regular join. As a workaround you can cast the time
> attributes of input tables to TIMESTAMP before.
>
>  I understand that the problem is that we cannot join with the Hive table
> and still maintain the watermark/event time column. But why is this?
>
> In datastream world I would just simply assign Max watermark to my
> enrichment input and join outputs will get the ts of the input record. Can
> I achieve something similar in SQL/Table api?
>
> Thank you!
> Gyula
>
>


Re: Schema with TypeInformation or DataType

2020-04-16 Thread godfrey he
Hi tison,

> 1. Will TypeInformation be deprecated and we use DataType as type system
> everywhere?
AFAIK, the runtime will still support TypeInformation, while the table module
supports DataType.

> 2. Schema in Table API currently support only TypeInformation to register
> a field, shall we support
> the DataType way as well?
Schema also supports DataType since FLINK-14645 [1].

[1] https://issues.apache.org/jira/browse/FLINK-14645
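For example, a small untested sketch with made-up field names:

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.descriptors.Schema;

public class SchemaWithDataTypes {
  public static Schema build() {
    // Fields are declared with DataTypes instead of TypeInformation.
    return new Schema()
        .field("user_id", DataTypes.BIGINT())
        .field("name", DataTypes.STRING())
        .field("ts", DataTypes.TIMESTAMP(3));
  }
}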

Best,
Godfrey

tison  于2020年4月17日周五 下午2:14写道:

> Hi,
>
> I notice that our type system has two branches. One  is TypeInformation
> while the other is
> DataType. It is said that Table API will use DataType but there are
> several questions about
> this statement:
>
> 1. Will TypeInformation be deprecated and we use DataType as type system
> everywhere?
> 2. Schema in Table API currently support only TypeInformation to register
> a field, shall we support
> the DataType way as well?
>
> Best,
> tison.
>


Re: Flink SQL Gateway

2020-04-16 Thread godfrey he
Hi Flavio,

Thanks for the detailed explanation.

I think we should let Catalog know about this concept first;
then TableEnvironment or the SQL Gateway can do more based on that.
But "trigger" is a database-domain concept, and I think it needs more discussion
whether Flink should support it. Also cc @bowenl...@gmail.com

Best,
Godfrey

Jeff Zhang  于2020年4月16日周四 下午10:54写道:

> Hi Flavio,
>
> If you would like to use have a UI to register data sources, run flink sql
> and preview the sql result, then you can use zeppelin directly. You can
> check the tutorial here,
> 1) Get started https://link.medium.com/oppqD6dIg5
> <https://t.co/PTouUYYTrv?amp=1> 2) Batch https://
> link.medium.com/3qumbwRIg5 <https://t.co/Yo9QAY0Joj?amp=1> 3) Streaming
> https://link.medium.com/RBHa2lTIg5 <https://t.co/sUapN40tvI?amp=1> 4)
> Advanced usage https://link.medium.com/CAekyoXIg5
> <https://t.co/MXolULmafZ?amp=1>
>
> And here's one article shared by someone else about how to use flink on
> zeppelin.
>
> https://medium.com/@abdelkrim.hadjidj/event-driven-supply-chain-for-crisis-with-flinksql-be80cb3ad4f9
>
> Besides that, Zeppelin provides rest api which you can use to integarte
> with other system, but it is not standard jdbc protocol.
> http://zeppelin.apache.org/docs/0.9.0-preview1/usage/rest_api/notebook.html
>
> And I am doing more improvement recently, I will reveal more details in
> next week's flink forward.
>
> https://www.flink-forward.org/sf-2020/conference-program#it%E2%80%99s-finally-here--python-on-flink---flink-on-zeppelin
>
>
>
>
>
> Flavio Pompermaier  于2020年4月16日周四 下午8:24写道:
>
>> Basically we want to give a UI to the user to register its data sources
>> (i.e. catalogs in the Flink world), preview them (SELECT * LIMIT 100 for
>> example) but, in the case of JDBC catalogs, also to see relationships and
>> triggers.
>> We don't want to reimplement the wheel so we would like to reuse and
>> contribute to Flink as much as possible (since then in the batch jobs we
>> use Flink and we don't like to do the same work twice..).
>> In this way we can contribute to Flink if something is missing in the SQL
>> Gateway. However I don't know how to extend the existing stuff (for example
>> if I want table relationships and triggers)..
>>
>> Best,
>> Flavio
>>
>> On Thu, Apr 16, 2020 at 1:38 PM godfrey he  wrote:
>>
>>> Hi Flavio,
>>>
>>> Since 1.11(master), Flink supports "CREATE CATALOG ..." [1],
>>> we can use this statement create catalog dynamically.
>>>
>>> Currently, Catalog[2] dose not supports any operations on TRIGGER.
>>> Flink can't also use such info now. What's your user scenario?
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-15349
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/catalogs.html#catalog-api
>>>
>>> Best,
>>> Godfrey
>>>
>>> Flavio Pompermaier  于2020年4月16日周四 下午6:16写道:
>>>
>>>> Hi Godfrey,
>>>> I'd like to use the SQL gateway as a data proxy in our architecture.
>>>> However, catalogs in our use case are not know at configuration time..
>>>> is there a way to permit to register a JDBC catalog (for example when I
>>>> want to connect to a Postgres database)?
>>>> What if I want to add SHOW TRIGGERS? Do you think it could be
>>>> interesting?
>>>>
>>>> On Thu, Apr 16, 2020 at 10:55 AM godfrey he 
>>>> wrote:
>>>>
>>>>> Hi Flavio,
>>>>>
>>>>> We prose FLIP-91[1] to support SQL Gateway at the beginning of this
>>>>> year.
>>>>> After a long discussion, we reached an agreement that
>>>>> SQL Gateway is an eco-system under ververia as first step.[2]
>>>>> Which could help SQL Gateway move forward faster.
>>>>> Now we almost finish first version development, some users are trying
>>>>> it out.
>>>>> Any suggestions are welcome!
>>>>>
>>>>> [1]
>>>>> https://docs.google.com/document/d/1DKpFdov1o_ObvrCmU-5xi-VrT6nR2gxq-BbswSSI9j8/edit?ts=5e4cd711#heading=h.sru3xzbt2i1k
>>>>> [2] https://github.com/ververica/flink-sql-gateway
>>>>>
>>>>> Best,
>>>>> Godfrey
>>>>>
>>>>> Flavio Pompermaier  于2020年4月16日周四 下午4:42写道:
>>>>>
>>>>>> Hi Jeff,
>>>>>> FLIP-24 [1] proposed to develop a SQL gateway to query Flink via SQL
>>>>>> but since then no progress has been made on that point. Do you think that
>>>>>> Zeppelin could be used somehow as a SQL Gateway towards Flink for the
>>>>>> moment?
>>>>>> Any chance that a Flink SQL Gateway could ever be developed? Is there
>>>>>> anybody interested in this?
>>>>>>
>>>>>> Best,
>>>>>> Flavio
>>>>>>
>>>>>> [1]
>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client
>>>>>>
>>>>>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: instance number of user defined function

2020-04-16 Thread godfrey he
Hi,

A UDTF will be wrapped into an operator, and each parallel operator instance
is executed in a slot (one thread per parallel instance).
About operators, tasks and slots, you can refer to [1] for more details.
A TM (a JVM process) may have multiple slots, which means a JVM process may
have multiple UDTF instances.
It's better to make sure your UDTF is stateless; otherwise you have to take
care of thread-safety yourself.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/concepts/runtime.html#task-slots-and-resources
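
A minimal sketch of that pattern in Java, where each parallel function
instance creates its own connection in open() instead of sharing member state
across threads. `MongoConnection` and `openConnection` are placeholders for
your Mongo client code, not Flink APIs:

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

public class MongoJoin extends TableFunction<Row> {

    // per-instance state: every parallel subtask (thread) gets its own connection
    private transient MongoConnection connection;

    @Override
    public void open(FunctionContext context) throws Exception {
        connection = MongoConnection.openConnection();
    }

    public void eval(String collectionName) {
        // query `collectionName` through `connection` and emit matching rows
        // collect(Row.of(...));
    }

    @Override
    public void close() throws Exception {
        if (connection != null) {
            connection.close();
        }
    }
}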

Best,
Godfrey



lec ssmi  于2020年4月16日周四 下午6:20写道:

> Hi:
>I always wonder how much instance has been initialized in the whole
> flink application.
>Suppose there is such a scenario:
>I have a  UDTF  called '*mongo_join'*  through  which the flink
> table can join with external different mongo table  according to the
> parameters passed in.
>So ,I have a sql table called*trade . *Throughout  all the
> pipeline, I  join the *trade *table with  *item, * And *payment. *The sql
> statement as bellows:
>
>   * create view  trade_payment as  select trade_id, payment_id
> from trade , lateral table (mongo_join('payment')) as T(payment_id);*
> *  create view trade_item as  select trade_id,item_id from trade ,
> , lateral table (mongo_join('item')) as T(payment_id); *
>
> As everyone thinks, I use  some *member variables* to store  the
> different MongoConnection  in the  instance of the UDTF.
> So , will there be concurrency problems?  And how are the instances of
> the function distributed?
>
>   Thanks!
>
>


Re: Flink SQL Gateway

2020-04-16 Thread godfrey he
Hi Flavio,

Since 1.11 (master), Flink supports "CREATE CATALOG ..." [1];
we can use this statement to create a catalog dynamically.

Currently, Catalog [2] does not support any operations on TRIGGER.
Flink also can't use such info now. What's your usage scenario?

[1] https://issues.apache.org/jira/browse/FLINK-15349
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/catalogs.html#catalog-api

Best,
Godfrey

Flavio Pompermaier  于2020年4月16日周四 下午6:16写道:

> Hi Godfrey,
> I'd like to use the SQL gateway as a data proxy in our architecture.
> However, catalogs in our use case are not know at configuration time..
> is there a way to permit to register a JDBC catalog (for example when I
> want to connect to a Postgres database)?
> What if I want to add SHOW TRIGGERS? Do you think it could be interesting?
>
> On Thu, Apr 16, 2020 at 10:55 AM godfrey he  wrote:
>
>> Hi Flavio,
>>
>> We prose FLIP-91[1] to support SQL Gateway at the beginning of this year.
>> After a long discussion, we reached an agreement that
>> SQL Gateway is an eco-system under ververia as first step.[2]
>> Which could help SQL Gateway move forward faster.
>> Now we almost finish first version development, some users are trying it
>> out.
>> Any suggestions are welcome!
>>
>> [1]
>> https://docs.google.com/document/d/1DKpFdov1o_ObvrCmU-5xi-VrT6nR2gxq-BbswSSI9j8/edit?ts=5e4cd711#heading=h.sru3xzbt2i1k
>> [2] https://github.com/ververica/flink-sql-gateway
>>
>> Best,
>> Godfrey
>>
>> Flavio Pompermaier  于2020年4月16日周四 下午4:42写道:
>>
>>> Hi Jeff,
>>> FLIP-24 [1] proposed to develop a SQL gateway to query Flink via SQL but
>>> since then no progress has been made on that point. Do you think that
>>> Zeppelin could be used somehow as a SQL Gateway towards Flink for the
>>> moment?
>>> Any chance that a Flink SQL Gateway could ever be developed? Is there
>>> anybody interested in this?
>>>
>>> Best,
>>> Flavio
>>>
>>> [1]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client
>>>
>>
>
> --
> Flavio Pompermaier
> Development Department
>
> OKKAM S.r.l.
> Tel. +(39) 0461 041809
>


Re: Flink SQL Gateway

2020-04-16 Thread godfrey he
Hi Flavio, that's great~

Best,
Godfrey

Flavio Pompermaier  于2020年4月16日周四 下午5:01写道:

> Great, I'm very interested in trying it out!
> Maybe we can also help with the development because we need something like
> that.
> Thanks a lot for the pointers
>
> On Thu, Apr 16, 2020 at 10:55 AM godfrey he  wrote:
>
>> Hi Flavio,
>>
>> We prose FLIP-91[1] to support SQL Gateway at the beginning of this year.
>> After a long discussion, we reached an agreement that
>> SQL Gateway is an eco-system under ververia as first step.[2]
>> Which could help SQL Gateway move forward faster.
>> Now we almost finish first version development, some users are trying it
>> out.
>> Any suggestions are welcome!
>>
>> [1]
>> https://docs.google.com/document/d/1DKpFdov1o_ObvrCmU-5xi-VrT6nR2gxq-BbswSSI9j8/edit?ts=5e4cd711#heading=h.sru3xzbt2i1k
>> [2] https://github.com/ververica/flink-sql-gateway
>>
>> Best,
>> Godfrey
>>
>> Flavio Pompermaier  于2020年4月16日周四 下午4:42写道:
>>
>>> Hi Jeff,
>>> FLIP-24 [1] proposed to develop a SQL gateway to query Flink via SQL but
>>> since then no progress has been made on that point. Do you think that
>>> Zeppelin could be used somehow as a SQL Gateway towards Flink for the
>>> moment?
>>> Any chance that a Flink SQL Gateway could ever be developed? Is there
>>> anybody interested in this?
>>>
>>> Best,
>>> Flavio
>>>
>>> [1]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client
>>>
>>


Re: Flink SQL Gateway

2020-04-16 Thread godfrey he
Hi Flavio,

We proposed FLIP-91 [1] to support a SQL Gateway at the beginning of this year.
After a long discussion, we reached an agreement that the SQL Gateway will be
an ecosystem project under Ververica as a first step [2], which could help the
SQL Gateway move forward faster.
We have almost finished developing the first version, and some users are
trying it out.
Any suggestions are welcome!

[1]
https://docs.google.com/document/d/1DKpFdov1o_ObvrCmU-5xi-VrT6nR2gxq-BbswSSI9j8/edit?ts=5e4cd711#heading=h.sru3xzbt2i1k
[2] https://github.com/ververica/flink-sql-gateway

Best,
Godfrey

Flavio Pompermaier  于2020年4月16日周四 下午4:42写道:

> Hi Jeff,
> FLIP-24 [1] proposed to develop a SQL gateway to query Flink via SQL but
> since then no progress has been made on that point. Do you think that
> Zeppelin could be used somehow as a SQL Gateway towards Flink for the
> moment?
> Any chance that a Flink SQL Gateway could ever be developed? Is there
> anybody interested in this?
>
> Best,
> Flavio
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client
>


Re: Setting different idleStateRetentionTime for different queries executed in the same TableEnvironment in Flink 1.10

2020-04-14 Thread godfrey he
Hi Jiahui,

Thanks for your suggestions.
I think we may need a more detailed explanation of the behavior change.
Regarding "supporting query configuration using hints", I think it's one
possible approach, but it needs more discussion.

Best,
Godfrey

Jiahui Jiang  于2020年4月14日周二 下午7:46写道:

> Yep yep :) I’m aware of the difference here for Blink and legacy Flink
> planner is only for sinks.
>
> But since on the API level toDataStream doesn’t take in a query level
> config, so it’s easy for people to think they can’t control it on a per
> query basis without digging into the source code.
>
> I have two questions / suggestions here:
>
> 1. Since StreamQueryConfig is deprecated and we want to consolidate config
> classes, can we maybe add an additional endpoint like
> .toRetractStream(Table, Class, minRetentionTime, maxRetentionTime)? Or at
> least add some Java docs so that I won’t worry about the behavior under the
> hook suddenly change?
> 2. What do we think about supporting query configuration using Hints to be
> a first class supported Flink feature?
>
> Thank you so much 😊
> --
> *From:* godfrey he 
> *Sent:* Tuesday, April 14, 2020 3:20 AM
> *To:* Jiahui Jiang 
> *Cc:* Jark Wu ; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: Setting different idleStateRetentionTime for different
> queries executed in the same TableEnvironment in Flink 1.10
>
> Hi Jiahui,
>
> I think this is the problem of multiple sinks optimization. If we optimize
> each sink eager (that means we optimize the query when we call
> `writeToSink` or `insertInto`), `TableConfig#setIdleStateRetentionTime` is
> functionally equivalent to QueryConfig.  which require we need
> call `TableConfig#setIdleStateRetentionTime` before call `writeToSink` or
> `insertInto`.  While, If we use multiple sinks optimization, It's hard to
> map the value of `TableConfig#setIdleStateRetentionTime` to each query. I
> think it's a common issue for configuring for per query on multiple sinks
> optimization.
>
> but for `toRetractStream` method, we keep eager optimization strategy. So
> you can call `TableConfig#setIdleStateRetentionTime` before
> `toRetractStream`.
>
> Best,
> Godfrey
>
> Jiahui Jiang  于2020年4月14日周二 下午12:15写道:
>
> Hey Godfrey, in some of the use cases our users have, they have a couple
> of complex join queries where the key domains key evolving - we definitely
> want some sort of state retention for those queries; but there are other
> where the key domain doesn't evolve overtime, but there isn't really a
> guarantee on what's the maximum gap between 2 records of the same key to
> appear in the stream, we don't want to accidentally invalidate the state
> for those keys in these streams.
>
> Because of queries with different requirements can both exist in the
> pipeline, I think we have to config `IDLE_STATE_RETENTION_TIME` per
> operator.
>
> Just wondering, has similar requirement not come up much for SQL users
> before? (being able to set table / query configuration inside SQL queries)
>
> We are also a little bit concerned because right now since
> 'toRetractStream(Table, Class, QueryConfig)' is deprecated, relying on the
> fact that TableConfig is read during toDataStream feels like relying on an
> implementation details that just happens to work, and there is no guarantee
> that it will keep working in the future versions...
>
> Thanks!
> --
> *From:* godfrey he 
> *Sent:* Monday, April 13, 2020 9:51 PM
> *To:* Jiahui Jiang 
> *Cc:* Jark Wu ; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: Setting different idleStateRetentionTime for different
> queries executed in the same TableEnvironment in Flink 1.10
>
> Hi Jiahui,
>
> Query hint is a way for fine-grained configuration.
>  just out of curiosity, is it a strong requirement
>  that users need to config different IDLE_STATE_RETENTION_TIME for each
> operator?
>
> Best,
> Godfrey
>
> Jiahui Jiang  于2020年4月14日周二 上午2:07写道:
>
> Also for some more context, we are building a framework to help users
> build their Flink pipeline with SQL. Our framework handles all the setup
> and configuration, so that users only need to write the SQL queries without
> having to have any Flink knowledge.
>
> One issue we encountered was, for some of the streams, the key domain
> keeps evolving and we want to expire the states for older keys. But there
> is no easy ways to allow users configure their state timeout directly
> through SQL APIs.
> Currently we are asking users to configure idleStateRetentionTime in a
> custom SQL hint, then our framew

Re: Registering UDAF in blink batch app

2020-04-14 Thread godfrey he
Hi Dmytro,

Currently, TableEnvironment does not support registering AggregateFunction
and TableFunction, because the type extractor has not been unified for Java
and Scala.

One approach is to use "TableEnvironment#createFunction", which
registers the UDF in the catalog.
I find that "createTemporarySystemFunction" does not work right now. cc @Zhenghua Gao


Best,
Godfrey

Zhenghua Gao  于2020年4月14日周二 下午6:40写道:

> `StreamTableEnvironment.create()` yields a `StreamTableEnvironmentImpl`
> object,
> which has several `registerFunction` interface for
> ScalarFunction/TableFunction/AggregateFunction/TableAggregateFunction.
>
> `TableEnvironment.create()` yields a `TableEnvironmentImpl` object, which
> is a unify entry point for Table/SQL programs.
> And it only has a deprecated `registerFunction` interface for
> ScalarFunction.  You should use `createTemporarySystemFunction` instead.
>
> A workaround for batch mode of blink planner is: You can use the public
> constructor of `StreamTableEnvironmentImpl` to create
> the TableEnvironment and use `registerFunction`s. Pls make sure you pass
> in the correct `isStreamingMode = false`
>
> *Best Regards,*
> *Zhenghua Gao*
>
>
> On Tue, Apr 14, 2020 at 5:58 PM Dmytro Dragan 
> wrote:
>
>> Hi All,
>>
>>
>>
>> Could you please tell how to register custom Aggregation function in
>> blink batch app?
>>
>> In case of streaming mode:
>>
>> We create
>>
>> EnvironmentSettings bsSettings = 
>> EnvironmentSettings.*newInstance*().useBlinkPlanner().inStreamingMode().build();
>> StreamTableEnvironment tableEnv = StreamTableEnvironment.*create*(env, 
>> bsSettings);
>>
>>
>>
>> Which has:
>>
>>  void registerFunction(String name, AggregateFunction
>> aggregateFunction);
>>
>>
>>
>> But in case of batchMode, we need to create TableEnvironment:
>>
>>
>>
>> EnvironmentSettings bsSettings = 
>> EnvironmentSettings.*newInstance*().useBlinkPlanner().inBatchMode().build();
>> tEnv = TableEnvironment.*create*(bsSettings);
>>
>>
>>
>> Which does not have this function to register AggregationFunction, only
>> Scalar one.
>>
>>
>>
>> Details: Flink 1.10, Java API
>>
>>
>>
>>
>>
>


Re: Setting different idleStateRetentionTime for different queries executed in the same TableEnvironment in Flink 1.10

2020-04-14 Thread godfrey he
Hi Jiahui,

I think this is a problem of multiple-sink optimization. If we optimize
each sink eagerly (that means we optimize the query when we call
`writeToSink` or `insertInto`), `TableConfig#setIdleStateRetentionTime` is
functionally equivalent to QueryConfig, which requires calling
`TableConfig#setIdleStateRetentionTime` before calling `writeToSink` or
`insertInto`. However, if we use multiple-sink optimization, it's hard to
map the value of `TableConfig#setIdleStateRetentionTime` to each query. I
think this is a common issue when configuring per query under multiple-sink
optimization.

But for the `toRetractStream` method, we keep the eager optimization strategy,
so you can call `TableConfig#setIdleStateRetentionTime` before
`toRetractStream`.
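
A minimal sketch of that pattern (Java, blink planner with a
StreamTableEnvironment; the queries and retention values are only
illustrative, and imports such as org.apache.flink.api.common.time.Time are
omitted):

TableConfig config = tableEnv.getConfig();

// retention used by the first query
config.setIdleStateRetentionTime(Time.minutes(5), Time.minutes(11));
Table joined = tableEnv.sqlQuery("SELECT ... FROM input1 JOIN input2 ON ...");
DataStream<Tuple2<Boolean, Row>> out1 = tableEnv.toRetractStream(joined, Row.class);

// a different retention for the next query, set before its translation
config.setIdleStateRetentionTime(Time.hours(12), Time.hours(24));
Table other = tableEnv.sqlQuery("SELECT ... FROM input3 GROUP BY ...");
DataStream<Tuple2<Boolean, Row>> out2 = tableEnv.toRetractStream(other, Row.class);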

Best,
Godfrey

Jiahui Jiang  于2020年4月14日周二 下午12:15写道:

> Hey Godfrey, in some of the use cases our users have, they have a couple
> of complex join queries where the key domains key evolving - we definitely
> want some sort of state retention for those queries; but there are other
> where the key domain doesn't evolve overtime, but there isn't really a
> guarantee on what's the maximum gap between 2 records of the same key to
> appear in the stream, we don't want to accidentally invalidate the state
> for those keys in these streams.
>
> Because of queries with different requirements can both exist in the
> pipeline, I think we have to config `IDLE_STATE_RETENTION_TIME` per
> operator.
>
> Just wondering, has similar requirement not come up much for SQL users
> before? (being able to set table / query configuration inside SQL queries)
>
> We are also a little bit concerned because right now since
> 'toRetractStream(Table, Class, QueryConfig)' is deprecated, relying on the
> fact that TableConfig is read during toDataStream feels like relying on an
> implementation details that just happens to work, and there is no guarantee
> that it will keep working in the future versions...
>
> Thanks!
> --
> *From:* godfrey he 
> *Sent:* Monday, April 13, 2020 9:51 PM
> *To:* Jiahui Jiang 
> *Cc:* Jark Wu ; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: Setting different idleStateRetentionTime for different
> queries executed in the same TableEnvironment in Flink 1.10
>
> Hi Jiahui,
>
> Query hint is a way for fine-grained configuration.
>  just out of curiosity, is it a strong requirement
>  that users need to config different IDLE_STATE_RETENTION_TIME for each
> operator?
>
> Best,
> Godfrey
>
> Jiahui Jiang  于2020年4月14日周二 上午2:07写道:
>
> Also for some more context, we are building a framework to help users
> build their Flink pipeline with SQL. Our framework handles all the setup
> and configuration, so that users only need to write the SQL queries without
> having to have any Flink knowledge.
>
> One issue we encountered was, for some of the streams, the key domain
> keeps evolving and we want to expire the states for older keys. But there
> is no easy ways to allow users configure their state timeout directly
> through SQL APIs.
> Currently we are asking users to configure idleStateRetentionTime in a
> custom SQL hint, then our framework will parse it and set it up during
> table registration time.
>
> An example query that users can be writing right now looks like,
>
> *CREATE TABLE *`/output` *AS*
>
> *SELECT **/*+ IDLE_STATE_RETENTION_TIME(minTime ='5m', maxTime ='11m') */
> * *
>
> *FROM * `/input1` a
>
> INNER JOIN `/input2` b
>
> ON *a.column_name *=* b.column_name*;
>
> Is this something Flink SQL may want to support out of the box? (Starting
> from Calcite 1.22.0
> <https://calcite.apache.org/news/2020/03/05/release-1.22.0/>, it started
> to provide first class hint parsing)
>
>
> --
> *From:* Jiahui Jiang 
> *Sent:* Sunday, April 12, 2020 4:30 PM
> *To:* Jark Wu 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Setting different idleStateRetentionTime for different
> queries executed in the same TableEnvironment in Flink 1.10
>
> Hey Jark, thank you so much for confirming!
>
> Out of curiosity, even though I agree that having too many config classes
> are confusing, not knowing when the config values are used during pipeline
> setup is also pretty confusing. For example, the name of 'TableConfig'
> makes me feel it's global to the whole tableEnvironment (which is true) but is
> only read once at execution (which is not true). Can we try to surface or
> add some documentation on when are these configs are read? 😄
>
> Thank you so much!
> --
> *From:* Jark Wu 
> *Sent:* Saturday, April 11, 2020 8:45 A

Re: Setting different idleStateRetentionTime for different queries executed in the same TableEnvironment in Flink 1.10

2020-04-13 Thread godfrey he
Hi Jiahui,

A query hint is one way to do fine-grained configuration.
Just out of curiosity, is it a strong requirement
that users need to configure a different IDLE_STATE_RETENTION_TIME for each
operator?

Best,
Godfrey

Jiahui Jiang  于2020年4月14日周二 上午2:07写道:

> Also for some more context, we are building a framework to help users
> build their Flink pipeline with SQL. Our framework handles all the setup
> and configuration, so that users only need to write the SQL queries without
> having to have any Flink knowledge.
>
> One issue we encountered was, for some of the streams, the key domain
> keeps evolving and we want to expire the states for older keys. But there
> is no easy ways to allow users configure their state timeout directly
> through SQL APIs.
> Currently we are asking users to configure idleStateRetentionTime in a
> custom SQL hint, then our framework will parse it and set it up during
> table registration time.
>
> An example query that users can be writing right now looks like,
>
> *CREATE TABLE *`/output` *AS*
>
> *SELECT **/*+ IDLE_STATE_RETENTION_TIME(minTime ='5m', maxTime ='11m') */
> **
>
> *FROM *`/input1` a
>
> INNER JOIN `/input2` b
>
> ON *a.column_name *=* b.column_name*;
>
> Is this something Flink SQL may want to support out of the box? (Starting
> from Calcite 1.22.0
> , it started
> to provide first class hint parsing)
>
>
> --
> *From:* Jiahui Jiang 
> *Sent:* Sunday, April 12, 2020 4:30 PM
> *To:* Jark Wu 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Setting different idleStateRetentionTime for different
> queries executed in the same TableEnvironment in Flink 1.10
>
> Hey Jark, thank you so much for confirming!
>
> Out of curiosity, even though I agree that having too many config classes
> are confusing, not knowing when the config values are used during pipeline
> setup is also pretty confusing. For example, the name of 'TableConfig'
> makes me feel it's global to the whole tableEnvironment (which is true) but is
> only read once at execution (which is not true). Can we try to surface or
> add some documentation on when are these configs are read? 😄
>
> Thank you so much!
> --
> *From:* Jark Wu 
> *Sent:* Saturday, April 11, 2020 8:45 AM
> *To:* Jiahui Jiang 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Setting different idleStateRetentionTime for different
> queries executed in the same TableEnvironment in Flink 1.10
>
> Yes, that's right. Set idleStateRetentionTime on TableConfig before
> translation should work.
>
> On Sat, 11 Apr 2020 at 14:46, Jiahui Jiang 
> wrote:
>
> Thank you for answering! I was reading
> StreamExecutionEnvironmentImpl/StreamPlanner, and it seems to me that when
> trying to convert tables to DataStreams, planner.translate is taking the
> current tableConfig into account (aa in it reads the current tableConfig
> content even though it’s not explicitly passed in as an argument for
> translate). So seems like if I set tableConfig right before converting to
> DataStreams that should work?
>
> Or did you mean the actual tableEnvironment.execute()? Since we have a
> whole pipeline with multiple queries that also depends on each other. We
> have to have all the continuous queries executing concurrently.
>
> Thanks again!
> --
> *From:* Jark Wu 
> *Sent:* Saturday, April 11, 2020 1:24 AM
> *To:* Jiahui Jiang 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Setting different idleStateRetentionTime for different
> queries executed in the same TableEnvironment in Flink 1.10
>
> Hi Jiahui,
>
> QueryConfig is deprecated and will be removed in the future, because it is
> confusing that TableAPI has so many different config classes.
> If you want to set different idleStateRetentionTime for different queries,
> you can set a new idleStateRetentionTime on TableConfig before
> execute/submit the query.
>
> Best,
> Jark
>
> On Sat, 11 Apr 2020 at 09:21, Jiahui Jiang 
> wrote:
>
> Just looked into the source code a bit further and realized that for
> StreamTableEnvironmentImpl, even for sinks it's also doing translation
> lazily. Any way we can have different transformation to have different
> queryConfig?
> --
> *From:* Jiahui Jiang 
> *Sent:* Friday, April 10, 2020 6:46 PM
> *To:* user@flink.apache.org 
> *Subject:* Setting different idleStateRetentionTime for different queries
> executed in the same TableEnvironment in Flink 1.10
>
> Hello! I'm using Table API to write a pipeline with multiple queries. And
> I want to set up different idleStateRetentionTime for different queries.
>
> In Flink 1.8, it seems to be the case where I can pass in a
> streamQueryConfig when converting each output table into datastreams. And
> the translate with take the idleStateRetentionTime into account.
>
> But in Flink 1.10, that idleStateRetentionTime actually gets set on
> TableConfig and applies to the tableEnviro

Re: Multiple SQL Optimization

2020-04-10 Thread godfrey he
Hi forideal,
Currently, the Blink planner with TableEnvironment supports multiple-sink
optimization, which tries its best to reuse the common sub-graph.
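
A minimal sketch of what that looks like with the 1.10-style API (Java; the
DDL for source_stream and the three sinks is the same as in your mail and is
elided here):

EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
TableEnvironment tEnv = TableEnvironment.create(settings);

// tEnv.sqlUpdate("CREATE TABLE source_stream ...");  // DDL as in your mail
// tEnv.sqlUpdate("CREATE TABLE good_sink ...");      // same for the three sinks

tEnv.sqlUpdate("INSERT INTO good_sink   SELECT data FROM source_stream WHERE `key` = 'good'");
tEnv.sqlUpdate("INSERT INTO atomic_sink SELECT data FROM source_stream WHERE `key` = 'atomic'");
tEnv.sqlUpdate("INSERT INTO bad_sink    SELECT data FROM source_stream WHERE `key` = 'bad'");

// all buffered statements are optimized together into one DAG, so the source is shared
tEnv.execute("multi-sink job");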

Best,
Godfrey

forideal  于2020年4月10日周五 下午4:31写道:

> Hello
>
>There are 3 SQLs all querying the same table, but the generated GAG is
> 3 independent topologies.I think, the better result is that there is one
> Source and 3 Sinks.
>
>
>   create table good_sink (data varchar) with (
>   'connector.type' = 'console',
>   'connector.dry-run' = 'false',
>   'connector.property-version' = '1',
>   'update-mode' = 'append');create table atomic_sink (data varchar) with (
>   'connector.type' = 'console',
>   'connector.dry-run' = 'false',
>   'connector.property-version' = '1',
>   'update-mode' = 'append');create table bad_sink (data varchar) with (
>   'connector.type' = 'console',
>   'connector.dry-run' = 'false',
>   'connector.property-version' = '1',
>   'update-mode' = 'append');create table source_stream (data varchar, `key` 
> varchar) with (
>  xxx);insert into
>   good_sinkselect
>   datafrom
>   source_streamwhere
>   `key` = 'good';insert into
>   atomic_sinkselect
>   datafrom
>   source_streamwhere
>   `key` = 'atomic';insert into
>   atomic_sinkselect
>   datafrom
>   source_streamwhere
>   `key` = 'bad';
>
> DAG picture Link:
> https://pic4.zhimg.com/80/v2-7db1417bd2607d3a939f38cc19228df3_1440w.jpg
> Question Link:https://zhuanlan.zhihu.com/p/128590984
>
> Best Wishes
>
>
>
>


Re: Do I need to use Table API or DataStream API to construct sources?

2020-03-01 Thread godfrey he
I think you should use `flink-sql-connector-kafka-0.11_2.11` instead of
`flink-connector-kafka_2.11`.
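
For the Gradle build quoted below, that would mean swapping the dependency
line to something like this (the exact artifact should be matched to your
Kafka and Scala versions):

flinkShadowJar "org.apache.flink:flink-sql-connector-kafka-0.11_2.11:${flinkVersion}"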

Bests,
Godfrey

kant kodali  于2020年3月1日周日 下午5:15写道:

> The dependency was already there. Below is my build.gradle. Also I checked
> the kafka version and looks like the jar
>
> flinkShadowJar "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"
>
> downloads kafka-clients version 2.2.0. So I changed my code to version
> 2.2.0 and same problem persists.
>
> buildscript {
> repositories {
> jcenter() // this applies only to the Gradle 'Shadow' plugin
> }
> dependencies {
> classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
> }
> }
>
> plugins {
> id 'java'
> id 'application'
> }
>
> mainClassName = 'Test'
> apply plugin: 'com.github.johnrengelman.shadow'
>
> ext {
> javaVersion = '1.8'
> flinkVersion = '1.10.0'
> scalaBinaryVersion = '2.11'
> slf4jVersion = '1.7.7'
> log4jVersion = '1.2.17'
> }
>
>
> sourceCompatibility = javaVersion
> targetCompatibility = javaVersion
> tasks.withType(JavaCompile) {
> options.encoding = 'UTF-8'
> }
>
> applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"]
>
> // declare where to find the dependencies of your project
> repositories {
> mavenCentral()
> maven { url 
> "https://repository.apache.org/content/repositories/snapshots/"; }
> }
>
> // NOTE: We cannot use "compileOnly" or "shadow" configurations since then we 
> could not run code
> // in the IDE or with "gradle run". We also cannot exclude transitive 
> dependencies from the
> // shadowJar yet (see https://github.com/johnrengelman/shadow/issues/159).
> // -> Explicitly define the // libraries we want to be included in the 
> "flinkShadowJar" configuration!
> configurations {
> flinkShadowJar // dependencies which go into the shadowJar
>
> // always exclude these (also from transitive dependencies) since they 
> are provided by Flink
> flinkShadowJar.exclude group: 'org.apache.flink', module: 'force-shading'
> flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 'jsr305'
> flinkShadowJar.exclude group: 'org.slf4j'
> flinkShadowJar.exclude group: 'log4j'
> }
>
> // declare the dependencies for your production and test code
> dependencies {
> // --
> // Compile-time dependencies that should NOT be part of the
> // shadow jar and are provided in the lib folder of Flink
> // --
> compile "org.apache.flink:flink-java:${flinkVersion}"
> compile 
> "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>
> // --
> // Dependencies that should be part of the shadow jar, e.g.
> // connectors. These must be in the flinkShadowJar configuration!
> // --
>
> compile "org.apache.flink:flink-java:${flinkVersion}"
> compile 
> "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
> flinkShadowJar "org.apache.flink:flink-csv:${flinkVersion}"
>
> flinkShadowJar 
> "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"
> flinkShadowJar "org.apache.flink:flink-table-api-java:${flinkVersion}"
> compileOnly 
> "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"
> compileOnly 
> "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
> flinkShadowJar 
> "org.apache.flink:flink-streaming-scala_2.11:${flinkVersion}"
> flinkShadowJar 
> "org.apache.flink:flink-statebackend-rocksdb_2.11:${flinkVersion}"
>
>
> compile "log4j:log4j:${log4jVersion}"
> compile "org.slf4j:slf4j-log4j12:${slf4jVersion}"
>
> // Add test dependencies here.
> // testCompile "junit:junit:4.12"
> testImplementation 
> "org.apache.flink:flink-table-api-java-bridge_2.11:${flinkVersion}"
> testImplementation 
> "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
> }
>
> // make compileOnly dependencies available for tests:
> sourceSets {
> main.compileClassp

Re: Do I need to use Table API or DataStream API to construct sources?

2020-03-01 Thread godfrey he
hi kant,

> Also why do I need to convert to DataStream to print the rows of a table?
Why not have a print method in the Table itself?
Flink 1.10 introduces a utility class named TableUtils to convert a Table
to a List<Row>. This utility class is mainly used for demonstration or
testing and is only applicable to *small batch jobs* and small finite *append
only stream jobs*. The code looks like:

Table table = tEnv.sqlQuery("select ...");
List<Row> result = TableUtils.collectToList(table);

Currently, we are planning to implement Table#collect [1]; after
that, Table#head and Table#print may also be introduced soon.

>  The program finished with the following exception:
please make sure that the Kafka version in the Test class and the Kafka version
in the pom dependency are the same. I tested your code successfully.

Bests,
Godfrey

[1] https://issues.apache.org/jira/browse/FLINK-14807


Benchao Li  于2020年3月1日周日 下午4:44写道:

> Hi kant,
>
> CSV format is an independent module, you need to add it as your
> dependency.
>
> 
>org.apache.flink
>flink-csv
>${flink.version}
> 
>
>
> kant kodali  于2020年3月1日周日 下午3:43写道:
>
>> 
>>  The program finished with the following exception:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error: findAndCreateTableSource failed.
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>> at
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>> at
>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>> Caused by: org.apache.flink.table.api.TableException:
>> findAndCreateTableSource failed.
>> at
>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
>> at
>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
>> at
>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.findAndCreateTableSource(CatalogSourceTable.scala:156)
>> at
>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource$lzycompute(CatalogSourceTable.scala:65)
>> at
>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource(CatalogSourceTable.scala:65)
>> at
>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:76)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
>> at
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
>> at
>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
>> at Test.main(Test.java:34)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>> ... 8 more
>> Caused b

Re: Does Flink 1.9 support create or replace views syntax in raw sql?

2020-02-27 Thread godfrey he
Hi kant,

> 1) Where does the hive catalog persist view definitions? in mysql? or
HDFS?
The Hive catalog stores all metadata in Derby or MySQL [1].

> 2) If the views are not persisted what happens if the application crashes
and restarted? will it create the view again and safely read the data from
where it left off?
As Jark said, as of the latest release version, the views are stored in
memory. So if the application crashes or is restarted, you must create the
views again.


[1] https://data-flair.training/blogs/apache-hive-metastore/

Bests,
Godfrey



kant kodali  于2020年2月27日周四 下午9:25写道:

>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/hive/hive_catalog.html#hivecatalog
>
> Can I use the hive catalog to store view definitions in HDFS? I am
> assuming the metastore can be anything or does it have to be have MySQL?
>
> On Thu, Feb 27, 2020 at 4:46 AM kant kodali  wrote:
>
>> Hi,
>>
>> 1) Where does the hive catalog persist view definitions? in mysql? or
>> HDFS?
>>
>> 2) If the views are not persisted what happens if the application crashes
>> and restarted? will it create the view again and safely read the data from
>> where it left off?
>>
>> Thanks!
>>
>> On Wed, Feb 26, 2020 at 6:47 AM godfrey he  wrote:
>>
>>> Hi Kant, if you want the store the catalog data in Local
>>> Filesystem/HDFS, you can implement a user defined catalog (just need to
>>> implement Catalog interface)
>>>
>>> Bests,
>>> Godfrey
>>>
>>> kant kodali  于2020年2月26日周三 下午12:28写道:
>>>
>>>> Hi Jingsong,
>>>>
>>>> Can I store it in Local Filesystem/HDFS?
>>>>
>>>> Thanks!
>>>>
>>>> On Mon, Jan 20, 2020 at 6:59 PM Jingsong Li 
>>>> wrote:
>>>>
>>>>> Hi Kant,
>>>>>
>>>>> If you want your view persisted, you must to dock a catalog like hive
>>>>> catalog, it stores views in the metastore with mysql.
>>>>> - In 1.10, you can store views in catalog through
>>>>> "Catalog.createTable", you can create a "CatalogViewImpl". This is an
>>>>> internal API, which is not easy to use.
>>>>> - In 1.11, we will introduce create view DDL for "TableEnv.sqlUpdate"
>>>>> and "TableEnv.createView". It will be easy to use.
>>>>>
>>>>> Best,
>>>>> Jingsong Lee
>>>>>
>>>>> On Tue, Jan 21, 2020 at 10:03 AM Jark Wu  wrote:
>>>>>
>>>>>> Hi Kant,
>>>>>>
>>>>>> The TableEnv#createTemporaryView and CREATE VIEW in SQL Cli both
>>>>>> creates temporary views which is not persisted and will lost after 
>>>>>> session
>>>>>> close.
>>>>>> I think the persisted views will be supported in 1.11.
>>>>>>
>>>>>> Best,
>>>>>> Jark
>>>>>>
>>>>>> 2020年1月20日 18:46,kant kodali  写道:
>>>>>>
>>>>>> Hi Jingsong,
>>>>>>
>>>>>> Thanks a lot, I think I can live with
>>>>>> TableEnvironment.createTemporaryView in Flink 1.10 (which I am expecting 
>>>>>> to
>>>>>> be released this month) but are these views persisted somewhere? for
>>>>>> example across sessions? or say I stop my application and start again 
>>>>>> will
>>>>>> it work as expected?
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>>
>>>>>> On Mon, Jan 20, 2020 at 1:12 AM Jingsong Li 
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Kant,
>>>>>>>
>>>>>>> Sorry, 1.10 not support "CREATE VIEW" in raw SQL too. Workaround is:
>>>>>>> - Using TableEnvironment.createTemporaryView...
>>>>>>> - Or using "create view" and "drop view" in the sql-client.
>>>>>>> - Or using hive catalog, in 1.10, we support query catalog views.
>>>>>>>
>>>>>>> FLIP-71 will be finished  in 1.11 soon.
>>>>>>>
>>>>>>> Best,
>>>>>>> Jingsong Lee
>>>>>>>
>>>>>>> On Sun, Jan 19, 2020 at 4:10 PM kant kodali 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I tried the following.
>>>>>>>>
>>>>>>>> bsTableEnv.sqlUpdate("CREATE VIEW my_view AS SELECT * FROM sample1 
>>>>>>>> FULL OUTER JOIN sample2 on sample1.f0=sample2.f0");
>>>>>>>>
>>>>>>>> Table result = bsTableEnv.sqlQuery("select * from my_view");
>>>>>>>>
>>>>>>>> It looks like
>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-71+-+E2E+View+support+in+FLINK+SQL
>>>>>>>>  Views
>>>>>>>> are not supported. Can I expect them to be supported in Flink 1.10?
>>>>>>>>
>>>>>>>> Currently, with Spark SQL when the query gets big I break it down
>>>>>>>> into views and this is one of the most important features my 
>>>>>>>> application
>>>>>>>> relies on. is there any workaround for this at the moment?
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>> On Sat, Jan 18, 2020 at 6:24 PM kant kodali 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> Does Flink 1.9 support create or replace views syntax in raw SQL?
>>>>>>>>> like spark streaming does?
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Best, Jingsong Lee
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Best, Jingsong Lee
>>>>>
>>>>


Re: Does Flink 1.9 support create or replace views syntax in raw sql?

2020-02-26 Thread godfrey he
Hi Kant, if you want to store the catalog data in a local filesystem/HDFS,
you can implement a user-defined catalog (you just need to implement the
Catalog interface).
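
A rough sketch of how such a catalog would be plugged in (Java; `HdfsCatalog`
is a hypothetical user-defined class implementing
org.apache.flink.table.catalog.Catalog that persists metadata to HDFS):

Catalog hdfsCatalog = new HdfsCatalog("my_catalog", "default_db", "hdfs:///flink/catalog");
tableEnv.registerCatalog("my_catalog", hdfsCatalog);
tableEnv.useCatalog("my_catalog");
// tables/views created from now on go through HdfsCatalog's create* methods
// and are therefore persisted wherever your implementation writes them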

Bests,
Godfrey

kant kodali  于2020年2月26日周三 下午12:28写道:

> Hi Jingsong,
>
> Can I store it in Local Filesystem/HDFS?
>
> Thanks!
>
> On Mon, Jan 20, 2020 at 6:59 PM Jingsong Li 
> wrote:
>
>> Hi Kant,
>>
>> If you want your view persisted, you must to dock a catalog like hive
>> catalog, it stores views in the metastore with mysql.
>> - In 1.10, you can store views in catalog through "Catalog.createTable",
>> you can create a "CatalogViewImpl". This is an internal API, which is not
>> easy to use.
>> - In 1.11, we will introduce create view DDL for "TableEnv.sqlUpdate"
>> and "TableEnv.createView". It will be easy to use.
>>
>> Best,
>> Jingsong Lee
>>
>> On Tue, Jan 21, 2020 at 10:03 AM Jark Wu  wrote:
>>
>>> Hi Kant,
>>>
>>> The TableEnv#createTemporaryView and CREATE VIEW in SQL Cli both creates
>>> temporary views which is not persisted and will lost after session close.
>>> I think the persisted views will be supported in 1.11.
>>>
>>> Best,
>>> Jark
>>>
>>> 2020年1月20日 18:46,kant kodali  写道:
>>>
>>> Hi Jingsong,
>>>
>>> Thanks a lot, I think I can live with
>>> TableEnvironment.createTemporaryView in Flink 1.10 (which I am expecting to
>>> be released this month) but are these views persisted somewhere? for
>>> example across sessions? or say I stop my application and start again will
>>> it work as expected?
>>>
>>> Thanks!
>>>
>>>
>>> On Mon, Jan 20, 2020 at 1:12 AM Jingsong Li 
>>> wrote:
>>>
 Hi Kant,

 Sorry, 1.10 not support "CREATE VIEW" in raw SQL too. Workaround is:
 - Using TableEnvironment.createTemporaryView...
 - Or using "create view" and "drop view" in the sql-client.
 - Or using hive catalog, in 1.10, we support query catalog views.

 FLIP-71 will be finished  in 1.11 soon.

 Best,
 Jingsong Lee

 On Sun, Jan 19, 2020 at 4:10 PM kant kodali  wrote:

> I tried the following.
>
> bsTableEnv.sqlUpdate("CREATE VIEW my_view AS SELECT * FROM sample1 FULL 
> OUTER JOIN sample2 on sample1.f0=sample2.f0");
>
> Table result = bsTableEnv.sqlQuery("select * from my_view");
>
> It looks like
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-71+-+E2E+View+support+in+FLINK+SQL
>  Views
> are not supported. Can I expect them to be supported in Flink 1.10?
>
> Currently, with Spark SQL when the query gets big I break it down into
> views and this is one of the most important features my application relies
> on. is there any workaround for this at the moment?
>
> Thanks!
>
> On Sat, Jan 18, 2020 at 6:24 PM kant kodali 
> wrote:
>
>> Hi All,
>>
>> Does Flink 1.9 support create or replace views syntax in raw SQL?
>> like spark streaming does?
>>
>> Thanks!
>>
>

 --
 Best, Jingsong Lee

>>>
>>>
>>
>> --
>> Best, Jingsong Lee
>>
>


Re: TIME/TIMESTAMP parse in Flink TABLE/SQL API

2020-02-25 Thread godfrey he
Hi, I find that JsonRowDeserializationSchema only supports date-time with a
timezone, according to RFC 3339. So you need to add the timezone to the time
data (like 14:02:00Z) and the timestamp data (like 2019-07-09T02:02:00.040Z).
Hope it helps.
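
For the sample record in your mail, the RFC 3339-compliant payload would look
roughly like this (assuming the values are meant to be UTC):

{
    "server_date": "2019-07-09",
    "server_time": "14:02:00Z",
    "reqsndtime_c": "2019-07-09T02:02:00.040Z"
}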

Bests,
godfrey

Outlook  于2020年2月25日周二 下午5:49写道:

> By the way, my flink version is 1.10.0.
>
>  Original Message
> *Sender:* Outlook
> *Recipient:* user
> *Date:* Tuesday, Feb 25, 2020 17:43
> *Subject:* TIME/TIMESTAMP parse in Flink TABLE/SQL API
>
> Hi all,
>
> I read json data from kafka, and print to console. When I do this, some
> error occurs when time/timestamp deserialization.
>
> json data in Kafka:
>
> ```
> {
> "server_date": "2019-07-09",
> "server_time": "14:02:00",
> "reqsndtime_c": "2019-07-09 02:02:00.040"
> }
> ```
>
> flink code:
>
> ```
> bsTableEnv.connect(
> new Kafka()
> .version("universal")
> .topic("xxx")
> .property("bootstrap.servers", "localhost:9092")
> .property("zookeeper.connect", "localhost:2181")
> .property("group.id", "g1")
> .startFromEarliest()
> ).withFormat(
> new Json()
> .failOnMissingField(false)
> ).withSchema(
> new Schema()
> .field("server_date", DataTypes.DATE())
> .field("server_time", DataTypes.TIME())
> .field("reqsndtime_c", DataTypes.TIMESTAMP(3))
> ).inAppendMode()
> .createTemporaryTable("xxx”);
> ```
>
>
> server_date with format  is ok, but server_time with  DataTypes.DATE()
> and reqsndtime_c with DataTypes.TIMESTAMP(3) cause error.  If I change them
> to DataTypes.STRING(), everything will be OK.
>
> Error message:
> ```
> Exception in thread "main" java.util.concurrent.ExecutionException:
> org.apache.flink.client.program.ProgramInvocationException: Job failed
> (JobID: 395d1ba3d41f92734d4ef25aa5f9b4a1)
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1640)
> at
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
> at
> org.apache.flink.table.planner.delegation.StreamExecutor.execute(StreamExecutor.java:42)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643)
> at cn.com.agree.Main.main(Main.java:122)
> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
> failed (JobID: 395d1ba3d41f92734d4ef25aa5f9b4a1)
> at
> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
> at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
> at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
> at
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874)
> at akka.dispatch.OnComplete.internal(Future.scala:264)
> at akka.dispatch.OnComplete.internal(Future.scala:261)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> at
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
> at
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
> at
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
> at
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
> at
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> at
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> at akka.dispatch.forkjoin.ForkJoinTas

Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer

2020-02-20 Thread godfrey he
Congrats Jingsong! Well deserved.

Best,
godfrey

Jeff Zhang  于2020年2月21日周五 上午11:49写道:

> Congratulations!Jingsong. You deserve it
>
> wenlong.lwl  于2020年2月21日周五 上午11:43写道:
>
>> Congrats Jingsong!
>>
>> On Fri, 21 Feb 2020 at 11:41, Dian Fu  wrote:
>>
>> > Congrats Jingsong!
>> >
>> > > 在 2020年2月21日,上午11:39,Jark Wu  写道:
>> > >
>> > > Congratulations Jingsong! Well deserved.
>> > >
>> > > Best,
>> > > Jark
>> > >
>> > > On Fri, 21 Feb 2020 at 11:32, zoudan  wrote:
>> > >
>> > >> Congratulations! Jingsong
>> > >>
>> > >>
>> > >> Best,
>> > >> Dan Zou
>> > >>
>> >
>> >
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: [ANNOUNCE] Apache Flink 1.10.0 released

2020-02-12 Thread godfrey he
Congrats to everyone involved! Thanks, Yu & Gary.

Best,
godfrey

Yu Li  于2020年2月13日周四 下午12:57写道:

> Hi Kristoff,
>
> Thanks for the question.
>
> About Java 11 support, please allow me to quote from our release note [1]:
>
> Lastly, note that the connectors for Cassandra, Hive, HBase, and Kafka
> 0.8–0.11
> have not been tested with Java 11 because the respective projects did not
> provide
> Java 11 support at the time of the Flink 1.10.0 release
>
> Which is the main reason for us to still make our docker image based on
> JDK 8.
>
> Hope this answers your question.
>
> Best Regards,
> Yu
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/release-notes/flink-1.10.html
>
>
> On Wed, 12 Feb 2020 at 23:43, KristoffSC 
> wrote:
>
>> Hi all,
>> I have a small question regarding 1.10
>>
>> Correct me if I'm wrong, but 1.10 should support Java 11 right?
>>
>> If so, then I noticed that docker images [1] referenced in [2] are still
>> based on openjdk8 not Java 11.
>>
>> Whats up with that?
>>
>> P.S.
>> Congrats on releasing 1.10 ;)
>>
>> [1]
>>
>> https://github.com/apache/flink/blob/release-1.10/flink-container/docker/Dockerfile
>> [2]
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/docker.html
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: some basic questions

2020-01-18 Thread godfrey he
Hi kant,
A "FULL OUTER JOIN" job will generate retract messages, so toRetractStream is
required to guarantee correctness.
I think it's better to use StreamExecutionEnvironment.execute, because you
have converted the Table to a DataStream.
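
Applied to the code from your earlier mail, that is roughly:

// Tuple2<Boolean, Row>: true = add message, false = retract message
bsTableEnv.toRetractStream(result, Row.class).print();
// execute via the StreamExecutionEnvironment, since the Table was converted to a DataStream
env.execute("simple job");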

kant kodali  于2020年1月19日周日 上午11:59写道:

> Hi Godfrey,
>
> I was just clicking the run button on my IDE and it doesn't really show me
> errors so I used command line fink run  and that shows me what the
> error is. It tells me I need to change to toRetractStream() and both
> StreamExecutionEnvrionment and StreamTableEnvrionment .execute seems to
> work fine although I am not sure which one is the correct usage.
>
> Thanks!
>
> On Sat, Jan 18, 2020 at 6:52 PM kant kodali  wrote:
>
>> Hi Godfrey,
>>
>> Thanks a lot for your response. I just tried it with env.execute("simple
>> job") but I still get the same error message.
>>
>> Kant
>>
>> On Sat, Jan 18, 2020 at 6:26 PM godfrey he  wrote:
>>
>>> hi kant,
>>>
>>> > 1) The Documentation says full outer join is supported however the
>>> below code just exits with value 1. No error message.
>>> if you have converted Table to DataStream, please execute it
>>> with StreamExecutionEnvironment ( call env.execute("simple job") )
>>>
>>> > 2) If I am using a blink planner should I use TableEnvironment or
>>> StreamTableEnvironment ?
>>> for streaming job, both Environment can be used. the difference is:
>>>   TableEnvironment will optimize multiple queries into one DAG when
>>> executing, while StreamTableEnvironment will independent optimize each
>>> query.
>>>   StreamTableEnvironment supports convert from/to DataStream,
>>> while TableEnvironment does not support it.
>>>   StreamTableEnvironment supports register TableFunction
>>> and AggregateFunction, while TableEnvironment does not support it now.
>>>
>>> for batch job, only TableEnvironment is the only choice, because
>>> DataStream does not support batch job now.
>>>
>>> > 3) Why flink current stable documentation(1.9) recommends (old
>>> planner)? any rough timeline on when we would be able to use blink planner
>>> in production? perhaps 1.10 or 1.11?
>>> 1.9 is blink planner's first version, and it is unstable. In 1.10, blink
>>> planner is more statable, we are switching the blink planner to the default
>>> step by step [0].
>>>
>>> [0]
>>> http://mail-archives.apache.org/mod_mbox/flink-dev/202001.mbox/%3CCAELO930%2B3RJ5m4hGQ7fbS-CS%3DcfJe5ENcRmZ%3DT_hey-uL6c27g%40mail.gmail.com%3E
>>>
>>> kant kodali  于2020年1月18日周六 下午5:40写道:
>>>
>>>> Hi All,
>>>>
>>>> 1) The Documentation says full outer join is supported however the
>>>> below code just exits with value 1. No error message.
>>>>
>>>> import org.apache.flink.api.common.serialization.SimpleStringSchema;
>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>> import 
>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
>>>> import org.apache.flink.table.api.*;
>>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>>> import org.apache.flink.types.Row;
>>>>
>>>> import java.util.Properties;
>>>>
>>>> public class Test {
>>>>
>>>> public static void main(String... args) throws Exception {
>>>>
>>>> EnvironmentSettings bsSettings = 
>>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>>> final StreamExecutionEnvironment env = 
>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>> StreamTableEnvironment bsTableEnv = 
>>>> StreamTableEnvironment.create(env, bsSettings);
>>>>
>>>> Properties properties = new Properties();
>>>> properties.setProperty("bootstrap.servers", "localhost:9092");
>>>> properties.setProperty("group.id", "test");
>>>>
>>>> FlinkKafkaConsumer consumer1 = new FlinkKafkaConsumer<>(
>>>> java.util.regex.Pattern.compile("test-topic1"),
>>>> new SimpleStringSchema(),
>>>> properties);
>>>> FlinkKafkaConsumer consum

Re: some basic questions

2020-01-18 Thread godfrey he
hi kant,

> 1) The Documentation says full outer join is supported however the below
code just exits with value 1. No error message.
If you have converted the Table to a DataStream, please execute it
with StreamExecutionEnvironment (i.e. call env.execute("simple job")).

> 2) If I am using the blink planner, should I use TableEnvironment or
> StreamTableEnvironment?
For streaming jobs, both environments can be used. The differences are:
  TableEnvironment optimizes multiple queries into one DAG when
executing, while StreamTableEnvironment optimizes each query
independently.
  StreamTableEnvironment supports converting from/to DataStream,
while TableEnvironment does not.
  StreamTableEnvironment supports registering TableFunction
and AggregateFunction, while TableEnvironment does not support them yet.

For batch jobs, TableEnvironment is the only choice, because DataStream
does not support batch execution yet.
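
To make the second and third differences concrete, here is a rough,
self-contained sketch against the Flink 1.9-era Java API used elsewhere in
this thread. It is an illustration only: the table function, table names and
sample data are invented, not taken from any code in this thread.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

public class EnvironmentChoiceSketch {

    // An invented table function, only to show where user functions get registered.
    public static class Split extends TableFunction<String> {
        public void eval(String s) {
            for (String part : s.split(",")) {
                collect(part);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

        // Pure Table API / SQL pipelines: a TableEnvironment is enough, and it can
        // optimize several queries into a single DAG (shown here only for contrast).
        TableEnvironment pureTableEnv = TableEnvironment.create(settings);

        // Mixing with DataStream, or registering a TableFunction/AggregateFunction:
        // a StreamTableEnvironment is needed.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(env, settings);
        streamTableEnv.registerFunction("split", new Split());

        DataStream<String> input = env.fromElements("a,b", "c,d");
        streamTableEnv.registerDataStream("lines", input, "line");
        Table t = streamTableEnv.sqlQuery(
                "SELECT part FROM lines, LATERAL TABLE(split(line)) AS T(part)");
        streamTableEnv.toAppendStream(t, Row.class).print();

        env.execute("environment choice sketch");
    }
}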

> 3) Why does the current stable Flink documentation (1.9) recommend the old
planner? Any rough timeline on when we would be able to use the blink planner
in production? Perhaps 1.10 or 1.11?
1.9 is the blink planner's first version, and it is unstable. In 1.10 the
blink planner is more stable, and we are switching to it as the default
step by step [0].

[0]
http://mail-archives.apache.org/mod_mbox/flink-dev/202001.mbox/%3CCAELO930%2B3RJ5m4hGQ7fbS-CS%3DcfJe5ENcRmZ%3DT_hey-uL6c27g%40mail.gmail.com%3E

kant kodali wrote on Saturday, January 18, 2020 at 5:40 PM:

> Hi All,
>
> 1) The documentation says full outer join is supported; however, the below
> code just exits with value 1. No error message.
>
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> import org.apache.flink.table.api.*;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.types.Row;
>
> import java.util.Properties;
>
> public class Test {
>
> public static void main(String... args) throws Exception {
>
> EnvironmentSettings bsSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment bsTableEnv = 
> StreamTableEnvironment.create(env, bsSettings);
>
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers", "localhost:9092");
> properties.setProperty("group.id", "test");
>
> FlinkKafkaConsumer<String> consumer1 = new FlinkKafkaConsumer<>(
> java.util.regex.Pattern.compile("test-topic1"),
> new SimpleStringSchema(),
> properties);
> FlinkKafkaConsumer<String> consumer2 = new FlinkKafkaConsumer<>(
> java.util.regex.Pattern.compile("test-topic2"),
> new SimpleStringSchema(),
> properties);
>
> DataStream<String> stream1 = env.addSource(consumer1);
> DataStream<String> stream2 = env.addSource(consumer2);
>
> bsTableEnv.registerDataStream("sample1", stream1);
> bsTableEnv.registerDataStream("sample2", stream2);
>
> Table result = bsTableEnv.sqlQuery("SELECT * FROM sample1 FULL OUTER 
> JOIN sample2 on sample1.f0=sample2.f0");
> result.printSchema();
>
> bsTableEnv.toAppendStream(result, Row.class).print();
> bsTableEnv.execute("sample job");
> }
> }
>
>
> 2) If I am using the blink planner, should I use TableEnvironment or
> StreamTableEnvironment?
>
> 3) Why does the current stable Flink documentation (1.9) recommend the old
> planner? Any rough timeline on when we would be able to use the blink
> planner in production? Perhaps 1.10 or 1.11?
>
> Thanks!
>
>
>


Re: Null result cannot be used for atomic types

2020-01-09 Thread godfrey he
hi sunfulin,

Which Flink version are you using?

best,
godfrey

sunfulin wrote on Friday, January 10, 2020 at 1:50 PM:

> Hi, I am running a Flink app that reads Kafka records in JSON format.
> The connect code is as follows:
>
>
> tableEnv.connect(
>         new Kafka()
>             .version(kafkaInstance.getVersion())
>             .topic(chooseKafkaTopic(initPack.clusterMode))
>             .property("bootstrap.servers", kafkaInstance.getBrokerList())
>             .property("group.id", initPack.jobName)
>             .startFromEarliest()
> ).withSchema(
>         new Schema()
>             // EVENT_TIME
>             .field("rowtime", Types.SQL_TIMESTAMP).rowtime(
>                 new Rowtime()
>                     .timestampsFromField("time")
>                     .watermarksPeriodicBounded(1000)
>             )
>             .field("type", Types.STRING)
>             .field("event", Types.STRING)
>             .field("user_id", Types.STRING)
>             .field("distinct_id", Types.STRING)
>             .field("project", Types.STRING)
>             .field("recv_time", Types.SQL_TIMESTAMP)
>             .field("properties", Types.ROW_NAMED(
>                 new String[] { "BROWSER_VERSION", "pathname", "search", "eventType", "message", "stack", "componentStack" },
>                 Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING)
>             )
> ).withFormat(
>         new Json().failOnMissingField(false)
>             .deriveSchema()
> )
> .inAppendMode()
> .registerTableSource(getTableName());
>
>
>
> However, the application throws the following exception, which really
> confuses me. From the code above, the field types are only Types.STRING
> or Types.SQL_TIMESTAMP.
>
> Not sure which data field could lead to this. I would appreciate some help
> from the community.
>
>
> Caused by: java.lang.NullPointerException: Null result cannot be used for atomic types.
>  at DataStreamSinkConversion$5.map(Unknown Source)
>  at org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:55)
>  at org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:34)
>  at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>  at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>  at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
>  at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
>  at DataStreamSourceConversion$2.processElement(Unknown Source)
>  at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70)
>  at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>  at org.apache.flink.streaming.
>
>
>
>
>