Re: Upgrade calcite version

2021-03-10 Thread Danny Chan
Hi Sheng ~

It is a hard work to upgrade Calcite version because that means you need to:

- Fix all the bug introduced by the new planner
- Fix all the logical plan to have correct semantics
- Replace the deprecate APIs to new

In order to achieve this, you need to have good knowledge of Calcite basic
and SQL planning (the relational algebra) that really few people have,
so i would not suggest to do that at all for yourself.

What is the purpose to must upgrade Calcite, can you share something ~

Best,
Danny Chan


盛森林  于2021年2月4日周四 下午10:47写道:

> Hi,
> I want to upgrade calcite to 1.22 in the flink branch that fork from
> apache release 1.9. 
> Can community give me some suggestion.


Re: Flink reads data from JDBC table only on startup

2020-12-30 Thread Danny Chan
For your case, you should use a temporal table join syntax, and set up a
refresh TTL for the RHS join cache.

Taras Moisiuk  于2020年12月28日周一 下午7:21写道:

> Hi Danny,
>
> I use regular join and it looks like:
>
> SELECT
> ...
> FROM *dynamic_kafka_table* k
> JOIN *jdbc_table* j ON k.id = j.k_id
>
>
> Should I set some additional conditions for this join?
>
> --
> Sent from the Apache Flink User Mailing List archive. mailing list archive
>  at
> Nabble.com.
>


Re: Registering RawType with LegacyTypeInfo via Flink CREATE TABLE DDL

2020-12-27 Thread Danny Chan
> SQL parse failed. Encount
What syntax did you use ?

> TypeConversions.fromDataTypeToLegacyInfo cannot convert a plain RAW type
back to TypeInformation.

Did you try to construct type information by a new
fresh TypeInformationRawType ?

Yuval Itzchakov  于2020年12月24日周四 下午7:24写道:

> An expansion to my question:
>
> What I really want is for the UDF to return `RAW(io.circe.Json, ?)` type,
> but I have to do a conversion between Table and DataStream, and
> TypeConversions.fromDataTypeToLegacyInfo cannot convert a plain RAW type
> back to TypeInformation.
>
> On Thu, Dec 24, 2020 at 12:59 PM Yuval Itzchakov 
> wrote:
>
>> Hi,
>>
>> I have a UDF which returns a type of MAP> 'ANY')>. When I try to register this type with Flink via the
>> CREATE TABLE DDL, I encounter an exception:
>>
>> - SQL parse failed. Encountered "(" at line 2, column 256.
>> Was expecting one of:
>> "NOT" ...
>> "NULL" ...
>> ">" ...
>> "MULTISET" ...
>> "ARRAY" ...
>> "." ...
>>
>> Which looks like the planner doesn't like the round brackets on the
>> LEGACY type. What is the correct way to register the table with this type
>> with Flink?
>> --
>> Best Regards,
>> Yuval Itzchakov.
>>
>
>
> --
> Best Regards,
> Yuval Itzchakov.
>


Re: Flink reads data from JDBC table only on startup

2020-12-27 Thread Danny Chan
Hi Taras ~

There is a look up cache for temporal join but it default is false, see
[1]. That means, by default FLINK SQL would lookup the external databases
on each record from the JOIN LHS.

Did you use the temporal table join syntax or normal stream-stream join
syntax ? The temporal table join uses the SYSTEM_TIME AS OF keywords, see
[2]

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#lookup-cache
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins

Taras Moisiuk  于2020年12月27日周日 上午3:13写道:

> Hi everyone!
> I'm using Flink 1.12.0 with SQL API.
>
> I'm developing a streaming job with join and insertion into postgreSQL.
> There is two tables in join:
> 1. Dynamic table based on kafka topic
> 2. Small lookup JDBC table
>
> From what I can see Flink job reads data from JDBC table only on startup
> and
> mark task as FINISHED.
> Does it mean that Flink misses all updates from this table and join
> reflects
> only table state on startup?
>
> And the other question is, how to enable checkpointing for this job? I know
> that checkpointing for jobs with finished tasks is not supported now, but
> maybe I can keep such tasks in RUNNING state?
>
> Thank you!
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Long latency when consuming a message from KAFKA and checkpoint is enabled

2020-12-27 Thread Danny Chan
Hi, Nick ~
The behavior is as expected, because Kafka source/sink relies on the
Checkpoints to complement the exactly-once write semantics, a checkpoint
snapshot the states on a time point which is used for recovering, the
current internals for Kafka sink is that it writes to Kafka but only
commits it when a checkpoint completes.

For your needs, i guess you want a more near-real-time write but still keep
the exactly once semantics, i'm sorry to tell that there is no other
infrastructure
that we can use for exactly-once semantics except for the checkpoints.

nick toker  于2020年12月27日周日 下午3:12写道:

> Hi
>
> any idea?
> is it a bug?
>
>
> regards'
> nick
>
> ‫בתאריך יום ד׳, 23 בדצמ׳ 2020 ב-11:10 מאת ‪nick toker‬‏ <‪
> nick.toker@gmail.com‬‏>:‬
>
>> Hello
>>
>> We noticed the following behavior:
>> If we enable the flink checkpoints, we saw that there is a delay between
>> the time we write a message to the KAFKA topic and the time the flink kafka
>> connector consumes this message.
>> The delay is closely related to checkpointInterval and/or
>> minPauseBetweenCheckpoints meening that the MAX delay when consuming a
>> message from KAFKA will be one of these parameters
>>
>> If we disable the checkpoints, the message is immediately consumed
>> We work with the EXACTLY_ONCE semantic
>> Please note that we inject only one message
>>
>> Could you please advise how we can remove/control this delay?
>>
>> Please see the attached code of AbstractFetcher and KafkaFetcher (as a
>> png file)
>> (For example emitRecordsWithTimestamps() use a lock on checkpointLock).
>> Could this explain the behaviour ?
>>
>>
>> BR
>>
>


Re: NullPointerException while calling TableEnvironment.sqlQuery in Flink 1.12

2020-12-21 Thread Danny Chan
Hi Yuval Itzchakov ~

The thread you paste has a different stake trace with your case.

In the pasted thread, the JaninoRelMetadataProvider was missed because we
only set it once in a thread local variable, when the RelMetadataQuery was
used in a different working thread, the JaninoRelMetadataProvider caused an
NPE.

For your case, based on the stack trace, this line throws ~

RelMetadataQuery line 114:

super(null);

But actually this line allows an empty argument and it should not throw.

Can you give a re-producecable case here so that we can debug and find more
evidence ?

Yuval Itzchakov  于2020年12月22日周二 上午1:52写道:

> Hi,
>
> While trying to execute a query via TableEnvironment.sqlQuery in Flink
> 1.12, I receive the following exception:
>
> java.lang.NullPointerException
> :114, RelMetadataQuery (org.apache.calcite.rel.metadata)
> :76, RelMetadataQuery (org.apache.calcite.rel.metadata)
> get:39, FlinkRelOptClusterFactory$$anon$1
> (org.apache.flink.table.planner.calcite)
> get:38, FlinkRelOptClusterFactory$$anon$1
> (org.apache.flink.table.planner.calcite)
> getMetadataQuery:178, RelOptCluster (org.apache.calcite.plan)
> create:108, LogicalFilter (org.apache.calcite.rel.logical)
> createFilter:344, RelFactories$FilterFactoryImpl
> (org.apache.calcite.rel.core)
> convertWhere:1042, SqlToRelConverter (org.apache.calcite.sql2rel)
> convertSelectImpl:666, SqlToRelConverter (org.apache.calcite.sql2rel)
> convertSelect:644, SqlToRelConverter (org.apache.calcite.sql2rel)
> convertQueryRecursive:3438, SqlToRelConverter (org.apache.calcite.sql2rel)
> convertQuery:570, SqlToRelConverter (org.apache.calcite.sql2rel)
> org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel:165,
> FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
> rel:157, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
> toQueryOperation:823, SqlToOperationConverter
> (org.apache.flink.table.planner.operations)
> convertSqlQuery:795, SqlToOperationConverter
> (org.apache.flink.table.planner.operations)
> convert:250, SqlToOperationConverter
> (org.apache.flink.table.planner.operations)
> parse:78, ParserImpl (org.apache.flink.table.planner.delegation)
> sqlQuery:639, TableEnvironmentImpl (org.apache.flink.table.api.internal)
> $anonfun$translateTemplate$2:476, Foo$ (Foo)
> apply:-1, 644680650 (ai.hunters.pipeline.Processors$$$Lambda$1597)
> evaluateNow:361, FiberContext (zio.internal)
> $anonfun$evaluateLater$1:778, FiberContext (zio.internal)
> run:-1, 289594359 (zio.internal.FiberContext$$Lambda$617)
> runWorker:1149, ThreadPoolExecutor (java.util.concurrent)
> run:624, ThreadPoolExecutor$Worker (java.util.concurrent)
> run:748, Thread (java.lang)
>
> This seems to be coming from the FlinkRelMetadataQuery class attempting to
> initialize all handlers:
>
> [image: image.png]
>
> This seems to be coming from the calcite shaded JAR
> inside "flink-table-planner-blink-1.12"
>
> Has anyone ran into this issue? I saw a thread in the chinese user group
> but I don't understand what's been said there (
> https://www.mail-archive.com/user-zh@flink.apache.org/msg05874.html)
> --
> Best Regards,
> Yuval Itzchakov.
>


Re: Flink 1.12, Kafka Connector, JSON format - "Could not find a suitable table factory"

2020-12-10 Thread Danny Chan
One thing needs to note is that the old connectors are still in the
release-1.11/release-1.12 jars. So the old option still works but with the
old connector codes.

You may need to find the root cause why the new options do not work, maybe
some stack trace here ?

abelm  于2020年12月10日周四 下午10:54写道:

> Hi! Thank you for the reply!
>
> I understand that the metadata syntax is only available as of 1.12, but I
> am
> indeed trying to use Flink 1.12.
>
> Regarding the option for ignoring parse errors: I have already noticed from
> before that, according to the docs, even in Flink 1.11 (which is the
> version
> that the project was running on before),  the option should be
> 'json.ignore-parse-errors', but for some strange reason, to set fail on
> missing field to false, I also only seemed to get it to work with
> 'format.fail-on-missing-field', instead of the 'json.fail-on-missing-field'
> option stated in the docs.
>
> For that reason, I feel like I might be doing something wrong in terms of
> dependencies, given that so many of my connector/format options seem to
> require old syntax instead of the newer one from Flink 1.11 and 1.12 to
> run.
> As a further example, you might notice in my original message that I'm also
> using 'connector.type' = 'kafka' instead of 'connector' = 'kafka', because
> for some reason that's the only version that worked.
>
> Genuinely not sure how I managed to create such a problem, so any further
> help would be appreciated.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Flink 1.12, Kafka Connector, JSON format - "Could not find a suitable table factory"

2020-12-09 Thread Danny Chan
Hi, abelm ~

Which version Flink did you use? We did some refactoring for the connector
options since Flink 1.11. The METADATA syntax is only supported since
version 1.12.

In 1.11, to ignore the parse errors, you need to use option
"json.ignore-parse-error" [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/json.html#json-ignore-parse-errors

abelm  于2020年12月10日周四 上午1:13写道:

> Hello! I have a Scala 2.12 project which registers some tables (that get
> their data from Kafka in JSON form) to the StreamTableEnvironment via the
> executeSql command before calling execute on the
> StreamExecutionEnvironment.
>
> Everything behaves as expected until I either try to set
> /'format.ignore-parse-errors' = 'true'/ in the connector options, or I try
> to add the Kafka record timestamp as a table field via /`ts` TIMESTAMP(3)
> METADATA FROM 'timestamp'/. In both of these case I get:
>
> *Exception in thread "main" org.apache.flink.table.api.TableException:
> findAndCreateTableSource failed.*
> *Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> Could not find a suitable table factory for
> 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
> the classpath.
>
> Reason: No factory supports all properties.
> *
>
> Additionally, for ignoring parsing errors:
> *The matching candidates:
> org.apache.flink.formats.json.JsonRowFormatFactory
> Unsupported property keys:
> format.ignore-parse-errors*
>
> While, for the timestamp field:
> *The matching candidates:
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> Unsupported property keys:
> schema.#.metadata
> schema.#.virtual*
>
> Here is the DDL code used for table creation:
> /
> "CREATE TEMPORARY TABLE `" + name + "` (" + tableFields + ") " +
>   "WITH (" +
>   "'connector.type' = 'kafka', " +
>   "'connector.version' = 'universal', " +
>   "'connector.topic' = '" + name + "', " +
>   "'connector.properties.bootstrap.servers' = '" + kafkaAddr + "', " +
>   "'connector.startup-mode' = '" +
>   (if (checkLatest) "latest-offset" else "earliest-offset") +
>   "', " +
>   "'connector.properties.default.api.timeout.ms' = '5000', " +
>   "'format.type' = 'json', " +
>   "'format.fail-on-missing-field' = 'false'" +
>   ")"
> /
>
> And here is the Flink-related config from build.sbt:
> /
> lazy val flinkVersion   = "1.12.0"
> libraryDependencies ++= Seq(
>   "org.apache.flink"  %% "flink-scala"%
> flinkVersion,
>   "org.apache.flink"  %% "flink-streaming-scala"  %
> flinkVersion,
>   "org.apache.flink"  %% "flink-connector-kafka"  %
> flinkVersion,
>   "org.apache.flink"  %% "flink-clients"  %
> flinkVersion,
>   "org.apache.flink"  %% "flink-table-api-scala-bridge"   %
> flinkVersion,
>   "org.apache.flink"  %% "flink-table-planner-blink"  %
> flinkVersion,
>   "org.apache.flink"   % "flink-json" %
> flinkVersion,
>   "org.apache.flink"  %% "flink-test-utils"   %
> flinkVersion  % Test,
>   "org.apache.flink"  %% "flink-runtime"  %
> flinkVersion  % Test classifier "tests",
>   "org.apache.flink"  %% "flink-streaming-java"   %
> flinkVersion  % Test classifier "tests",
> )
> /
>
> I would appreciate any tips on getting both the timestamp and the error
> parse setting to work. Thank you in advance!
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: How User-Defined AggregateFunctions handle deletes of all aggregated rows.

2020-12-09 Thread Danny Chan
No, the group agg, stream-stream join and rank are all stateful
operators which need a state-backend to bookkeep the acc values.

But it is only required to emit the retractions when the stateful operator
A has a downstream operator B that is also stateful, because the B needs
the retractions to correct the accs. If B is not stateful, just emitting
the new record to override is enough.

You just need to correct the acc state to what it expects to be (say
re-evaluate the acc without the record that needs retraction) when you
received  the retraction message.

Rex Fenley  于2020年12月10日周四 上午2:44写道:

> So from what I'm understanding, the aggregate itself is not a "stateful
> operator" but one may follow it? How does the aggregate accumulator keep
> old values then? It can't all just live in memory, actually, looking at the
> savepoints it looks like there's state associated with our aggregate
> operator.
>
> To clarify my concern too, in my retract function impl in the aggregate
> function class, all I do is remove a value (a group id) from the
> accumulator set (which is an array). For example, if there is only 1
> group_id left for a user and it gets deleted, that group_id will be removed
> from the accumulator set and the set will be empty. I would hope that at
> that point, given that there are no remaining rows for the aggregate, that
> I could or flink will just delete the associated stored accumulator
> altogether i.e. delete `user_id_1 -> []`. Is it possible that both the
> groups and the user need to be deleted for everything to clear from
> storage? That might make more sense actually..
>
> If this doesn't happen, since users delete themselves and their groups all
> the time, we'll be storing all these empty data sets in rocks for no
> reason. To clarify, we're using Debezium as our source and using Flink as a
> materialization engine, so we never want to explicitly set a timeout on any
> of our data, we just want to scale up predictably with our user growth.
>
> Thanks!
>
> On Wed, Dec 9, 2020 at 4:14 AM Danny Chan  wrote:
>
>> Hi, Rex Fenley ~
>>
>> If there is stateful operator as the output of the aggregate function.
>> Then each time the function receives an update (or delete) for the key, the
>> agg operator would emit 2 messages, one for retracting the old record, one
>> for the new message. For your case, the new message is the DELETE.
>>
>> If there is no stateful operator, the aggregate operator would just emit
>> the update after (the new) message which is the delete.
>>
>> Rex Fenley  于2020年12月9日周三 上午4:30写道:
>>
>>> Hello,
>>>
>>> I'd like to better understand delete behavior of AggregateFunctions.
>>> Let's assume there's an aggregate of `user_id` to a set of `group_ids` for
>>> groups belonging to that user.
>>> `user_id_1 -> [group_id_1, group_id_2, etc.]`
>>> Now let's assume sometime later that deletes arrive for all rows which
>>> produce user_id_1's group_id's.
>>>
>>> Would the aggregate function completely delete the associated state from
>>> RocksDB or would it leave something like `user_id_1 -> []` sitting in
>>> RocksDB forever?
>>>
>>> We have an aggregate similar to this where users could delete themselves
>>> and we want to make sure we're not accumulating data forever for those
>>> users.
>>>
>>> Thanks!
>>>
>>> --
>>>
>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>
>>>
>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>> <https://www.facebook.com/remindhq>
>>>
>>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>


Re: what's meaning of the "true/false" from "groupy...select"?THANKS

2020-12-09 Thread Danny Chan
The "true" means the message is an insert/update after, the "false" means
the message is a retraction (for the old record that needs to be modified).

Appleyuchi  于2020年12月9日周三 下午12:20写道:

>
> The complete code is:
> https://paste.ubuntu.com/p/hpWB87kT6P/
>
> The result is:
> 2> (true,1,diaper,4)
> 7> (true,3,rubber,2)
> 4> (true,1,beer,3)
> 7> (false,3,rubber,2)
> 7> (true,3,rubber,8)
>
> That's the meaning of  true/false in the result
> after running the above code?
>
> Thanks for your help~!
>
>
>
>


Re: How User-Defined AggregateFunctions handle deletes of all aggregated rows.

2020-12-09 Thread Danny Chan
Hi, Rex Fenley ~

If there is stateful operator as the output of the aggregate function. Then
each time the function receives an update (or delete) for the key, the agg
operator would emit 2 messages, one for retracting the old record, one for
the new message. For your case, the new message is the DELETE.

If there is no stateful operator, the aggregate operator would just emit
the update after (the new) message which is the delete.

Rex Fenley  于2020年12月9日周三 上午4:30写道:

> Hello,
>
> I'd like to better understand delete behavior of AggregateFunctions. Let's
> assume there's an aggregate of `user_id` to a set of `group_ids` for groups
> belonging to that user.
> `user_id_1 -> [group_id_1, group_id_2, etc.]`
> Now let's assume sometime later that deletes arrive for all rows which
> produce user_id_1's group_id's.
>
> Would the aggregate function completely delete the associated state from
> RocksDB or would it leave something like `user_id_1 -> []` sitting in
> RocksDB forever?
>
> We have an aggregate similar to this where users could delete themselves
> and we want to make sure we're not accumulating data forever for those
> users.
>
> Thanks!
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com  |  BLOG   |
>  FOLLOW US   |  LIKE US
> 
>


Re: lookup cache clarification

2020-12-09 Thread Danny Chan
Yes, you understand it correctly.

Marco Villalobos  于2020年12月9日周三 上午4:23写道:

> I set up the following lookup cache values:
>
> 'lookup.cache.max-rows' = '20'
> 'lookup.cache.ttl' = '1min'
>
> for a jdbc connector.
>
> This table currently only has about 2 records in it. However,
> since I set the TTL to 1 minute, I expected the job to query that
> table every minute.
>
> The documentation states:
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#lookup-cache
>
> The oldest rows in cache will be expired when the cache hit to the max
> cached rows lookup.cache.max-rows or when the row exceeds the max time
> to live lookup.cache.ttl.
>
> What am I misunderstanding?
>


Re: How can I optimize joins or cache misses in SQL api?

2020-12-09 Thread Danny Chan
Hi, Marco Villalobos ~

It's nice to see that you choose the SQL API which is more concise and
expressive.

To answer some of your questions:

> Q: Is there a way to control that? I don't want the N + 1 query problem.

No, the SQL evaluate row by row, there maybe some optimizations internal
that buffer the data first, but there is no logic to combine the ad-hoc
query into one IN.

> Q: Is there a way to preload persons table, since it changes only
about once every two weeks and then do a LEFT JOIN on it?

Yes, the temporal table have a configuration to cache the data, but by
default, this feature is disabled. [1]

> Q: Is there a way to control a shorter cache time for non-existent items?

You can configure the state TTL of stream-stream join through [2] or modify
the temporal cache TTL through the options above.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#connector-options
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time

Marco Villalobos  于2020年12月8日周二 下午11:51写道:

> scenario:
>
> kafka stream enriched with tableS in postgresql
>
> Let's pretend that the postgres has an organizations, departments, and
> persons table, and we want to join the full name of the kafka table
> that has the person id.  I also want to determine if the person id is
> missing.
>
> This requires a left join.
>
> SELECT o.id, d.id, p.fullname, k.ssn, SUM(k.amount)
> FROM purchases k
> JOIN organizations o ON o.code = k.organization
> JOIN departmentS d ON d.code = k.department
> LEFT JOIN persons FOR SYSTEM_TIME AS OF k.procTime AS p ON
> p.department_id = d.id
> WHERE p.ssn = k.ssn
> GROUP BY
> TUMBLE(s.procTime, INTERVAL '3' MINUTE), o.id, d.id, p.fullname, k.ssn
>
> Let's say that the TTL for organizations and departments is 12 months,
> but for persons
> it is 1 month.
>
> observations:
>
> If six unique people enter the kafka topic, then that will issue six
> separate queries to the database of the form:
>
> SELECT id, ssn, fullname, dob FROM persons WHERE.deparment_id = $1 AND ssn
> = $2
>
> However, since this is a tumbling, it would be more efficient to do
> one query with six parameters in an IN clause.  Example:
>
> SELECT id, ssn, fullname, dob FROM persons WHERE.(deparment_id, ssn)
> IN (($1,$2), ($3,$4),($5,$6),($7,$8),($9,$10)($11,$12))
>
> Q: Is there a way to control that? I don't want the N + 1 query problem.
>
> Q: Are these queries performed asynchronously?  If there were 20
> unique persons, I would not want 2 synchronous queries.
>
> Q: Is there a way to preload persons table, since it changes only
> about once every two weeks and then do a LEFT JOIN on it?
>
> Let's say that the person does not exist. I am impressed that Flink
> caches that a person does not exist.  However, I want to cache if a
> person exists for a month, but if the person does not exist, I only
> want to remember that for a day.
>
> Q: Is there a way to control a shorter cache time for non-existent items?
>
> I really like the expressiveness and succinctness of the SQL api in
> Flink, however, I am worried that I need use the data-stream API in
> order to control the scenarios above.
>
> I appreciate any advice, thank you.
>


Re: How to tell what mode a Table operator is in

2020-12-03 Thread Danny Chan
If a stateful operator has also a stateful operator in its input
sub-pipeline, then it may receive retract messages. Operator like group
agg, stream-stream join or rank are stateful.

We can not show if the operator are receiving retract messages in the UI.
But your request is reasonable.

Rex Fenley 于2020年12月4日 周五上午4:48写道:

> Our sinks are uniquely keyed as well. A couple of our joins are not until
> an aggregate is performed however.
>
> On Thu, Dec 3, 2020 at 12:46 PM Rex Fenley  wrote:
>
>> Hi,
>>
>> When I'm looking at the Flink plan in the UI and at an operator, is there
>> a way to tell if an operator is in Retract mode vs Upsert mode? Ideally we
>> want as many of our operators in Upsert mode as possible since our data
>> sources are all uniquely keyed.
>>
>> Thanks!
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com  |  BLOG 
>>  |  FOLLOW US   |  LIKE US
>> 
>>
>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com  |  BLOG   |
>  FOLLOW US   |  LIKE US
> 
>


Re: Performance consequence of leftOuterJoinLateral

2020-12-01 Thread Danny Chan
Hi, Rex ~

For "leftOuterJoinLateral" do you mean join a table function through
lateral table ?
If it is, yes, the complexity is O(1) for each probe key of LHS. The table
function evaluate the extra columns and append it to the left columns.

Rex Fenley  于2020年12月2日周三 上午7:54写道:

> Hello,
>
> I'm curious if there's any performance consequence of using a
> TableFunction + leftOuterJoinLateral to create some new columns vs creating
> each column individually?
>
> I'm hoping that lookup for a row with leftOuterJoinLateral is essentially
> O(1), so as soon as the TableFunction is done it just appends the new
> column values to the row.
>
> Thanks!
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com  |  BLOG   |
>  FOLLOW US   |  LIKE US
> 
>


Re: Filter Null in Array in SQL Connector

2020-12-01 Thread Danny Chan
 "fields": [
>>>>>   { "type": "int32", "optional": false, "field": "id" },
>>>>>   {
>>>>> "type": "array",
>>>>> "items": { "type": "string", "optional": true },
>>>>> "optional": false,
>>>>> "field": "roles"
>>>>>   },
>>>>> ],
>>>>> "optional": true,
>>>>> "name": "db.public.data.Value",
>>>>> "field": "after"
>>>>>   },
>>>>>   {
>>>>> "type": "struct",
>>>>> "fields": [
>>>>>   { "type": "string", "optional": false, "field": "version" },
>>>>>   { "type": "string", "optional": false, "field": "connector"
>>>>> },
>>>>>   { "type": "string", "optional": false, "field": "name" },
>>>>>   { "type": "int64", "optional": false, "field": "ts_ms" },
>>>>>   {
>>>>> "type": "string",
>>>>> "optional": true,
>>>>> "name": "io.debezium.data.Enum",
>>>>> "version": 1,
>>>>> "parameters": { "allowed": "true,last,false" },
>>>>> "default": "false",
>>>>> "field": "snapshot"
>>>>>   },
>>>>>   { "type": "string", "optional": false, "field": "db" },
>>>>>   { "type": "string", "optional": false, "field": "schema" },
>>>>>   { "type": "string", "optional": false, "field": "table" },
>>>>>   { "type": "int64", "optional": true, "field": "txId" },
>>>>>   { "type": "int64", "optional": true, "field": "lsn" },
>>>>>   { "type": "int64", "optional": true, "field": "xmin" }
>>>>> ],
>>>>> "optional": false,
>>>>> "name": "io.debezium.connector.postgresql.Source",
>>>>> "field": "source"
>>>>>   },
>>>>>   { "type": "string", "optional": false, "field": "op" },
>>>>>   { "type": "int64", "optional": true, "field": "ts_ms" },
>>>>>   {
>>>>> "type": "struct",
>>>>> "fields": [
>>>>>   { "type": "string", "optional": false, "field": "id" },
>>>>>   { "type": "int64", "optional": false, "field": "total_order"
>>>>> },
>>>>>   {
>>>>> "type": "int64",
>>>>> "optional": false,
>>>>> "field": "data_collection_order"
>>>>>   }
>>>>> ],
>>>>> "optional": true,
>>>>> "field": "transaction"
>>>>>   }
>>>>> ],
>>>>> "optional": false,
>>>>> "name": "db.public.data.Envelope"
>>>>>   },
>>>>>   "payload": {
>>>>> "before": null,
>>>>> "after": {
>>>>>   "id": 76704,
>>>>>   "roles": [null],
>>>>> },
>>>>> "source": {
>>>>>   "ver

Re: Is there a way we can specify operator ID for DDLs?

2020-11-26 Thread Danny Chan
Here is the issue https://issues.apache.org/jira/browse/FLINK-20368

Kevin Kwon  于2020年11月26日周四 上午8:50写道:

> thanks alot :)
>
> On Wed, Nov 25, 2020 at 3:26 PM Danny Chan  wrote:
>
>> SQL does not support that now. But i think your request is reasonable.
>> AFAIK . SQL hints may be a way to configure such a per-operator thing.
>> Would fire an issue first to see if we have an solution for the midterm.
>>
>> Kevin Kwon 于2020年11月25日 周三下午5:06写道:
>>
>>> I just want the source and sink operator compatibility. I use Kafka as
>>> source and I'd want to save the offsets through checkpoint
>>>
>>> I know how to do with DataStream API but not with plain SQL DDL
>>>
>>> On Wed, Nov 25, 2020, 3:09 AM Danny Chan  wrote:
>>>
>>>> Hi Kevin Kwon ~
>>>>
>>>> Do you want to customize only the source operator name or all the
>>>> operator name in order for the state compatibility ?
>>>>
>>>> State compatibility is an orthogonal topic and keep the operator name
>>>> is one way to solve it.
>>>>
>>>> Kevin Kwon  于2020年11月25日周三 上午1:11写道:
>>>>
>>>>> For SQLs, I know that the operator ID assignment is not possible now
>>>>> since the query optimizer may not be backward compatible in each release
>>>>>
>>>>> But are DDLs also affected by this?
>>>>>
>>>>> for example,
>>>>>
>>>>> CREATE TABLE mytable (
>>>>>   id BIGINT,
>>>>>   data STRING
>>>>> ) with (
>>>>>   connector = 'kafka'
>>>>>   ...
>>>>>   id = 'mytable'
>>>>>   name = 'mytable'
>>>>> )
>>>>>
>>>>> and we can save all related checkpoint data
>>>>>
>>>>


Re: Is there a way we can specify operator ID for DDLs?

2020-11-25 Thread Danny Chan
SQL does not support that now. But i think your request is reasonable.
AFAIK . SQL hints may be a way to configure such a per-operator thing.
Would fire an issue first to see if we have an solution for the midterm.

Kevin Kwon 于2020年11月25日 周三下午5:06写道:

> I just want the source and sink operator compatibility. I use Kafka as
> source and I'd want to save the offsets through checkpoint
>
> I know how to do with DataStream API but not with plain SQL DDL
>
> On Wed, Nov 25, 2020, 3:09 AM Danny Chan  wrote:
>
>> Hi Kevin Kwon ~
>>
>> Do you want to customize only the source operator name or all the
>> operator name in order for the state compatibility ?
>>
>> State compatibility is an orthogonal topic and keep the operator name is
>> one way to solve it.
>>
>> Kevin Kwon  于2020年11月25日周三 上午1:11写道:
>>
>>> For SQLs, I know that the operator ID assignment is not possible now
>>> since the query optimizer may not be backward compatible in each release
>>>
>>> But are DDLs also affected by this?
>>>
>>> for example,
>>>
>>> CREATE TABLE mytable (
>>>   id BIGINT,
>>>   data STRING
>>> ) with (
>>>   connector = 'kafka'
>>>   ...
>>>   id = 'mytable'
>>>   name = 'mytable'
>>> )
>>>
>>> and we can save all related checkpoint data
>>>
>>


Re: Flink 1.11 avro format question

2020-11-25 Thread Danny Chan
For your question 1. This does not work as expected. I would check it soon
to see if it is a bug and fire a fix.

Hongjian Peng 于2020年11月25日 周三下午4:45写道:

> In Flink 1.10, we can pass this schema with 'format.avro-schema' property
> to SQL DDL, but in Flink 1.11, the Avro schema is always derived from
> the table schema.
> We have two questions about the Flink 1.11 Avro format:
>
> 1. Flink 1.11 maps nullable types to Avro union(something, null). How to
> map the nullable types to union(null, something)? In our schema definition,
> we follow the Avro recommended definition, list 'null' as the first type.
> With Flink default mapping, the AvroRowDataDeserializationSchema will get
> the wrong/error result when lists 'null' as the first type of union.
>
> 2. Now if we want to set the field in a Row to non-nullable, we have to
> set the Row to non-nullable firstly. Is it the expected behavior of
> the Avro format?
> For example,
> event ROW< createTimestamp BIGINT NOT NULL, sentTimestamp BIGINT NOT NULL,
> eventId VARCHAR> will be maped to Avro schema:
>
> {
>   "type": "record",
>   "name": "event",
>   "fields": [
> {
>   "name": "header",
>   "type": {
> "type": "record",
> "name": "header",
> "fields": [
>   {
> "name": "createTimestamp",
> "type": ["long",null]
>   },
>   {
> "name": "sentTimestamp",
>  "type": ["long",null]
>   },
>   {
> "name": "eventId",
> "type": [
>   "null",
>   {
> "type": "string",
> "avro.java.string": "String"
>   }
> ]
>   }
> ]
>   }
> }
> ]
> }
>
> We have to use "event ROW< createTimestamp BIGINT NOT NULL, sentTimestamp
> BIGINT NOT NULL, eventId VARCHAR> NOT NULL" to make the NOT NULL of
> fields: createTimestamp and sentTimestamp works.
>
>
>
>
>
> At 2020-11-25 16:12:58, "Hongjian Peng"  wrote:
>
> Hi Flink Community,
>
> We are trying to upgrade our Flink SQL job from 1.10 to 1.11. We used
> Kafka source table, and the data is stored in Kafka in Avro format.
> Schema is like this:
>
> {
>   "type": "record",
>   "name": "event",
>   "namespace": "busseniss.event",
>   "fields": [
> {
>   "name": "header",
>   "type": {
> "type": "record",
> "name": "header",
> "fields": [
>   {
> "name": "createTimestamp",
> "type": "long"
>   },
>   {
> "name": "sentTimestamp",
> "type": "long"
>   },
>   {
> "name": "eventId",
> "type": [
>   "null",
>   {
> "type": "string",
> "avro.java.string": "String"
>   }
> ]
>   }
> ]
>   },
>   "doc": "Rheos header "
> },
> {
>   "name": "si",
>   "type": [
> "null",
> "string"
>   ]
> }
> ]
> }
>
>
>
>
>
>
> --
> Hongjian Peng
> Department of Computer Science and Engineering
> Shanghai Jiao Tong University
> Email: super...@163.com
>
>


Re: Flink 1.11 avro format question

2020-11-25 Thread Danny Chan
Hi Hongjian Peng ~

For your question 1, it is not work as expected. If it is true, there is
definitely a bug. I would check and fix it later.

For your question 2, yes. This is an intent design. There is a routine in
the type inference: all the fields of a nullable struct type should also be
nullable.

Hongjian Peng 于2020年11月25日 周三下午4:45写道:

> In Flink 1.10, we can pass this schema with 'format.avro-schema' property
> to SQL DDL, but in Flink 1.11, the Avro schema is always derived from
> the table schema.
> We have two questions about the Flink 1.11 Avro format:
>
> 1. Flink 1.11 maps nullable types to Avro union(something, null). How to
> map the nullable types to union(null, something)? In our schema definition,
> we follow the Avro recommended definition, list 'null' as the first type.
> With Flink default mapping, the AvroRowDataDeserializationSchema will get
> the wrong/error result when lists 'null' as the first type of union.
>
> 2. Now if we want to set the field in a Row to non-nullable, we have to
> set the Row to non-nullable firstly. Is it the expected behavior of
> the Avro format?
> For example,
> event ROW< createTimestamp BIGINT NOT NULL, sentTimestamp BIGINT NOT NULL,
> eventId VARCHAR> will be maped to Avro schema:
>
> {
>   "type": "record",
>   "name": "event",
>   "fields": [
> {
>   "name": "header",
>   "type": {
> "type": "record",
> "name": "header",
> "fields": [
>   {
> "name": "createTimestamp",
> "type": ["long",null]
>   },
>   {
> "name": "sentTimestamp",
>  "type": ["long",null]
>   },
>   {
> "name": "eventId",
> "type": [
>   "null",
>   {
> "type": "string",
> "avro.java.string": "String"
>   }
> ]
>   }
> ]
>   }
> }
> ]
> }
>
> We have to use "event ROW< createTimestamp BIGINT NOT NULL, sentTimestamp
> BIGINT NOT NULL, eventId VARCHAR> NOT NULL" to make the NOT NULL of
> fields: createTimestamp and sentTimestamp works.
>
>
>
>
>
> At 2020-11-25 16:12:58, "Hongjian Peng"  wrote:
>
> Hi Flink Community,
>
> We are trying to upgrade our Flink SQL job from 1.10 to 1.11. We used
> Kafka source table, and the data is stored in Kafka in Avro format.
> Schema is like this:
>
> {
>   "type": "record",
>   "name": "event",
>   "namespace": "busseniss.event",
>   "fields": [
> {
>   "name": "header",
>   "type": {
> "type": "record",
> "name": "header",
> "fields": [
>   {
> "name": "createTimestamp",
> "type": "long"
>   },
>   {
> "name": "sentTimestamp",
> "type": "long"
>   },
>   {
> "name": "eventId",
> "type": [
>   "null",
>   {
> "type": "string",
> "avro.java.string": "String"
>   }
> ]
>   }
> ]
>   },
>   "doc": "Rheos header "
> },
> {
>   "name": "si",
>   "type": [
> "null",
> "string"
>   ]
> }
> ]
> }
>
>
>
>
>
>
> --
> Hongjian Peng
> Department of Computer Science and Engineering
> Shanghai Jiao Tong University
> Email: super...@163.com
>
>


Re: Is there a way we can specify operator ID for DDLs?

2020-11-24 Thread Danny Chan
Hi Kevin Kwon ~

Do you want to customize only the source operator name or all the operator
name in order for the state compatibility ?

State compatibility is an orthogonal topic and keep the operator name is
one way to solve it.

Kevin Kwon  于2020年11月25日周三 上午1:11写道:

> For SQLs, I know that the operator ID assignment is not possible now since
> the query optimizer may not be backward compatible in each release
>
> But are DDLs also affected by this?
>
> for example,
>
> CREATE TABLE mytable (
>   id BIGINT,
>   data STRING
> ) with (
>   connector = 'kafka'
>   ...
>   id = 'mytable'
>   name = 'mytable'
> )
>
> and we can save all related checkpoint data
>


Re: Filter Null in Array in SQL Connector

2020-11-19 Thread Danny Chan
quot;
   },
   {
  "type":"int64",
  "optional":false,
  "field":"total_order"
   },
   {
  "type":"int64",
  "optional":false,
  "field":"data_collection_order"
   }
],
"optional":true,
"field":"transaction"
 }
  ],
  "optional":false,
  "name":"db.public.data.Envelope"
   },
   "payload":{
  "before":null,
  "after":{
 "id":76704,
 "roles":[
null
 ]
  },
  "source":{
 "version":"1.3.0.Final",
 "connector":"postgresql",
 "name":"db",
 "ts_ms":1605739197360,
 "snapshot":"true",
 "db":"db",
 "schema":"public",
 "table":"data",
 "txId":1784,
 "lsn":1305806608,
 "xmin":null
  },
  "op":"r",
  "ts_ms":1605739197373,
  "transaction":null
   }
}

Which works correctly. I reformatted it because it is with invalid JSON
format.

Rex Fenley  于2020年11月20日周五 上午3:02写道:

> Below is a highly redacted set of data that should represent the problem.
> As you can see, the "roles" field has "[null]" in it, a null value within
> the array. We also see in our DB corresponding rows like the following.
> id | roles
> ---+
>   16867433 | {NULL}
>
> We have confirmed that by not selecting "roles" all data passes through
> without failure on a single operator, but selecting "roles" will eventually
> always fail with java.lang.NullPointerException repeatedly. What is odd
> about this is there is 0 additional stack trace, just the exception, in our
> logs and in Flink UI. We only have INFO logging on, however, other
> exceptions we've encountered in our development have always revealed a
> stack trace.
>
> {
>   "schema": {
> "type": "struct",
> "fields": [
>   {
> "type": "struct",
> "fields": [
>   { "type": "int32", "optional": false, "field": "id" },
>   {
> "type": "array",
> "items": { "type": "string", "optional": true },
> "optional": false,
> "field": "roles"
>   },
> ],
> "optional": true,
> "name": "db.public.data.Value",
> "field": "before"
>   },
>   {
> "type": "struct",
> "fields": [
>   { "type": "int32", "optional": false, "field": "id" },
>   {
> "type": "array",
> "items": { "type": "string", "optional": true },
> "optional": false,
> "field": "roles"
>   },
> ],
> "optional": true,
> "name": "db.public.data.Value",
> "field": "after"
>   },
>   {
> "type": "struct",
> "fields": [
>   { "type": "string", "optional": false, "field": "version" },
>   { "type": "string", "optional": false, "field": "connector" },
>   { "type": "string", "optional": false, "field": "name" },
>   { "type": "int64", "optional": false, "field": "ts_ms" },
>   {
> "type": "string",
> "optional": true,
> "name": "io.debezium.data.Enum",
> "version": 1,
> "parameters": { "allowed": "true,last,false" },
> "default": "false",
> "field": "snapshot"
>   },
&

Re: Filter Null in Array in SQL Connector

2020-11-19 Thread Danny Chan
Can you also share your problematic json string here ? So that we can
decide the specific error case cause.

Rex Fenley  于2020年11月19日周四 下午2:51写道:

> Hi,
>
> I recently discovered some of our data has NULL values arriving in an
> ARRAY column. This column is being consumed by Flink via the Kafka
> connector Debezium format. We seem to be receiving NullPointerExceptions
> for when these NULL values in the arrays arrive which restarts the source
> operator in a loop.
>
> Is there any way to not throw or to possibly filter out NULLs in an Array
> of Strings in Flink?
>
> We're somewhat stuck on how to solve this problem, we'd like to be
> defensive about this on Flink's side.
>
> Thanks!
>
> (P.S. The exception was not that informative, there may be room for
> improvement in terms of a richer error message when this happens.)
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com  |  BLOG   |
>  FOLLOW US   |  LIKE US
> 
>


Re: Filter Null in Array in SQL Connector

2020-11-19 Thread Danny Chan
Hi, Fenley ~

You are right, parsing nulls of ARRAY field is not supported now, i have
logged an issue [1] and would fix it soon ~

[1] https://issues.apache.org/jira/browse/FLINK-20234

Rex Fenley  于2020年11月19日周四 下午2:51写道:

> Hi,
>
> I recently discovered some of our data has NULL values arriving in an
> ARRAY column. This column is being consumed by Flink via the Kafka
> connector Debezium format. We seem to be receiving NullPointerExceptions
> for when these NULL values in the arrays arrive which restarts the source
> operator in a loop.
>
> Is there any way to not throw or to possibly filter out NULLs in an Array
> of Strings in Flink?
>
> We're somewhat stuck on how to solve this problem, we'd like to be
> defensive about this on Flink's side.
>
> Thanks!
>
> (P.S. The exception was not that informative, there may be room for
> improvement in terms of a richer error message when this happens.)
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com  |  BLOG   |
>  FOLLOW US   |  LIKE US
> 
>


Re: CREATE TABLE LIKE clause from different catalog or database

2020-11-18 Thread Danny Chan
Yes, this is a bug which is fixed recently [1] for release 1.12 and 1.11.3

You can also switch to the source table catalog first before you execute
the CREATE TABLE LIKE DDL just like Ingo suggested.

[1] https://issues.apache.org/jira/browse/FLINK-19281

김동원  于2020年11月17日周二 上午12:19写道:

> Hi Ingo,
>
> Thank you for letting me know! I didn’t know that’s already discussed.
>
> Best,
>
> Dongwon
>
> 2020. 11. 17. 오전 1:12, Ingo Bürk  작성:
>
> 
> Hi,
>
> I ran into the same issue today. This is fixed in 1.11.3, the
> corresponding bug was FLINK-19281.
>
> A workaround is to switch the current catalog and database temporarily to
> hive.navi and then not qualify the table name in the LIKE clause.
>
>
> Regards
> Ingo
>
>
> On Mon, Nov 16, 2020, 17:04 Dongwon Kim  wrote:
>
>> Hi Danny~
>> Sorry for late reply,
>>
>> Let's take a look at a running example:
>>
>>> EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>   .inBatchMode()
>>>   .build();
>>>
>>> TableEnvironment tEnv = TableEnvironment.create(settings);
>>>
>>> HiveCatalog hiveCatalog = new HiveCatalog("hive",null, args[1]);
>>> tEnv.registerCatalog("hive", hiveCatalog);
>>>
>>> GenericInMemoryCatalog inmemCatalog = new
>>> GenericInMemoryCatalog("inmem");
>>> tEnv.registerCatalog("inmem", inmemCatalog);
>>> tEnv.useCatalog("inmem");
>>>
>>> TableResult result = tEnv.executeSql(
>>>   "CREATE TABLE copied LIKE hive.navi.gps"
>>> );
>>>
>>
>> I've got the following log messages:
>>
>>> 00:50:22,157 INFO  org.apache.flink.table.catalog.hive.HiveCatalog
>>>[] - Setting hive conf dir as /Users/eastcirclek/hive-conf
>>> 00:50:22,503 INFO  org.apache.flink.table.catalog.hive.HiveCatalog
>>>[] - Created HiveCatalog 'hive'
>>> 00:50:22,515 INFO  hive.metastore
>>> [] - Trying to connect to metastore with URI thrift://...:9083
>>> 00:50:22,678 INFO  hive.metastore
>>> [] - Connected to metastore.
>>> 00:50:22,678 INFO  org.apache.flink.table.catalog.hive.HiveCatalog
>>>[] - Connected to Hive metastore
>>> 00:50:22,799 INFO  org.apache.flink.table.catalog.CatalogManager
>>>[] - Set the current default catalog as [inmem] and the current
>>> default database as [default].
>>> *Exception in thread "main"
>>> org.apache.flink.table.api.ValidationException: Source table
>>> '`inmem`.`default`.`hive.navi.gps`' of the LIKE clause not found in the
>>> catalog, at line 1, column 26*
>>> at
>>> org.apache.flink.table.planner.operations.SqlCreateTableConverter.lambda$lookupLikeSourceTable$1(SqlCreateTableConverter.java:147)
>>> at java.util.Optional.orElseThrow(Optional.java:290)
>>> at
>>> org.apache.flink.table.planner.operations.SqlCreateTableConverter.lookupLikeSourceTable(SqlCreateTableConverter.java:147)
>>> at
>>> org.apache.flink.table.planner.operations.SqlCreateTableConverter.createCatalogTable(SqlCreateTableConverter.java:96)
>>> at
>>> org.apache.flink.table.planner.operations.SqlCreateTableConverter.convertCreateTable(SqlCreateTableConverter.java:76)
>>> at
>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:190)
>>> at
>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>>> at
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
>>> at Test.main(Test.java:53)
>>>
>>
>> It seems like hive.navi.gps is recognized as a table name as a whole.
>> I currently declare such a table by specifying all fields without the
>> LIKE clause.
>>
>> Do I miss something?
>>
>> FYI, I'm working with Flink-1.11.2.
>>
>> Thank you~
>>
>> Best,
>>
>> Dongwon
>>
>>
>> On Fri, Nov 13, 2020 at 5:19 PM Danny Chan  wrote:
>>
>>> Hi Dongwon ~
>>>
>>> Table from different catalog/db is supported, you need to specify the
>>> full path of the source table:
>>>
>>> CREATE TABLE Orders_with_watermark (
>>> *...*) WITH (
>>> *...*)LIKE my_catalog.my_db.Orders;
>>>
>>>
>>> Dongwon Kim  于2020年11月11日周三 下午2:53写道:
>>>
>>>> Hi,
>>>>
>>>> Is it disallowed to refer to a table from different databases or
>>>> catalogs when someone creates a table?
>>>>
>>>> According to [1], there's no way to refer to tables belonging to
>>>> different databases or catalogs.
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table
>>>>
>>>> Best,
>>>>
>>>> Dongwon
>>>>
>>>


Re: Table SQL Filesystem CSV recursive directory traversal

2020-11-13 Thread Danny Chan
In the current master code base, all the FileInputFormat default add the
files recursively with the given paths. (e.g. the #addFilesInDir method).

So it should be supported as default for SQL.

Timo Walther  于2020年11月9日周一 下午11:25写道:

> Hi Ruben,
>
> by looking at the code, it seems you should be able to do that. At least
> for batch workloads we are using
> org.apache.flink.formats.csv.CsvFileSystemFormatFactory.CsvInputFormat
> which is a FileInputFormat that supports the mentioned configuration
> option.
>
> The problem is that this might not have been exposed via SQL properties
> yet. So you would need to write your own property-to-InputFormat factory
> that does it similar to:
>
>
> https://github.com/apache/flink/blob/master/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileSystemFormatFactory.java
>
> What you could do create your own factory and extend from the above so
> you can set additional properties. Not a nice solution but a workaround
> for now.
>
> More information to how to write your own factory can also be found here:
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html
>
> I hope this helps.
>
> Regards,
> Timo
>
> On 09.11.20 09:27, Ruben Laguna wrote:
> > Is it possible?
> >
> > For Dataset I've found [1] :
> >
> > |parameters.setBoolean("recursive.file.enumeration", true); // pass the
> > configuration to the data source DataSet logs =
> > env.readTextFile("file:///path/with.nested/files")
> > .withParameters(parameters);|
> >
> >
> > But can I achieve something similar with the Table SQL?
> >
> > I have the following directory structure
> > /myfiles/20201010/00/00restoffilename1.csv
> > /myfiles/20201010/00/00restoffilename2.csv
> > ...
> > /myfiles/20201010/00/00restoffilename3000.csv
> > /myfiles/20201010/01/01restoffilename1.csv
> > 
> > /myfiles/20201010/00/00restoffilename3000.csv
> >
> > So for each day I have 255  subdirectories from 00 to  FF and each of
> > those directories can have 1000-3000 files and I would like to load all
> > those files in one go.
> >
> > [1]:
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/#recursive-traversal-of-the-input-path-directory
> > <
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/#recursive-traversal-of-the-input-path-directory
> >
> >
> > --
> > /Rubén
>
>


Re: CREATE TABLE LIKE clause from different catalog or database

2020-11-13 Thread Danny Chan
Hi Dongwon ~

Table from different catalog/db is supported, you need to specify the full
path of the source table:

CREATE TABLE Orders_with_watermark (
*...*) WITH (
*...*)LIKE my_catalog.my_db.Orders;


Dongwon Kim  于2020年11月11日周三 下午2:53写道:

> Hi,
>
> Is it disallowed to refer to a table from different databases or catalogs
> when someone creates a table?
>
> According to [1], there's no way to refer to tables belonging to different
> databases or catalogs.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table
>
> Best,
>
> Dongwon
>


Re: Failure to execute streaming SQL query

2020-11-06 Thread Danny Chan
Hi, Satyam ~

What version of Flink release did you use? I tested your first SQL
statements in local and they both works great.

Your second SQL statement fails because currently we does not support
stream-stream join on time attributes because the join would breaks the
semantic of time attribute (it does not keeps any order). But the error
stacktrace is indeed mis-leading, i have logged an issue here [1]

You may need to declare the watermark strategy in the DDL so that the
`timestamp` column can be recognized as a time attribute, like this:

CREATE TABLE T0 (
  amount BIGINT,
  ts TIMESTAMP(3),
  watermark for ts as ts - INTERVAL '5' SECOND
) WITH (
  ...
)

[1] https://issues.apache.org/jira/browse/FLINK-20017

Satyam Shekhar  于2020年11月6日周五 下午3:27写道:

> Hello,
>
> I have a table T0 with the following schema -
>
> root
>   |-- amount: BIGINT
>   |-- timestamp: TIMESTAMP(3)
>
> The following two queries fail execution on the above table when executed
> in streaming mode using the Blink planner.
>
> WITH A AS (
>   SELECT COUNT(*) AS ct, tumble_end(`timestamp`, INTERVAL '1' MINUTE) as tm
> FROM T0 GROUP BY tumble(`timestamp`, INTERVAL '1' MINUTE))
> select L.ct, R.ct, L.tm, R.tm from A as L left join A as R on L.tm = R.tm
>
> WITH A AS (
>   SELECT COUNT(*) AS ct, tumble_rowtime(`timestamp`, INTERVAL '1' MINUTE)
> as tm
> FROM T0 GROUP BY tumble(`timestamp`, INTERVAL '1' MINUTE))
> select L.ct, R.ct, L.tm, R.tm from A as L left join A as R on L.tm = R.tm
>
> The two queries are very similar and only differ in their use of
> tumble_end and tumble_rowtime operator. Both queries use the
> timestamp column as their rowtime attribute. Casting "tm" column to
> timestamp makes both queries work -
>
> WITH A AS (
>   SELECT COUNT(*) AS ct, CAST(tumble_end(`timestamp`, INTERVAL '1' MINUTE)
> as TIMESTAMP(3)) as tm
> FROM T0 GROUP BY tumble(`timestamp`, INTERVAL '1' MINUTE))
> select L.ct, R.ct, L.tm, R.tm from A as L left join A as R on L.tm = R.tm
>
> This workaround, however, loses the rowtime attribute from the output
> resultset for the second query.
>
> The first query fails with the following exception -
>
> java.lang.RuntimeException: class java.sql.Timestamp cannot be cast to
> class java.lang.Long (java.sql.Timestamp is in module java.sql of loader
> 'platform'; java.lang.Long is in module java.base of loader 'bootstrap')
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> at SinkConversion$166.processElement(Unknown Source)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:626)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:603)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:563)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> at StreamExecCalc$163.processElement(Unknown Source)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:626)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:603)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:563)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
> at
> org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.outputNullPadding(StreamingJoinOperator.java:314)
> at
> org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:206)
> at
> org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement1(StreamingJoinOperator.java:115)
> at
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord1(StreamTwoInputProcessor.java:132)
> at
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$0(StreamTwoInputProcessor.java:99)
> at
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:364)
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(Str

Re: Re: How to use both of SQL and DataStream in 1.11

2020-11-03 Thread Danny Chan
You can still use the .sqlQuery(...) to create a common table there, then
converts the table into a DataStream,

with this DataStream, you can add the multiple sink functions you like.

izual  于2020年11月2日周一 下午5:18写道:

> Hi, Danny:
>
>
> Thanks for your help.
>
>
> As in the question, some result was saved using DataStream API:
>
>
> ```
>
> table.toAppendStream[Row].addSink(new MyStreamSink)
>
>
> class MyStreamSink extends RichSinkFunction[Row] {
>
> override def invoke(r: Row): Unit = {
>
> // save result
>
> }
>
> }
>
> ```
>
>
> So if use `StatementSet.addInsert`, should I must use
> UpsertStreamTableSink and StreamTableSourceFactory to wrap the
> RichSinkFunction?
>
> Is there a way to keep using DataStream API in table environment? which is
> more expressive.
>
>
>
>
>
>
> At 2020-11-02 16:53:22, "Danny Chan"  wrote:
>
> You can still convert the datastream to table and register it with method
>
> void TableEnvironment.createTemporaryView(String path, Table view)
>
> Then create a StatementSet with
>
> StatementSet TableEnvironment.createStatementSet(),
>
> With the StatementSet, you can execute multiple insert statements
> altogether,
> and then submit the job with
>
> TableResult StatementSet.execute()
>
> izual  于2020年11月2日周一 上午11:28写道:
>
>> Hi, community:
>>
>>
>>   We used flink 1.9.1, both SQL and DataStream API to support multiple
>> sinks for product envs.
>>
>>   For example, tableEnv.sqlUpdate("INSERT INTO dest1 SELECT ...") and
>> table.toAppendStream[Row].addSink(new RichSinkFunction[Row]
>> {...}).name("dest2"), and env.execute() to submit the DAG together, and
>> result will sink to dest1 or dest2 or both.
>>
>>
>>   Now I try to update flink to 1.11.2, according to [1]-Attention,use
>> tableEnv.execute() instead of env.execute(), but only get the result of
>> `sqlUpdate`, and result of `DataStream.addSink` is missed.
>>
>>
>>   1. How to get both the results in mixed SQL/DataStream use cases, maybe
>> change all RichSinkFunction into a UpsertTable works. Is there another
>> simple way to do this?
>>
>>   2. It seems like env.getExecutionPlan only returns DAG of DataStream
>> API now, so how to get the whole DAG like env.getExecutionPlan() in 1.9.1.
>>
>>
>>   Thanks for ur reply.
>>
>>
>>
>>
>
>
>
>


Re: How to use both of SQL and DataStream in 1.11

2020-11-02 Thread Danny Chan
You can still convert the datastream to table and register it with method

void TableEnvironment.createTemporaryView(String path, Table view)

Then create a StatementSet with

StatementSet TableEnvironment.createStatementSet(),

With the StatementSet, you can execute multiple insert statements
altogether,
and then submit the job with

TableResult StatementSet.execute()

izual  于2020年11月2日周一 上午11:28写道:

> Hi, community:
>
>
>   We used flink 1.9.1, both SQL and DataStream API to support multiple
> sinks for product envs.
>
>   For example, tableEnv.sqlUpdate("INSERT INTO dest1 SELECT ...") and
> table.toAppendStream[Row].addSink(new RichSinkFunction[Row]
> {...}).name("dest2"), and env.execute() to submit the DAG together, and
> result will sink to dest1 or dest2 or both.
>
>
>   Now I try to update flink to 1.11.2, according to [1]-Attention,use
> tableEnv.execute() instead of env.execute(), but only get the result of
> `sqlUpdate`, and result of `DataStream.addSink` is missed.
>
>
>   1. How to get both the results in mixed SQL/DataStream use cases, maybe
> change all RichSinkFunction into a UpsertTable works. Is there another
> simple way to do this?
>
>   2. It seems like env.getExecutionPlan only returns DAG of DataStream API
> now, so how to get the whole DAG like env.getExecutionPlan() in 1.9.1.
>
>
>   Thanks for ur reply.
>
>
>
>


Re: quoted identifiers in Table SQL give SQL parse failed.

2020-10-29 Thread Danny Chan
Yes, Flink SQL use the back quote ` as the quote character, for your SQL,
it should be:

CREATE TABLE table1(`ts` TIMESTAMP) WITH(...)


Ruben Laguna  于2020年10月29日周四 下午6:32写道:

> I made this question on [Stackoverflow][1] but I'm cross posting here.
>
>
> Are double quoted identifiers allowed in Flink SQL? [Calcite
> documentation says to use double quoted
> identifiers](https://calcite.apache.org/docs/reference.html#identifiers)
> but they don't seem to work (see below). On the other hand I just
> found
>
> > Identifiers follow SQL requirements which means that they can be escaped
> with a backtick character (`).
>
> on the [Table API & SQL > Concepts & Common API > Expanding Table
> identifiers][2]
>
> So I guess this means that [Flink SQL][3] is not really just Calcite
> SQL plus extras, as I assumed.
>
> ---
>
> When I tried the following in Flink I got an `SQL parse failed .
> Encountered "\"" at line 1 ,column 21.` complaining about the `"`
> (double quote)
>
>CREATE TABLE table1("ts" TIMESTAMP) WITH(...)
>
> The full exception is
>
> org.apache.flink.table.api.SqlParserException: SQL parse failed.
> Encountered "\"" at line 1, column 21.
> Was expecting one of:
> "CONSTRAINT" ...
> "PRIMARY" ...
> "UNIQUE" ...
> "WATERMARK" ...
>  ...
>  ...
>  ...
>  ...
>  ...
>  ...
>
>
> at
> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:658)
> at com.rubenlaguna.BatchJobTest.testBatchJob(BatchJobTest.java:22)
>  .
>
>
>
>
>
>
> [1]: https://stackoverflow.com/q/64588892/90580
> [2]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#expanding-table-identifiers
> [3]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/index.html
> --
> /Rubén
>


Re: Working with bounded Datastreams - Flink 1.11.1

2020-10-28 Thread Danny Chan
In SQL, you can use the over window to deduplicate the messages by the id
[1], but i'm not sure if there are same semantic operators in DataStream.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication

s_penakalap...@yahoo.com  于2020年10月28日周三
下午12:34写道:

> Hi All,
>
> Request your inputs please.
>
> Regards,
> Sunitha
>
> On Tuesday, October 27, 2020, 01:01:41 PM GMT+5:30,
> s_penakalap...@yahoo.com  wrote:
>
>
> Hi Team,
>
> I want to use Flink Datastream for Batch operations which involves huge
> data, I did try to calculate count and average on the whole Datastream with
> out using window function.
>
>  Approach I tried to calculate count on the datastream:
> 1> Read data from table (say past 2 days of data) as Datastream
> 2> apply Key operation on the datastream
> 3> then use reduce function to find count, sum and average.
>
> I have written output to file and also inserted into table: sample data
> from file is:
>
> vehicleId=aa, count=1, fuel=10, avgFuel=0.0
> vehicleId=dd, count=1, fuel=7, avgFuel=0.0
> vehicleId=dd, count=2, fuel=22, avgFuel=11.0
> vehicleId=dd, count=3, fuel=42, avgFuel=14.0
> vehicleId=ee, count=1, fuel=0, avgFuel=0.0
>
> what I am looking for is , when there are multiple records with same
> vehicle Id I see that only the final record is having correct values (like 
> vehicleId=dd).
> Is there any way to get only one final record for each vehicle as shown
> below:
> vehicleId=aa, count=1, fuel=10, avgFuel=0.0
> vehicleId=dd, count=3, fuel=42, avgFuel=14.0
> vehicleId=ee, count=1, fuel=0, avgFuel=0.0
>
> Also I request some help on how to sort whole DataStream based on one
> attribute. Say we have x records in one Batch Job I would like to sort and
> fetch X-2 position record per vehicle.
>
> Regards,
> Sunitha.
>
>


Re: How to understand NOW() in SQL when using Table & SQL API to develop a streaming app?

2020-10-27 Thread Danny Chan
Our behavior also conflicts with the SQL standard, we should also mention
this in the document.

Till Rohrmann  于2020年10月27日周二 下午10:37写道:

> Thanks for the clarification. This improvement would be helpful, I believe.
>
> Cheers,
> Till
>
> On Tue, Oct 27, 2020 at 1:19 PM Jark Wu  wrote:
>
>> Hi Till,
>>
>> The documentation mentions that "this function is not deterministic"
>> where the "not deterministic" means the value of this function is not
>> deterministic for every record.
>> However, this is not very clear for users. I think we can improve the
>> documentation.
>>
>> Best,
>> Jark
>>
>> On Tue, 27 Oct 2020 at 15:59, Till Rohrmann  wrote:
>>
>>> Quick question Jark: Is this difference in behaviour documented? I
>>> couldn't find it in the docs.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Oct 27, 2020 at 7:30 AM Jark Wu  wrote:
>>>
 Hi Longdexin,

 In traditional batch sql, NOW() is executed and determined before the
 job is submitted and will not change for every processed record.
 However, this doesn't make much sense in streaming sql, therefore,
 NOW() function in Flink is executed for every record.

 Best,
 Jark

 On Fri, 23 Oct 2020 at 16:30, Till Rohrmann 
 wrote:

> Hi Longdexin,
>
> thanks for reaching out to the Flink community. I am pulling in Jark
> who might be able to help you with this question.
>
> Cheers,
> Till
>
> On Thu, Oct 22, 2020 at 2:56 PM Longdexin <274522...@qq.com> wrote:
>
>> From my point of view, the value of NOW() function in SQL is certain
>> by the
>> time when the streaming app is launched and will not change with the
>> process
>> time. However, as a new Flink user, I'm not so sure of that. By the
>> way, if
>> my attemp is to keep the time logic to update all the time, what
>> should I
>> do?
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: NullPointerException when trying to read null array in Postgres using JDBC Connector

2020-10-22 Thread Danny Chan
Yes, the current code throws directly for NULLs, can you log an issue there
?

Dylan Forciea  于2020年10月21日周三 上午4:30写道:

> I believe I am getting an error because I have a nullable postgres array
> of text that is set to NULL that I’m reading using the JDBC SQL Connector.
> Is this something that should be allowed? Looking at the source code line
> below, it doesn’t look like the case of an array being null would be
> handled.
>
>
>
> [error] Caused by: java.io.IOException: Couldn't access resultSet
>
> [error]   at
> org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:266)
>
> [error]   at
> org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:57)
>
> [error]   at
> org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91)
>
> [error]   at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>
> [error]   at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>
> [error]   at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
>
> [error] Caused by: java.lang.NullPointerException
>
> [error]   at
> org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.lambda$createPostgresArrayConverter$c06ce9f4$2(PostgresRowConverter.java:97)
>
> [error]   at
> org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter.toInternal(AbstractJdbcRowConverter.java:79)
>
> [error]   at
> org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:259)
>
> [error]   ... 5 more
>
>
>
> Thanks,
>
> Dylan Forciea
>


Re: "stepless" sliding windows?

2020-10-22 Thread Danny Chan
The SLIDING window always triggers as of each step, what do you mean by
"stepless" ?

Alex Cruise  于2020年10月21日周三 上午1:52写道:

> whoops.. as usual, posting led me to find some answers myself. Does this
> make sense given my requirements?
>
> Thanks!
>
> private class MyWindowAssigner(val windowSize: Time) : WindowAssigner TimeWindow>() {
> private val trigger = CountTrigger.of(1) as Trigger TimeWindow>
>
> override fun assignWindows(
> element: Record,
> timestamp: Long,
> context: WindowAssignerContext
> ): MutableCollection {
> return mutableListOf(TimeWindow(timestamp - 
> windowSize.toMilliseconds(), timestamp))
> }
>
> override fun getDefaultTrigger(env: StreamExecutionEnvironment?): 
> Trigger {
> return trigger
> }
>
> override fun getWindowSerializer(executionConfig: ExecutionConfig?): 
> TypeSerializer {
> return TimeWindow.Serializer()
> }
>
> override fun isEventTime(): Boolean {
> return true
> }
> }
>
>
> On Tue, Oct 20, 2020 at 9:13 AM Alex Cruise  wrote:
>
>> Hey folks!
>>
>> I have an application that wants to use "stepless" sliding windows, i.e.
>> we produce aggregates on every event. The windows need to be of a fixed
>> size, but to have their start and end times update continuously, and I'd
>> like to trigger on every event. Is this a bad idea? I've googled and read
>> the docs extensively and haven't been able to identify built-in
>> functionality or examples that map cleanly to my requirements.
>>
>> OK, I just found DeltaTrigger, which looks promising... Does it make
>> sense to write a WindowAssigner that makes a new Window on every event,
>> allocation rates aside?
>>
>> Thanks!
>>
>> -0xe1a
>>
>


Re: Extract column and table lineage from flink sql

2020-10-22 Thread Danny Chan
Hi, dawangli ~

Usually people build the lineage of tables through a self-built platform,
there was a DB to persist the relationship between the tables, for each
job, you may need to analyze each SQL which are source tables and which are
sink.

E.G. The INSERT target table is a sink and table after the scan or join is
a source.

If you got the rel tree, you can get the info by a shuttle, if an AST
instead, you can have a SqlVisitor.

Danny Chan  于2020年10月22日周四 下午5:24写道:

> Hi, dawangli ~
>
> Usually people build the lineage of tables through a self-built platform,
> there was a DB to persist the relationship between the tables, for each
> job, you may need to analyze each SQL which are source tables and which are
> sink.
>
> E.G. The INSERT target table is a sink and table after the scan or join is
> a source.
>
> If you got the rel tree, you can get the info by a shuttle, if an AST
> instead, you can have a SqlVisitor.
>
> dawangli  于2020年10月20日周二 下午9:46写道:
>
>> I want to build a lineage system for a real-time data warehouse,how can I
>> extract table and column lineage from flink sql?
>>
>>
>>
>>
>>
>>
>>
>


Re: flink watermark strategy

2020-08-30 Thread Danny Chan
Watermark mainly serves for windows for the late arrive data, it actually 
reduces your performance.

Best,
Danny Chan
在 2020年8月29日 +0800 AM3:09,Vijayendra Yadav ,写道:
> Hi Team,
>
> For regular unbounded streaming application streaming through kafka, which 
> does not use any event time or window operations, does setting watermark 
> strategy for kafkaconsumer (connector) help us in any way like performance ?
>
> Regards,
> Vijay


user@flink.apache.org

2020-08-30 Thread Danny Chan
Thanks for the share ~

The query you gave is actually an interval join[1] , a windowed join is two 
windowed stream join together, see [2].

Theoretically, for interval join, the state would be cleaned periodically based 
on the watermark and allowed lateness when the range of RHS had been considered 
“late”.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#joins
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/joining.html

Best,
Danny Chan
在 2020年8月29日 +0800 AM12:59,Sofya T. Irwin ,写道:
> Hi Danny,
>
> Thank you for your response.
> I'm trying to join two streams that are both fairly high volume. My join 
> looks like this:
>
>   SELECT
>     A.rowtime as rowtime,
>     A.foo,
>     B.bar
>   FROM A
>   LEFT JOIN B
>     ON A.foo = B.foo
>     AND A.rowtime BETWEEN B.rowtime - INTERVAL  '1' HOUR AND B.rowtime
>
> When I run this SQL, the state size metric looks like a sawtooth that 
> gradually keeps growing.
> Currently I disabled this query because of a concern it could impact other 
> jobs.
>
> Based on your statement above, the SQL timed window is not supported?
> Is there another way I can make sure that the state only has data that is 
> only more recent?
>
> Thank you,
> Sofya
>
> > On Thu, Aug 27, 2020 at 10:49 PM Danny Chan  wrote:
> > > Hi, Sofya T. Irwin ~
> > >
> > > Can you share your case why you need a timed-window join there ?
> > >
> > > Now the sql timed window join is not supported yet, and i want to hear 
> > > your voice if it is necessary to support in SQL.
> > >
> > >
> > > > Sofya T. Irwin  于2020年7月30日周四 下午10:44写道:
> > > > > Hi,
> > > > > I'm trying to investigate a SQL job using a time-windowed join that 
> > > > > is exhibiting a large, growing state. The join syntax is most similar 
> > > > > to the interval join 
> > > > > (https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/joins.html).
> > > > >
> > > > > A few questions:
> > > > > 1. Am I correct in understanding that State TTL is generally not 
> > > > > applicable for TableAPI&SQL? So we cannot use State TTL to limit 
> > > > > state size for a join?
> > > > >
> > > > > 2. It seems that Flink should be able to expire state even without 
> > > > > explicit settings based on this: "In TableAPI&SQL and DataStream, the 
> > > > > window aggregation and time-windowed join will clear expired state 
> > > > > using Timers which is triggered by watermark."  
> > > > > (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/join-state-TTL-td34760.html)
> > > > >
> > > > > To clarify: Does the above mean that Flink is expected to detect 
> > > > > expired state and clear it without explicit configuration to allow it 
> > > > > to do so?
> > > > >
> > > > > 3. I've looked into setting the idle state retention time. From what 
> > > > > I can understand, this particular setting is appropriate for my use 
> > > > > case.  "TableConfig#setIdleStateRetentionTime in TableAPI&SQL is a 
> > > > > job level configuration which will enable state ttl for all 
> > > > > non-time-based operator states." 
> > > > > (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/join-state-TTL-td34760.html)
> > > > >
> > > > > To clarify: Would enabling this setting control state growth? Is this 
> > > > > only available for blink planner? Currently we are using the 
> > > > > StreamPlanner. Is there any way to ensure that idle state has limited 
> > > > > retention for applications using the StreamPlanner?
> > > > >
> > > > > Thanks ahead,
> > > > > Sofya


user@flink.apache.org

2020-08-27 Thread Danny Chan
Hi, Sofya T. Irwin ~

Can you share your case why you need a timed-window join there ?

Now the sql timed window join is not supported yet, and i want to hear your
voice if it is necessary to support in SQL.


Sofya T. Irwin  于2020年7月30日周四 下午10:44写道:

> Hi,
> I'm trying to investigate a SQL job using a time-windowed join that is
> exhibiting a large, growing state. The join syntax is most similar to
> the interval join (
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/joins.html
> ).
>
> A few questions:
> 1. Am I correct in understanding that State TTL is generally not
> applicable for TableAPI&SQL? So we cannot use State TTL to limit state size
> for a join?
>
> 2. It seems that Flink should be able to expire state even without
> explicit settings based on this: "In TableAPI&SQL and DataStream, the
> window aggregation and time-windowed join will clear expired state using
> Timers which is triggered by watermark."  (
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/join-state-TTL-td34760.html
> )
>
> To clarify: Does the above mean that Flink is expected to detect expired
> state and clear it without explicit configuration to allow it to do so?
>
> 3. I've looked into setting the idle state retention time. From what I can
> understand, this particular setting is appropriate for my use case.
> "TableConfig#setIdleStateRetentionTime in TableAPI&SQL is a job level
> configuration which will enable state ttl for all non-time-based operator
> states." (
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/join-state-TTL-td34760.html
> )
>
> To clarify: Would enabling this setting control state growth? Is this only
> available for blink planner? Currently we are using the StreamPlanner. Is
> there any way to ensure that idle state has limited retention for
> applications using the StreamPlanner?
>
> Thanks ahead,
> Sofya
>


[Survey] Demand collection for stream SQL window join

2020-08-26 Thread Danny Chan
Hi, users, here i want to collect some use cases about the window join[1], 
which is a supported feature on the data stream. The purpose is to make a 
decision whether to support it also on the SQL side, for example, 2 tumbling 
window join may look like this:

```sql
select ... window_start, window_end
from TABLE(
  TUMBLE(
    DATA => TABLE table_a,
    TIMECOL => DESCRIPTOR(rowtime),
    SIZE => INTERVAL '1' MINUTE)) tumble_a
    [LEFT | RIGHT | FULL OUTER] JOIN TABLE(
  TUMBLE(
    DATA => TABLE table_b,
    TIMECOL => DESCRIPTOR(rowtime),
    SIZE => INTERVAL '1' MINUTE)) tumble_b
on tumble_a.col1 = tumble_b.col1 and ...
```

I had some discussion off-line with some companies (Tencent, Bytedance and 
Meituan), and it seems that interval join is the most common case. The window 
join case is very few, so i'm looking forward there are some feed-back here.

Expecially, it is apprecaited if you can share the use cases of the window join 
(using the Flink data stream or written by other programs) and why the 
window-join is a must(can not replace with normal stream join or interval join).

Thanks in advance ~

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/joining.html

Best,
Danny Chan


Re: How to visit outer service in batch for sql

2020-08-26 Thread Danny Chan
Hi, did you try to define a UDAF there within your group window sql, where you 
can have a custom service there.

I’m afraid you are right, SQL only supports time windows.

Best,
Danny Chan
在 2020年8月26日 +0800 PM8:02,刘建刚 ,写道:
>       For API, we can visit outer service in batch through countWindow, such 
> as the following. We can visit outer service every 1000 records. If we visit 
> outer service every record, it will be very slow for our job.
> source.keyBy(new KeySelector())
>.countWindow(1000)
>.apply((WindowFunction)
>(s, globalWindow, values, collector) -> {
>List resultList = service.visit(values);
>for (MyType result: resultList) {
>if (result.ok) {
>collector.collect(result);
>}
>}
>});
>       But how can I write SQL to implement the batch logic? I can use udf to 
> visit outer service. Currently, Flink only support time window but not count 
> window. I also check the udf wiki but find it hard to batch records.
>       Any suggestion is welcome. Thank you.
>
>
>
>
>
>


Re: flink interval join后按窗口聚组问题

2020-08-26 Thread Danny Chan
For SQL, we always hold back the watermark when we emit the elements, for time 
interval:

return Math.max(leftRelativeSize, rightRelativeSize) + allowedLateness;

For your case, the watermark would hold back for 1 hour, so the left join 
records would not delay when it is used by subsequent operators.

See KeyedCoProcessOperatorWithWatermarkDelay and 
RowTimeIntervalJoin.getMaxOutputDelay for details.

Best,
Danny Chan
在 2020年7月3日 +0800 PM3:29,元始(Bob Hu) <657390...@qq.com>,写道:
> 您好,我想请教一个问题:
> flink双流表 interval join后再做window group是不是有问题呢,有些left join关联不上的数据会被丢掉。
> 比如关联条件是select * from a,b where a.id=b.id and b.rowtime between a.rowtime and 
> a.rowtime + INTERVAL '1' HOUR 
> ,看源码leftRelativeSize=1小时,rightRelativeSize=0,左流cleanUpTime = rowTime + 
> leftRelativeSize + (leftRelativeSize + rightRelativeSize) / 2 + 
> allowedLateness + 
> 1,左表关联不上的数据会在1.5小时后输出(右表为null),而watermark的调整值是Math.max(leftRelativeSize, 
> rightRelativeSize) + 
> allowedLateness,也就是1小时,那这样等数据输出的时候watermark不是比左表rowtime还大0.5小时了吗,后面再有对连接流做group
>  by的时候这种右表数据为空的数据就丢掉了啊。
> flink版本 1.10.0。
>
> 下面是我的一段测试代码:
> import org.apache.commons.net.ntp.TimeStamp;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.api.java.typeutils.RowTypeInfo;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import 
> org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
> import org.apache.flink.streaming.api.functions.ProcessFunction;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.streaming.api.watermark.Watermark;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.table.functions.ScalarFunction;
> import org.apache.flink.types.Row;
> import org.apache.flink.util.Collector;
> import org.apache.flink.util.IOUtils;
>
> import java.io.BufferedReader;
> import java.io.InputStreamReader;
> import java.io.Serializable;
> import java.net.InetSocketAddress;
> import java.net.Socket;
> import java.sql.Timestamp;
> import java.text.SimpleDateFormat;
> import java.util.ArrayList;
> import java.util.Date;
> import java.util.List;
>
> public class TimeBoundedJoin {
>
>public static AssignerWithPeriodicWatermarks getWatermark(Integer 
> maxIdleTime, long finalMaxOutOfOrderness) {
>AssignerWithPeriodicWatermarks timestampExtractor = new 
> AssignerWithPeriodicWatermarks() {
>private long currentMaxTimestamp = 0;
>private long lastMaxTimestamp = 0;
>private long lastUpdateTime = 0;
>boolean firstWatermark = true;
> //Integer maxIdleTime = 30;
>
>@Override
>public Watermark getCurrentWatermark() {
>if(firstWatermark) {
>lastUpdateTime = System.currentTimeMillis();
>firstWatermark = false;
>}
>if(currentMaxTimestamp != lastMaxTimestamp) {
>lastMaxTimestamp = currentMaxTimestamp;
>lastUpdateTime = System.currentTimeMillis();
>}
>if(maxIdleTime != null && System.currentTimeMillis() - 
> lastUpdateTime > maxIdleTime * 1000) {
>return new Watermark(new Date().getTime() - 
> finalMaxOutOfOrderness * 1000);
>}
>return new Watermark(currentMaxTimestamp - 
> finalMaxOutOfOrderness * 1000);
>
>}
>
>@Override
>public long extractTimestamp(Row row, long 
> previousElementTimestamp) {
>Object value = row.getField(1);
>long timestamp;
>try {
>timestamp = (long)value;
>} catch (Exception e) {
>timestamp = ((Timestamp)value).getTime();
>}
>if(timestamp > currentMaxTimestamp) {
>currentMaxTimestamp = timestamp;
>}
>return timestamp;
>}
>};
>return timestampExtractor;
>}
>
>public static void main(String[] args) throws Exception {
>StreamExecutionEnvironment bsEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>EnvironmentSettings bsSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> 

Re: How to write a customer sink partitioner when using flinksql kafka-connector

2020-08-18 Thread Danny Chan
Hi, Lei ~
You may need to implement the abstract class FlinkKafkaPartitioner and then use 
the full class name as the param value of the option ‘sink.partitioner’. 
FlinkFixedPartitioner[1] is a good example there.

[1] 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java

Best,
Danny Chan
在 2020年8月18日 +0800 PM8:18,wangl...@geekplus.com ,写道:
>
>
> CREATE TABLE kafka_sink_table(
>  warehouse_id INT,
>  pack_task_order_id BIGINT,
>  out_order_code STRING,
>  pick_order_id BIGINT,
>  end_time BIGINT
> WITH (
>  'connector'='kafka',
>  'topic'='ods_wms_pack_task_order',
>  'properties.bootstrap.servers'='172.19.78.32:9092',
>  'format'='json'
> );
>
>
> INSERT INTO  kafka_sink_table SELECT  ...
>
> As describe here: 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/kafka.html
> I want to do partition according to warehouse_id.
>
> How should i write my customer partitioner? Is there any example?
>
> Thanks,
> Lei
>
> wangl...@geekplus.com


Re: Flink 1.11 SQL error: No operators defined in streaming topology. Cannot execute.

2020-08-13 Thread Danny Chan
Weighing ~

tEnv.executeSql would execute the SQL asynchronously, e.g. submitting a job to 
the backend cluster with a builtin job name, the tEnv.executeSql itself did 
return a JobResult immediately with a constant affected rows count -1.

Best,
Danny Chan
在 2020年8月13日 +0800 PM3:46,Lu Weizheng ,写道:
> Thanks Timo,
>
> So no need to use execute() method in Flink SQL If I do all the thins from 
> source to sink in SQL.
>
> Best Regards,
> Lu
>
> > 2020年8月13日 下午3:41,Timo Walther  写道:
> >
> > Hi Lu,
> >
> > `env.execute("table api");` is not necessary after FLIP-84 [1]. Every 
> > method that has `execute` in its name will immediately execute a job. 
> > Therefore your `env.execute` has an empty pipeline.
> >
> > Regards,
> > Timo
> >
> > [1] 
> > https://wiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
> >
> > On 13.08.20 09:34, Lu Weizheng wrote:
> > > Hi,
> > > I am using Flink 1.11 SQL using java. All my operations are in SQL. I 
> > > create source tables and insert result into sink tables. No other Java 
> > > operators. I execute it in Intellij. I can get the final result in the 
> > > sink tables. However I get the following error. I am not sure it is a bug 
> > > or there is something wrong in my code? Acutally it does not affect the 
> > > computation.
> > > /Exception in thread "main" java.lang.IllegalStateException: No operators 
> > > defined in streaming topology. Cannot execute./
> > > /at 
> > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1872)/
> > > /at 
> > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1863)/
> > > /at 
> > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1848)/
> > > /at 
> > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)/
> > > /at com.flink.tutorials.java.projects.iot.IoTSQLDemo.main()/
> > > Here's my code:
> > > EnvironmentSettings fsSettings = 
> > > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> > > StreamExecutionEnvironment env = 
> > > StreamExecutionEnvironment.getExecutionEnvironment();
> > > StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
> > > fsSettings);
> > > // create source and sink tables...
> > > tEnv.executeSql("INSERT INTO sensor_1min_avg " +
> > > "SELECT " +
> > > " room, " +
> > > " AVG(temp) AS avg_temp," +
> > > " TUMBLE_END(ts, INTERVAL '1' MINUTE) AS end_ts " +
> > > "FROM sensor " +
> > > "GROUP BY room, TUMBLE(ts, INTERVAL '1' MINUTE)");
> > > env.execute("table api");
> >
>


Re: Two Queries and a Kafka Topic

2020-08-11 Thread Danny Chan
Hi, Marco ~

It seems what you need is a temporal join from the SQL side, you can define 2 
Flink tables for your PostgreSQL ones and join your Kafka stream with them 
[1][3].

Flink 1.10 also supports this. There is some difference with the DDL compared 
to 1.11 [2]

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#jdbc-connector
[3] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/temporal_tables.html#temporal-table

Best,
Danny Chan
在 2020年8月5日 +0800 AM4:34,Marco Villalobos ,写道:
> Lets say that I have:
>
> SQL Query One from data in PostgreSQL (200K records).
> SQL Query Two from data in PostgreSQL (1000 records).
> and Kafka Topic One.
>
> Let's also say that main data from this Flink job arrives in Kafka Topic One.
>
> If I need SQL Query One and SQL Query Two to happen just one time, when the 
> job starts up, and afterwards maybe store it in Keyed State or Broadcast 
> State, but it's not really part of the stream, then what is the best practice 
> for supporting that in Flink
>
> The Flink job needs to stream data from Kafka Topic One, aggregate it, and 
> perform computations that require all of the data in SQL Query One and SQL 
> Query Two to perform its business logic.
>
> I am using Flink 1.10.
>
> I supposed to query the database before the Job I submitted, and then pass it 
> on as parameters to a function?
> Or am I supposed to use JDBCInputFormat for both queries and create two 
> streams, and somehow connect or broadcast both of them two the main stream 
> that uses Kafka Topic One?
>
> I would appreciate guidance. Please.  Thank you.
>
> Sincerely,
>
> Marco A. Villalobos
>
>
>


Re: Does Flink automatically apply any backpressure ?

2020-08-03 Thread Danny Chan
Yes, just like Jake said, the back pressure happened automatically and usually 
there is no need to tweak it, you actually can have configure the metrics about 
it, see [1]

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/back_pressure.html

Best,
Danny Chan
在 2020年7月31日 +0800 AM10:28,Jake ,写道:
>
> Hi Suraj Puvvada
>
> Yes, Flink back pressure depend on the Flink task buffer。process task will 
> sends buffer remaining size to source, source will slow down.
>
> https://www.ververica.com/blog/how-flink-handles-backpressure
>
> Jake
>
>
> > On Jul 31, 2020, at 2:48 AM, Suraj Puvvada  wrote:
> >
> > Hello
> >
> > I am trying to understand if Flink has a mechanism to automatically apply 
> > any backpressure by throttling any operators ? For example if I have a 
> > Process function that reads from a Kafkaa source and writes to a Kafka 
> > sink. If the process function is slow will the kafka source be 
> > automatically throttled ?
> >
> > Thanks
> > Suraj
>


Re: How to use a nested column for CREATE TABLE PARTITIONED BY

2020-07-21 Thread Danny Chan
You can not do that in Flink yet, Flink partition column must be mapped to 
columns from the table schema which you can select from. The syntax is a little 
different from Hive’s =>

create table table_name (
  idint,
  dtDontQuery   string,
  name  string
)
partitioned by (date string)

In which you can declare the partition column name & type at the same time.

Best,
Danny Chan
在 2020年7月21日 +0800 PM11:30,Dongwon Kim ,写道:
> Thanks Jark for the update.
>
> However, getting back to the original question, can I use a nested column 
> directly for CREATE TABLE PARTITIONED BY like below without declaring an 
> additional column?
>
> > CREATE TABLE output
> > PARTITIONED BY (`location.transId`)
> > WITH (
> >   'connector' = 'filesystem',
> >   'path' = 'east-out',
> >   'format' = 'json'
> > ) LIKE navi (EXCLUDING ALL)
>
> I tried (`location`.transId) as well but it fails with an exception:
> > Exception in thread "main" org.apache.flink.table.api.SqlParserException: 
> > SQL parse failed. Encountered "." at line 3, column 27.
> > Was expecting one of:
> >     ")" ...
> >     "," ...
> >
> > at 
> > org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
> > at 
> > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
> > at 
> > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
> > at com.kakaomobility.SQLExecutor.main(SQLExecutor.java:28)
> > Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "." 
> > at line 3, column 27.
> > Was expecting one of:
> >     ")" ...
> >     "," ...
> >
> > at 
> > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:416)
> > at 
> > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:201)
> > at 
> > org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:148)
> > at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:163)
> > at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:188)
> > at 
> > org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54)
> > ... 3 more
> > Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "." 
> > at line 3, column 27.
> > Was expecting one of:
> >     ")" ...
> >     "," ...
> >
> > at 
> > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:36086)
> > at 
> > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:35900)
> > at 
> > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:21398)
> > at 
> > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateTable(FlinkSqlParserImpl.java:5292)
> > at 
> > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateExtended(FlinkSqlParserImpl.java:6269)
> > at 
> > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreate(FlinkSqlParserImpl.java:19047)
> > at 
> > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3308)
> > at 
> > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3800)
> > at 
> > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:248)
> > at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:161)
> > ... 5 more
>
> Best,
>
> Dongwon
>
> > On Wed, Jul 22, 2020 at 12:09 AM Jark Wu  wrote:
> > > Hi Dongwon,
> > >
> > > I think this is a bug in the Filesystem connector which doesn't exclude 
> > > the computed columns when building the TableSource.
> > > I created an issue [1] to track this problem.
> > >
> > > Best,
> > > Jark
> > >
> > > [1]: https://issues.apache.org/jira/browse/FLINK-18665
> > >
> > > > On Tue, 21 Jul 2020 at 17:31, Dongwon Kim  wrote:
> > > > > Hi Danny,
> > > > >
> > > > > >  Which version did you use
> > > > > I use Flink 1.11.0.
> > > > >
> > > > > >  what SQL context throws the error ?
> > > > > I think the declaration itself is not a pro

Re: How to use a nested column for CREATE TABLE PARTITIONED BY

2020-07-21 Thread Danny Chan
Hi, I execute the sql below

"""
   |create table navi (
   |  a STRING,
   |  location ROW
   |) with (
   |  'connector' = 'filesystem',
   |  'path' = 'east-out',
   |  'format' = 'json'
   |)
   |""".stripMargin
tableEnv.executeSql(sql0)
val sql =
"""
   |CREATE TABLE output (
   |  `partition` AS location.transId
   |) PARTITIONED BY (`partition`)
   |WITH (
   |  'connector' = 'filesystem',
   |  'path' = 'east-out',
   |  'format' = 'json'
   |) LIKE navi (EXCLUDING ALL)
   |""".stripMargin
tableEnv.executeSql(sql)

In master branch, both are correct, can you share you stack trace detail ? 
Which version did you use and what SQL context throws the error ?

Best,
Danny Chan
在 2020年7月21日 +0800 PM4:55,Dongwon Kim ,写道:
> Hi,
>
> I want to create subdirectories named after values of a nested column, 
> location.transId.
>
> This is my first attempt:
> > CREATE TABLE output
> > PARTITIONED BY (`location.transId`)
> > WITH (
> >   'connector' = 'filesystem',
> >   'path' = 'east-out',
> >   'format' = 'json'
> > ) LIKE navi (EXCLUDING ALL)
>
> It fails with the following errors:
> > Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> > Partition column 'location.transId' not defined in the table schema. 
> > Available columns: ['type', 'location']
> > at 
> > org.apache.flink.table.planner.operations.SqlCreateTableConverter.verifyPartitioningColumnsExist(SqlCreateTableConverter.java:164)
> > at 
> > org.apache.flink.table.planner.operations.SqlCreateTableConverter.createCatalogTable(SqlCreateTableConverter.java:130)
> > at 
> > org.apache.flink.table.planner.operations.SqlCreateTableConverter.convertCreateTable(SqlCreateTableConverter.java:76)
> > at 
> > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:190)
> > at 
> > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> > at 
> > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
> > at com.kakaomobility.SQLExecutor.main(SQLExecutor.java:28)
>
> As It seems like nested columns are not recognized as a eligible column for 
> PARTITIONED BY, I tried the following:
> > CREATE TABLE output (
> >   `partition` AS location.transId
> > ) PARTITIONED BY (`partition`)
> > WITH (
> >   'connector' = 'filesystem',
> >   'path' = 'east-out',
> >   'format' = 'json'
> > ) LIKE navi (EXCLUDING ALL)
> It also fails:
> >  Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> > The field count of logical schema of the table does not match with the 
> > field count of physical schema
> . The logical schema: [STRING,ROW<`lastUpdateTime` BIGINT, `transId` STRING>]
> The physical schema: [STRING,ROW<`lastUpdateTime` BIGINT, `transId` 
> STRING>,STRING].
>
> Thanks in advance,
>
> Dongwon


Re: Encoding problem in WHERE LIKE

2020-07-20 Thread Danny Chan
Hi, Dongwon ~

> Caused by: org.apache.calcite.sql.parser.SqlParseException: Lexical error at 
> line 1, column 96.  Encountered

The error did report the position, you can take a reference to see which syntax 
context caused the problem.

Best,
Danny Chan
在 2020年7月20日 +0800 PM11:10,Dongwon Kim ,写道:
> Hi Leonard,
>
> You're right; I was missing a single quotation mark before the LIKE.
>
> There's no encoding problem at all!
> Sorry for the confusion.
>
> Thanks,
>
> Dongwon
>
>
> > On Tue, Jul 21, 2020 at 12:00 AM Leonard Xu  wrote:
> > > Hi, Kim
> > >
> > > The clause  ` LIKE '%양현마을%’ ` should work well, could you post the the 
> > > entire query(or select clause) ?
> > >
> > > Best
> > > Leonard Xu
> > >
> > > > 在 2020年7月20日,21:49,Dongwon Kim  写道:
> > > >
> > > > When I execute the following query in .sqlQuery(),
> > > > > SELECT ...
> > > > > FROM ...
> > > > > WHERE location.goalName LIKE '%양현마을%'
> > >


Re: Flink SQL - Join Lookup Table

2020-07-20 Thread Danny Chan
Seems you want a temporal table join instead of a two stream join, if that is 
your request, you should use syntax

Join LookupTable FOR SYSTEM_TIME AS OF …

See [1] for details.

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table

Best,
Danny Chan
在 2020年7月21日 +0800 AM6:32,Kelly Smith ,写道:
> Hi folks,
>
> I have a question Flink SQL. What I want to do is this:
>
>
> • Join a simple lookup table (a few rows) to a stream of data to enrich the 
> stream by adding a column from the lookup table.
>
>
>
> For example, a simple lookup table:
>
> CREATE TABLE LookupTable (
>     `computeClass`  STRING,
>     `multiplier`    FLOAT
> ) WITH (
>     'connector' = 'filesystem',
>     'path' = 'fpu-multipliers.csv',
>     'format' = 'csv'
> )
>
>
> And I’ve got a Kafka connector table with rowtime semantics that has a 
> `computeClass` field. I simply want to join (in a streaming fashion) the 
> `multiplier` field above.
>
> SELECT
>`timestamp`,
>// ...
>ks.computeClass,
>lt.`multiplier`
> FROM KafkaStream ks
> JOIN LookupTable lt ON ks.computeClass = lt.computeClass
> Doing a simple join like that gives me this error:
>
> “org.apache.flink.table.api.TableException: Rowtime attributes must not be in 
> the input rows of a regular join. As a workaround you can cast the time 
> attributes of input tables to TIMESTAMP before.”
>
> Which leads me to believe that I should use an Interval Join instead, but 
> that doesn’t seem to be appropriate since my table is static and has no 
> concept of time. Basically, I want to hold the entire lookup table in memory, 
> and simply enrich the Kafka stream (which need not be held in memory).
>
> Any ideas on how to accomplish what I’m trying to do?
>
> Thanks!
> Kelly


Fwd: Re: [Table API] how to configure a nested timestamp field

2020-07-20 Thread Danny Chan

Best,
Danny Chan
-- 转发信息 --
发件人: Danny Chan 
日期: 2020年7月20日 +0800 PM4:51
收件人: Dongwon Kim 
主题: Re: [Table API] how to configure a nested timestamp field

> Or is it possible you pre-define a catalog there and register through the SQL 
> CLI yaml ?
>
> Best,
> Danny Chan
> 在 2020年7月20日 +0800 PM3:23,Dongwon Kim ,写道:
> > Hi Leonard,
> >
> > > Unfortunately the answer is no, the YAML you defined will parse by Table 
> > > API and then execute, the root cause of your post error is Table API does 
> > > not support computed column now,
> > > there is a FLIP under discussion[1], this should be ready in 1.12.0. BTW, 
> > > I think DDL is recommended way since FLINK 1.11.0.
> > Okay, thanks a lot for your input.
> >
> > I just tried out Flink SQL client and wanted to store pre-defined YAML 
> > files each declaring a source table from a Kafka topic.
> > As you advised, I have to manually enter DDL in the SQL client on FLINK 
> > 1.11.x
> >
> > Best,
> >
> > Dongwon
> >
> >
> > > On Mon, Jul 20, 2020 at 3:59 PM Leonard Xu  wrote:
> > > > Hi, Kim
> > > >
> > > > > Hi Leonard,
> > > > >
> > > > > Can I have a YAML definition corresponding to the DDL you suggested?
> > > >
> > > > Unfortunately the answer is no, the YAML you defined will parse by 
> > > > Table API and then execute, the root cause of your post error is Table 
> > > > API does not support computed column now,
> > > >
> > > > there is a FLIP under discussion[1], this should be ready in 1.12.0. 
> > > > BTW, I think DDL is recommended way since FLINK 1.11.0.
> > > >
> > > > Best,
> > > > Leonard Xu
> > > > [1] 
> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API
> > > >
> > > > > 在 2020年7月20日,14:30,Dongwon Kim  写道:
> > > > >
> > > > >
> > > > > I tried below (Flink 1.11.0) but got some error:
> > > > > > tables:
> > > > > >   - name: test
> > > > > >     type: source-table
> > > > > >     update-mode: append
> > > > > >     connector:
> > > > > >       property-version: 1
> > > > > >       type: kafka
> > > > > >       version: universal
> > > > > >       topic: ...
> > > > > >       properties:
> > > > > >         bootstrap.servers: ...
> > > > > >         group.id: ...
> > > > > >     format:
> > > > > >       property-version: 1
> > > > > >       type: json
> > > > > >     schema:
> > > > > >       - name: type
> > > > > >         data-type: STRING
> > > > > >       - name: location
> > > > > >         data-type: >
> > > > > >           ROW<
> > > > > >             id STRING,
> > > > > >             lastUpdateTime BIGINT
> > > > > >           >
> > > > > >       - name: timestampCol
> > > > > >         data-type: TIMESTAMP(3)
> > > > > >         rowtime:
> > > > > >           timestamps:
> > > > > >             type: from-field
> > > > > >             from: 
> > > > > > TO_TIMESTAMP(FROM_UNIXTIME(location.lastUpdateTime/1000, 
> > > > > > '-MM-dd HH:mm:ss'))
> > > > > >           watermarks:
> > > > > >             type: periodic-bounded
> > > > > >             delay: 5000
> > > > >
> > > > > SQL client doesn't complain about the file but, when I execute 
> > > > > "SELECT timestampCol from test", the job fails with the following 
> > > > > error message:
> > > > > > Caused by: java.lang.NullPointerException
> > > > > > at 
> > > > > > org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:236)
> > > > > > at 
> > > > > > org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:228)
> > > > > > at 
> > > >

Re: HELP!!! Flink1.10 sql insert into HBase, error like validateSchemaAndApplyImplicitCast

2020-07-16 Thread Danny Chan
I suspect there are some inconsistency in the nullability of the whole record 
field, can you compare the 2 schema and see the diff ? For a table, you can get 
the TableSchema first and print it out.

Best,
Danny Chan
在 2020年7月16日 +0800 AM10:56,Leonard Xu ,写道:
> Hi, Jim
>
> Could you post error message in text that contains the entire schema of query 
> and sink? I doubt there are some  fields type were mismatched.
>
> Best,
> Leonard Xu
>
> > 在 2020年7月16日,10:29,Jim Chen  写道:
> >
> > Hi,
> >   I use flink1.10.1 sql to connect hbase1.4.3. When inset into hbase, 
> > report an error like validateSchemaAndApplyImplicitCast. Means that the 
> > Query Schema and Sink Schema are inconsistent.
> >   Mainly Row (EXPR$0) in Query Schema, which are all expressions. Sink 
> > Schema is Row(device_id). I don't know how to write in sql to be consistent 
> > with hbase's sink schema.
> >   I try to write sql like select device_id as rowkey, ROW( device_id as 
> > [cannot write as]  ) as f1
> >
> > error message as follow:
> > 
> >
> > sample code like:
> > HBase sink ddl:
> > String ddlSource = "CREATE TABLE 
> > test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping (\n" +
> >                 "  rowkey STRING,\n" +
> >                 "  f1 ROW< \n" +
> >                 "        device_id STRING,\n" +
> >                 "        pass_id STRING,\n" +
> >                 "        first_date STRING,\n" +
> >                 "        first_channel_id STRING,\n" +
> >                 "        first_app_version STRING,\n" +
> >                 "        first_server_time STRING,\n" +
> >                 "        first_server_hour STRING,\n" +
> >                 "        first_ip_location STRING,\n" +
> >                 "        first_login_time STRING,\n" +
> >                 "        sys_can_uninstall STRING,\n" +
> >                 "        update_date STRING,\n" +
> >                 "        server_time BIGINT,\n" +
> >                 "        last_pass_id STRING,\n" +
> >                 "        last_channel_id STRING,\n" +
> >                 "        last_app_version STRING,\n" +
> >                 "        last_date STRING,\n" +
> >                 "        os STRING,\n" +
> >                 "        attribution_channel_id STRING,\n" +
> >                 "        attribution_first_date STRING,\n" +
> >                 "        p_product STRING,\n" +
> >                 "        p_project STRING,\n" +
> >                 "        p_dt STRING\n" +
> >                 "        >\n" +
> >                 ") WITH (\n" +
> >                 "  'connector.type' = 'hbase',\n" +
> >                 "  'connector.version' = '1.4.3',\n" + // 
> > 即使绕过语法编译,换其他版本的hbase,还是有问题,如线上的版本就不行
> >                 "  'connector.table-name' = 
> > 'dw_common_mobile_device_user_mapping_new',\n" +
> >                 "  'connector.zookeeper.quorum' = '"+ zookeeperServers 
> > +"',\n" +
> >                 "  'connector.zookeeper.znode.parent' = '/hbase143',\n" +
> >                 "  'connector.write.buffer-flush.max-size' = '2mb',\n" +
> >                 "  'connector.write.buffer-flush.max-rows' = '1000',\n" +
> >                 "  'connector.write.buffer-flush.interval' = '2s'\n" +
> >                 ")";
> >
> > insert into sql:
> >
> > String bodyAndLocalSql = "" +
> > //                "insert into 
> > test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping " +
> >                 "SELECT CAST(rowkey AS STRING) AS rowkey, " +
> >                 " ROW(" +
> >                 " device_id, pass_id, first_date, first_channel_id, 
> > first_app_version, first_server_time, first_server_hour, first_ip_location, 
> > first_login_time, sys_can_uninstall, update_date, server_time, 
> > last_pass_id, last_channel_id, last_app_version, last_date, os, 
> > attribution_channel_id, attribution_first_date, p_product, p_project, p_dt 
> > " +
> >                 &

Re: Table API throws "No FileSystem for scheme: file" when loading local parquet

2020-07-12 Thread Danny Chan
> No FileSystem for scheme: file

It seems that your path does not work correctly, from the patch you gave, the 
directly name 'test.parquet’ seems invalid.

Best,
Danny Chan
在 2020年7月11日 +0800 AM8:07,Danny Chan ,写道:
>
> It seems that your path does not work correctly, from the patch you gave, the 
> directly name 'test.parquet’ seems invalid.


Re: Does Flink support TFRecordFileOutputFormat?

2020-07-12 Thread Danny Chan
I didn’t see any class named TFRecordFileOutputFormat in Spark, for TF do you 
mean TensorFlow ?

Best,
Danny Chan
在 2020年7月10日 +0800 PM5:28,殿李 ,写道:
> Hi,
>
> Does Flink support TFRecordFileOutputFormat? I can't find the relevant 
> information in the document.
>
> As far as I know, spark is supportive.
>
>
> Best regards
> Peidian Li


Re: [ANNOUNCE] Apache Flink 1.11.0 released

2020-07-08 Thread Danny Chan
Thanks Zhijiang and Piotr for the great work as release manager, and thanks
everyone who makes the release possible!

Best,
Danny Chan
在 2020年7月8日 +0800 PM4:59,Congxian Qiu ,写道:
>
> Thanks Zhijiang and Piotr for the great work as release manager, and thanks
> everyone who makes the release possible!


Re: can't exectue query when table type is datagen

2020-07-06 Thread Danny Chan
Dear xin Destiny ~

It seems that you use the legacy planner so the exception throws [1] ~

I agree that there needs a prompt here to indicate that it is a legacy planner, 
have fired an issue [2],
Actually for legacy, it is a regression because before the change, the computed 
column is supported well.

[1] 
https://github.com/apache/flink/blob/1b1c343e8964c6400c7c1de3c70212522ba59a64/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/ParserImpl.java#L86
[2] https://issues.apache.org/jira/browse/FLINK-18500

Best,
Danny Chan
在 2020年7月5日 +0800 AM10:52,xin Destiny ,写道:
> Hi, all:
> i use zeppelin execute sql, FLink version is Flink 1.11 snapshot ,build from 
> branch release-1.11 ,commit is 334f35cbd6da754d8b5b294032cd84c858b1f973
> when the table type is datagen, Flink will thrown exception ,but the 
> exception message is null ;
>
> My DDL is :
> CREATE TABLE datagen_dijie2 (
>  f_sequence INT,
>  f_random INT,
>  f_random_str STRING,
>  ts AS localtimestamp,
>  WATERMARK FOR ts AS ts
> ) WITH (
>  'connector' = 'datagen',
>  'rows-per-second'='5',
>  'fields.f_sequence.kind'='sequence',
>  'fields.f_sequence.start'='1',
>  'fields.f_sequence.end'='1000',
>  'fields.f_random.min'='1',
>  'fields.f_random.max'='1000',
>  'fields.f_random_str.length'='10'
> );
>
> My query sql is :
> select * from datagen_dijie2;
> the exception is :
> Fail to run sql command: select * from datagen_dijie2 
> org.apache.flink.table.api.ValidationException: SQL validation failed. null 
> at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
>  at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
>  at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187)
>  at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:658)
>  at 
> org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:102)
>  at 
> org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInnerSelect(FlinkStreamSqlInterpreter.java:89)
>  at 
> org.apache.zeppelin.flink.FlinkSqlInterrpeter.callSelect(FlinkSqlInterrpeter.java:526)
>  at 
> org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:297)
>  at 
> org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:191)
>  at 
> org.apache.zeppelin.flink.FlinkSqlInterrpeter.interpret(FlinkSqlInterrpeter.java:156)
>  at 
> org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
>  at 
> org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:776)
>  at 
> org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:668)
>  at org.apache.zeppelin.scheduler.Job.run(Job.java:172) at 
> org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:130)
>  at 
> org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:39)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748) Caused by: 
> java.lang.UnsupportedOperationException at 
> org.apache.flink.table.planner.ParserImpl.parseSqlExpression(ParserImpl.java:86)
>  at 
> org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolveExpressionDataType(CatalogTableSchemaResolver.java:119)
>  at 
> org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolve(CatalogTableSchemaResolver.java:83)
>  at 
> org.apache.flink.table.catalog.CatalogManager.resolveTableSchema(CatalogManager.java:380)
>  at 
> org.apache.flink.table.catalog.CatalogManager.getPermanentTable(CatalogManager.java:408)
>  at 
> org.apache.flink.table.catalog.CatalogManager.getTable(CatalogManager.java:375)
>  at 
> org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:75)
>  at 
> org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83)
>  at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289) at 
> org.apache.calcite.sql.validate.EmptyScope.resolve_

Re: Dynamic source and sink.

2020-07-01 Thread Danny Chan
Sorry, a job graph is solid while we compile it before submitting to the 
cluster, not dynamic as what you want.

You did can write some wrapper operators which response to your own PRCs to run 
the appended operators you want,
But the you should keep the consistency semantics by yourself.

Best,
Danny Chan
在 2020年6月28日 +0800 PM3:30,C DINESH ,写道:
> Hi All,
>
> In a flink job I have a pipeline. It is consuming data from one kafka topic 
> and storing data to Elastic search cluster.
>
> without restarting the job can we add another kafka cluster and another 
> elastic search sink to the job. Which means i will supply the new kafka 
> cluster and elastic search details in the topic.  After consuming the data 
> can our flink job add the new source and sink to the same job.
>
>
> Thanks & Regards,
> Dinesh.


Re: Flink table modules

2020-04-26 Thread Danny Chan
> Are they basically a way to group together a set of UDF functions

You are right, each module would have a set of functions which behaves like a 
Flink builtin function, the functions don’t have any namespaces. When there are 
two objects of the same name residing in two modules, Flink always resolves the 
object reference to the one in the 1st loaded module. Here is the doc why we 
evolves this feature [1]

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-68%3A+Extend+Core+Table+System+with+Pluggable+Modules

Best,
Danny Chan
在 2020年4月23日 +0800 PM11:12,Flavio Pompermaier ,写道:
>
> Are they basically a way to group together a set of UDF functions


Re: LookupableTableSource from kafka consumer

2020-04-21 Thread Danny Chan
We usually implementation a LookupableTableSource based on k-v store data 
sources, such as the ES, Hbase and Redis.
For Kafka, what we usually do is a regular stream join [1]

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#regular-joins

Best,
Danny Chan
在 2020年4月21日 +0800 PM8:02,Jark Wu ,写道:
> Hey,
>
> You can take JDBCTableSource [1] as an example about how to implement a 
> LookupableTableSource.
> However, I'm not sure how to support lookup for kafka. Because AFAIK, kafka 
> doesn't have the ability to lookup by key?
>
> Best,
> Jark
>
> [1]: 
> https://github.com/apache/flink/blob/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSource.java
>
> > On Tue, 21 Apr 2020 at 19:44, Clay Teeter  wrote:
> > > Hey, does anyone have any examples that i can use to create a 
> > > LookupableTableSource from a kafka topic?
> > >
> > > Thanks!
> > > Clay
> > >


Re: Flink upgrade to 1.10: function

2020-04-21 Thread Danny Chan
The JSON_VALUE was coded into the parser, which is always parsed as the builtin 
operator, so there is no change to override it yet.

I have fired an issue[1] to track this and hope we can resolve it in the next 
Calcite release.

[1] https://issues.apache.org/jira/browse/CALCITE-3943

Best,
Danny Chan
在 2020年4月17日 +0800 PM9:00,Till Rohrmann ,写道:
> Hi,
>
> thanks for reporting these problems. I'm pulling in Timo and Jark who are 
> working on the SQL component. They might be able to help you with your 
> problem.
>
> Cheers,
> Till
>
> > On Thu, Apr 16, 2020 at 11:10 AM seeksst  wrote:
> > > Hi, All
> > >
> > > Recently, I try to upgrade flink from 1.8.2 to 1.10, but i meet some 
> > > problem about function. In 1.8.2, there are just Built-In function and 
> > > User-defined Functions, but in 1.10, there are 4 categories of funtions.
> > > I defined a function which named JSON_VALUE in my system, it doesn’t 
> > > exist in 1.8.2, but present to 1.10.0. of course i use it in sql, 
> > > something like  'select JSON_VALUE(string, string) from table1’, no 
> > > category or database. the problem is in 1.10.0, my function will be 
> > > recognized as SqlJsonValueFunction, and args not match, so my sql is 
> > > wrong.
> > >         I read document about Ambiguous Function Reference, In my 
> > > understanding, my function will be registered as temporary system 
> > > function, and it should be chosen first. isn’t it? I try to debug it, and 
> > > find some information:
> > > First, sql will be parsed by ParseImpl, and JSON_VALUE will be parsed as 
> > > SqlBasicCall, operator is SqlJsonValueFunction, it’s belonged to SYSTEM 
> > > catalog and the kind is OTHER_FUNCTION. Then, SqlUtil.lookupRoutine will 
> > > not find this SqlFunction, because it not in BasicOperatorTable. my 
> > > function in FunctionCatalog, but SqlJsonValueFunction belonged to SYSTEM, 
> > > not belong to USER_DEFINED, so program will not search it in 
> > > FunctionCatalog.
> > > How can i solve this problem without modifying sql and function name? my 
> > > program can choose flink version and have many sql jobs, so i don’t wish 
> > > to modify sql and function name.
> > > Thansk.