Re: In 1.11.2/flinkSql/batch, tableEnv.getConfig.setNullCheck(false) seems to break group by-s

2020-10-16 Thread Timo Walther
Hi Jon, I would not recommend using the configuration parameter. It is not deprecated yet but can be considered legacy code from before we reworked the type system. Regards, Timo On 16.10.20 13:23, Kurt Young wrote: Yes, I think this is a bug, feel free to open a jira and a pull request.

Re: Flink - Kafka topic null error; happens only when running on cluster

2020-10-22 Thread Timo Walther
Hi Manas, you need to make sure to differentiate between what Flink calls "pre-flight phase" and "cluster phase". The pre-flight phase is where the pipeline is constructed and all functions are instantiated. They are then later serialized and sent to the cluster. If you are reading your pro

Re: flink job will restart over and over again if a taskmanager's disk damages

2020-10-22 Thread Timo Walther
Hi, thanks for letting us know about this shortcoming. I will link someone from the runtime team in the JIRA issue. Let's continue the discussion there. Regards, Timo On 22.10.20 05:36, chenkaibit wrote: Hi everyone:  I met this Exception when a hard disk was damaged: https://issues.apache

Re: Configurable Parser

2020-10-22 Thread Timo Walther
Hi Theo, this is indeed a difficult use case. The KafkaDeserializationSchema is actually meant mostly for deserialization and should not contain more complex logic such as joining with a different topic. This would make KafkaDeserializationSchema stateful. But in your use case, I see no better

Re: Flink - Kafka topic null error; happens only when running on cluster

2020-10-22 Thread Timo Walther
CONFIG_TOPIC = s.get("CONFIG_TOPIC"); CONFIG_KAFKA = s.get("CONFIG_KAFKA"); } } This produces the same issue. With the easier solution that you listed, are you implying I use multiple instances or a singleton pattern of some sort? On Thu, Oct 22, 2020 at 1:2

Re: 回复: rename error in flink sql

2020-10-22 Thread Timo Walther
Hi, sorry for the late reply. I think the problem was in the `tEnv.toAppendStream(result, Order.class).print();`, right? You can also find a new example here: https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/Gett

Re: Flink - Kafka topic null error; happens only when running on cluster

2020-10-23 Thread Timo Walther
-flight and cluster phases? I couldn't find anything on ci.apache.org/projects/flink and I think this behaviour should be documented as a warning/note. On Thu, Oct 22, 2020 at 6:44 PM Timo Walther wrote:

Re: LEGACY('STRUCTURED_TYPE' to pojo

2020-11-04 Thread Timo Walther
Hi Rex, sorry for the late reply. POJOs will have much better support in the upcoming Flink versions because they have been fully integrated with the new table type system mentioned in FLIP-37 [1] (e.g. support for immutable POJOs and nested DataTypeHints etc). For queries, scalar, and table

Re: Error parsing annotations in flink-table-planner_blink_2.12-1.11.2

2020-11-05 Thread Timo Walther
Hi Yuval, this error is indeed weird. @Aljoscha: I think Calcite uses apiguardian. When I saw the initial error, it looked like there are different Apache Calcite versions in the classpath. I'm wondering if this is a pure SBT issue because I'm sure that other users would have reported this er

Re: LEGACY('STRUCTURED_TYPE' to pojo

2020-11-05 Thread Timo Walther
n the next version with this change might release? On Wed, Nov 4, 2020 at 2:44 AM Timo Walther wrote: Hi Rex, sorry for the late reply. POJOs will have much better support in the upcoming Flink versions because they have been fully integrate

Re: Best way to test Table API and SQL

2020-11-05 Thread Timo Walther
gular one? Thanks On Fri, Oct 9, 2020 at 7:55 AM Timo Walther wrote: Hi Rex, let me copy paste my answer from a similar thread 2 months ago: Hi, this might be helpful as well: https://lists.apache.o

Re: Filter By Value in List

2020-11-05 Thread Timo Walther
Hi Rex, as far as I know, the IN operator only works on tables or a list of literals where the latter one is just a shortcut for multiple OR operations. I would just go with a UDF for this case. In SQL you could do an UNNEST to convert the array into a table and then use the IN operator. But
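A minimal sketch of the UNNEST approach in Java, assuming a table `orders` with an ARRAY<STRING> column `tags` (the table, columns, and datagen connector are placeholders, not from the original thread):

    TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inStreamingMode().build());
    // hypothetical source table with an array column
    tEnv.executeSql(
        "CREATE TABLE orders (id INT, tags ARRAY<STRING>) WITH ('connector' = 'datagen')");
    // UNNEST turns the array into a table; IN then works on its rows
    Table filtered = tEnv.sqlQuery(
        "SELECT DISTINCT o.id "
            + "FROM orders o CROSS JOIN UNNEST(o.tags) AS t (tag) "
            + "WHERE t.tag IN ('gold', 'vip')");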

Re: SQL aggregation functions inside the Table API

2020-11-09 Thread Timo Walther
Hi Ori, we might support SQL expressions soon in Table API. However, we might not support aggregate functions immediately. I would recommend using `sqlQuery` for now. The following is supported: val table = tenv.fromDataStream(stream) val newTable = tenv.sqlQuery(s"SELECT ... FROM $table")
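A compact Java sketch of that suggestion (column names `name` and `amount` are assumptions; concatenating a Table into the SQL string registers it under a generated name):

    Table table = tenv.fromDataStream(stream);
    Table newTable = tenv.sqlQuery(
        "SELECT name, AVG(amount) FROM " + table + " GROUP BY name");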

Re: ValidationException using DataTypeHint in Scalar Function

2020-11-09 Thread Timo Walther
Sorry for jumping in so late. I think Dawid gave a nice summary. As he said, the DataStream <> Table integration is still under development. Until then I would suggest option 3), which means don't upgrade the functions and use the old registration function `registerFunction`.

Re: Table SQL Filesystem CSV recursive directory traversal

2020-11-09 Thread Timo Walther
Hi Ruben, by looking at the code, it seems you should be able to do that. At least for batch workloads we are using org.apache.flink.formats.csv.CsvFileSystemFormatFactory.CsvInputFormat which is a FileInputFormat that supports the mentioned configuration option. The problem is that this mig

Re: Stream aggregation using Flink Table API (Blink plan)

2020-11-10 Thread Timo Walther
Hi Felipe, by "non-deterministic" Jark meant that you never know whether the mini batch timer (every 3 s) or the mini batch threshold (e.g. 3 rows) fires the execution. This depends on how fast records arrive at the operator. In general, processing time can be considered non-deterministic, because 1

Re: Changing the topology while upgrading a job submitted by SQL

2020-11-10 Thread Timo Walther
Hi, unfortunately, we currently don't provide any upgrading guarantees for SQL. In theory we could add a possibility to add operator uids; however, this will not help much because the underlying SQL operators, or rather the optimization rules that create a smarter pipeline, could change the entire

Re: Flink Kafka Table API for python with JAAS

2020-11-10 Thread Timo Walther
Hi, are you using the SQL jars or do you build the dependency jar file yourself? It might be the case that the SQL jar for Kafka does not include this module as the exception indicates. You might need to build a custom Kafka jar with Maven and all dependencies you need. (including correct MET

Re: How to convert Int to Date

2020-11-17 Thread Timo Walther
Hi Rex, the classes mentioned in the documentation such as `int` and `java.lang.Integer` are only used when you leave the SQL world to a UDF or to a Java implementation in a sink. But as a SQL user you only need to pay attention to the logical data type. Those must match entirely or be a sup

Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-20 Thread Timo Walther
Hi Fuyao, sorry for not replying earlier. You posted a lot of questions. I scanned the thread quickly, let me try to answer some of them and feel free to ask further questions afterwards. "is it possible to configure the parallelism for Table operation at operator level" No this is not pos

Re: Print on screen DataStream content

2020-11-24 Thread Timo Walther
Hi Simone, if you are just executing DataStream pipelines locally in your IDE while prototyping, you should be able to use `DataStream#print()`, which just prints to standard out [1] (it might be hidden between the log messages). For debugging locally, you can also just set breakpoints in your

Re: Question: How to avoid local execution being terminated before session window closes

2020-11-24 Thread Timo Walther
Hi Klemens, what you are observing are reasons why event-time should be preferred over processing-time. Event-time uses the timestamp of your data while processing-time is too basic for many use cases. Esp. when you want to reprocess historic data, you want to do that at full speed instead of

Re: Job Manager logs

2020-11-24 Thread Timo Walther
Hi Saksham, could you tell us a bit more about your deployment and where you run Flink? This seems to be the root exception: 2020-11-24 11:11:16,296 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler [] - Failed to transfer file from TaskExecutor f0dc0ae680e65a

Re: Question: How to avoid local execution being terminated before session window closes

2020-11-24 Thread Timo Walther
integrate it with an Apache Kafka Service. Output is written to a Postgres-Database-System. I'll have a look at your proposal and let you know if it worked, after having finished a few prerequisite parts. Regards, Klemens On 24.11.20 at 12:59, Timo Walther wrote: Hi Klemens, wha

Re: Dynamic ad hoc query deployment strategy

2020-11-24 Thread Timo Walther
I agree with Dawid. Maybe one thing to add is that reusing parts of the pipeline is possible via StatementSets in TableEnvironment. They allow you to add multiple queries that consume from a common part of the pipeline (for example a common source). But all of that is compiled into one big job
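A minimal sketch of the StatementSet pattern (all table names are made up; the point is that both INSERTs are optimized and submitted together, so the common source is shared):

    StatementSet stmtSet = tEnv.createStatementSet();
    stmtSet.addInsertSql("INSERT INTO sink_a SELECT id, name FROM shared_source");
    stmtSet.addInsertSql("INSERT INTO sink_b SELECT id, COUNT(*) FROM shared_source GROUP BY id");
    stmtSet.execute(); // compiled into one big job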

Re: Learn flink source code book recommendation

2020-11-24 Thread Timo Walther
Hi, one advice I can give you is to checkout the code and execute some of the examples in debugging mode. Esp. within Flink's functions e.g. MapFunction or ProcessFunction you can set a breakpoint and look at the stack trace. This gives you a good overview about the Flink stack in general.

Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-24 Thread Timo Walther
ic-watermarkgenerator Best, Fuyao On 11/20/20 08:55, Timo Walther wrote: Hi Fuyao, sorry for not replying earlier. You posted a lot of questions. I scanned the thread quickly, let me try to answer some of them and feel free to ask further questions afterwards. "is it possible to con

Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-26 Thread Timo Walther
/dev/table/sql/queries.html#joins (https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#joins) Thanks! Fuyao On Tue, Nov 24, 2020 at 9:06 AM Timo Walther wrote: Hi Fuyao, great that you could make progre

Re: queryLogicalType != sinkLogicalType when UDAF returns List

2020-11-27 Thread Timo Walther
Hi, first of all, we don't support ListTypeInfo in Table API. Therefore, it is treated as a RAW type. The exception during exception creation is a bug that should be fixed in a future version. But the mismatch is valid: ARRAY is not a list type info but `Types.OBJECT_ARRAY(Types.INT)`. Can you

Re: How do I pass the aggregated results of an SQL TUMBLING Window as one set of data to a Process Function?

2020-12-14 Thread Timo Walther
Hi Marco, sorry for the late reply. Have you looked into user-defined aggregate functions for SQL? I think your requirements can be easily implemented there. You can declare multiple aggregate functions per window. There is also the built-in function LISTAGG that might help for your use case.
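A sketch of the LISTAGG suggestion on a tumbling window (table and column names are assumptions; `ts` is expected to be a declared event-time attribute):

    Table result = tEnv.sqlQuery(
        "SELECT TUMBLE_START(ts, INTERVAL '1' MINUTE) AS window_start, "
            + "LISTAGG(payload) AS window_payloads "
            + "FROM events "
            + "GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE)");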

Re: Is there any way to directly convert org.apache.flink.table.api.Table into my POJO.

2020-12-14 Thread Timo Walther
Hi, first, we should clarify "continue to be put into the Flink table": A Flink Table object does not physically store the data. It is basically a view that contains a transformation pipeline. When you are calling `collect()` the pipeline is executed and all results from the cluster are stre

Re: How does Flink cache values that also do not exist in the database?

2020-12-14 Thread Timo Walther
Hi Marco, when you say "database" are you referring to the JDBC connector or would you like to perform a JDBC query within some UDF? In the latter case, I would recommend to use Flink's ProcessFunction because you can store the cache hits in state (and thus keep them forever). SQL/Table API doe

Re: state inside functions

2020-12-17 Thread Timo Walther
Hi, if you would like to dynamically adjust configuration of your streaming job, it might be a good approach to consider the configuration as a stream itself. The connect() API can be used to connect a main stream with a control stream. In any case the configuration should be persisted in st
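A condensed sketch of the connect() pattern (types and the key are invented for illustration; the configuration lives in keyed state and is therefore checkpointed):

    // mainStream carries data, controlStream carries configuration updates
    DataStream<String> mainStream = env.fromElements("a", "b", "c");
    DataStream<String> controlStream = env.fromElements("UPPERCASE");

    mainStream.connect(controlStream)
        .keyBy(v -> "all", v -> "all") // single logical key, just for the sketch
        .process(new KeyedCoProcessFunction<String, String, String, String>() {
            private transient ValueState<String> config;

            @Override
            public void open(Configuration parameters) {
                config = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("config", String.class));
            }

            @Override
            public void processElement1(String value, Context ctx, Collector<String> out)
                    throws Exception {
                // apply the latest configuration to the main stream
                out.collect("UPPERCASE".equals(config.value()) ? value.toUpperCase() : value);
            }

            @Override
            public void processElement2(String newConfig, Context ctx, Collector<String> out)
                    throws Exception {
                config.update(newConfig); // persisted in state, survives restarts
            }
        });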

Re: Changing application configuration when restoring from checkpoint/savepoint

2020-12-17 Thread Timo Walther
Hi, I gave some answers in the other mail thread. Some additional comments: In general I think even configuration can be considered as state in this case. If state is not set, the job can be considered as a fresh start. Once the state is set, it would basically be just a configuration update.

Re: Set TimeZone of Flink Streaming job

2020-12-17 Thread Timo Walther
Hi, Flink does not support time zones currently. However, all time operations work on Java `long` values. It is up to the user what this long value represents. It does not have to be UTC but can also be adjusted for another time zone. Since DataStream API supports arbitrary Java objects, users can

Re: Flink - sending clicks+impressions to AWS Personalize

2020-12-17 Thread Timo Walther
Hi Dan, the exception that you get is a very frequent limitation in Flink SQL at the moment. I tried to summarize the issue recently here: https://stackoverflow.com/questions/64445207/rowtime-attributes-must-not-be-in-the-input-rows-of-a-regular-join-despite-usi/64500296#64500296 The query i

Re: Flink - Create Temporary View and "Rowtime attributes must not be in the input rows of a regular join"

2020-12-17 Thread Timo Walther
Hi Dan, are you intending to use interval joins, regular joins, or a mixture of both? For regular joins you must make sure to cast the rowtime attribute to TIMESTAMP as early as possible. For interval joins, you need to make sure that the rowtime attribute is unmodified. Currently, I see COALE
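A small sketch of the "cast early" advice for the regular-join branch (table and column names are assumptions; after the CAST the column is a plain TIMESTAMP and no longer a time attribute):

    Table prepared = tEnv.sqlQuery(
        "SELECT id, CAST(event_time AS TIMESTAMP(3)) AS event_ts FROM clicks");
    tEnv.createTemporaryView("clicks_for_regular_join", prepared);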

Re: Registering RawType with LegacyTypeInfo via Flink CREATE TABLE DDL

2020-12-28 Thread Timo Walther
Hi Yuval, the legacy type has no string representation that can be used in a SQL DDL statement. The current string representation LEGACY(...) is only a temporary work around to persist the old types in catalogs. Until FLIP-136 is fully implemented, toAppendStream/toRetractStream support only

Re: Registering RawType with LegacyTypeInfo via Flink CREATE TABLE DDL

2020-12-28 Thread Timo Walther
id type of the table. Hope that clarifies a bit; since the pipeline is rather complex I can't really share an MVCE of it. On Mon, Dec 28, 2020 at 11:08 AM Timo Walther wrote: Hi Yuval, the legacy type has no string representa

Re: Registering RawType with LegacyTypeInfo via Flink CREATE TABLE DDL

2020-12-28 Thread Timo Walther
P-136 to resolve the issue around legacy types? Will its implementation allow registering LEGACY types, or a new variation of them? On Mon, Dec 28, 2020 at 12:45 PM Timo Walther wrote: I would recommend using the old UDF stack for now.

Re: Flink SQL, temporal joins and backfilling data

2021-01-05 Thread Timo Walther
Hi Dan, are you sure that your watermarks are still correct during reprocessing? As far as I know, idle state retention is not used for temporal joins. The watermark indicates when state can be removed in this case. Maybe you can give us some more details about which kind of temporal join yo

Re: UDTAGG and SQL

2021-01-05 Thread Timo Walther
Hi Marco, nesting aggregate functions is not allowed in SQL. The exception could be improved though. I guess the planner searches for a scalar function called `MyUDTAGG` in your example and cannot find one. Maybe the built-in function `COLLECT` or `LISTAGG` is what you are looking for? htt

Re: UDTAGG and SQL

2021-01-05 Thread Timo Walther
, i.e., SELECT FROM (SELECT FROM)? On Jan 5, 2021, at 6:10 AM, Timo Walther wrote: Hi Marco, nesting aggregate functions is not allowed in SQL. The exception could be improved though. I guess the planner searches for a scalar function called `MyUDTAGG` in your example and cannot find one

Re: Using key.fields in 1.12

2021-01-07 Thread Timo Walther
Hi Aeden, we updated the connector property design in 1.11 [1]. The old translation layer exists for backwards compatibility and is indicated by `connector.type=kafka`. However, `connector = kafka` indicates the new property design and `key.fields` is only available there. Please check all p

Re: Using key.fields in 1.12

2021-01-07 Thread Timo Walther
tor-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java#L160 Does format.avro-schema need to be specified differently? Thank you, Aeden On Thu, Jan 7, 2021 at 12:15 AM Timo Walther wrote: Hi Aeden, we updated the connector property design in 1.11 [1]

Re: Using key.fields in 1.12

2021-01-11 Thread Timo Walther
me in handy. I was looking through docs hoping there was a way to still specify the schema with no luck. Does such an option exist? On Thu, Jan 7, 2021 at 2:33 AM Timo Walther wrote: Hi Aeden, `format.avro-schema` is not required anymore in the n

Re: FlinkSQL Filter Error With Float Column on flink-1.12.0

2021-01-12 Thread Timo Walther
Hi, it seems this is a bug that is located in the Apache Calcite code. I will open an issue for it. Thanks for reporting this. Regards, Timo On 12.01.21 11:08, jy l wrote: Hi: Flink SQL filter data throw an exception, code: def main(args: Array[String]): Unit = {     val env = StreamExecut

Re: FlinkSQL Filter Error With Float Column on flink-1.12.0

2021-01-12 Thread Timo Walther
See here: https://issues.apache.org/jira/browse/FLINK-20942 On 12.01.21 16:04, Timo Walther wrote: Hi, it seems this is a bug that is located in the Apache Calcite code. I will open an issue for it. Thanks for reporting this. Regards, Timo On 12.01.21 11:08, jy l wrote: Hi: Flink SQL

Re: Flink 1.12 Kryo Serialization Error

2021-01-13 Thread Timo Walther
Hi Yuval, could you share a reproducible example with us? I see you are using SQL / Table API with a RAW type. I could imagine that the KryoSerializer is configured differently when serializing and when deserializing. This might be due to `ExecutionConfig` not being shipped (or copied) through the

Re: How to Iterate over results in Table API version 1.12.0

2021-01-15 Thread Timo Walther
Hi Robert, could you send us the error/stacktrace that is printed? An example how it should work is shown here: https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/UpdatingTopCityExample.java https://github.c

Re: How to Iterate over results in Table API version 1.12.0

2021-01-15 Thread Timo Walther
ping SlotPool. 2021-01-15 16:52:08,468 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Disconnect job manager 0...@akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_68 for job 84c9f12fe943bc7f32ee637666ed3bc1 from the resource m

Re: How to Iterate over results in Table API version 1.12.0

2021-01-15 Thread Timo Walther
maybe Godfrey in CC knows more? On 15.01.21 18:10, Timo Walther wrote: How are you running the Flink cluster? What is your deployment? The exception clearly indicates that you found a bug. Could you open a ticket in Flink's JIRA? We need details how to reproduce it. Thanks, Timo

Re: Why use ListView?

2021-01-18 Thread Timo Walther
Hi Rex, ListView and MapView have been part of Flink for years. However, they were considered an internal feature and therefore not well documented. MapView is used internally to make distinct aggregates work. Because we reworked the type inference of aggregate functions, we also added ba

Re: Computed Columns In Stream to Table Conversion

2021-01-18 Thread Timo Walther
Hi Aeden, computed columns on a DataStream input are currently not supported. I am currently working on making this possible. Have a look at FLIP-136 for more information [1]. However, you can simply add a projection before you register a view: tEnv.createTemporaryView("myTable", dataStream);
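A possible completion of that workaround (column names are invented): derive the computed column in a projection and register the result as the view that is actually queried:

    tEnv.createTemporaryView("myTable", dataStream);
    Table projected = tEnv.sqlQuery(
        "SELECT id, price, price * 1.19 AS price_incl_tax FROM myTable");
    tEnv.createTemporaryView("myEnrichedTable", projected);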

Re: Flink SQL and checkpoints and savepoints

2021-01-18 Thread Timo Walther
Hi Dan, currently, we cannot provide any savepoint guarantees between releases. Because of the nature of SQL that abstracts away runtime operators, it might be that a future execution plan will look completely different and thus we cannot map state anymore. This is not avoidable because the o

Re: How to Iterate over results in Table API version 1.12.0

2021-01-18 Thread Timo Walther
Mode). What's the link to Flink's JIRA? On Fri, Jan 15, 2021 at 12:19 PM Timo Walther wrote: maybe Godfrey in CC knows more? On 15.01.21 18:10, Timo Walther wrote: > How are you running the Flink cluster? What is your deployment?

Re: Setting different timeouts for savepoints and checkpoints

2021-01-18 Thread Timo Walther
Hi Rex, feel free to open an issue for this. I could also imagine that checkpoints and savepoints will further diverge from each other, and having a different timeout might be reasonable. Regards, Timo On 17.01.21 02:43, Rex Fenley wrote: Thanks for the quick response. Is this something tha

Re: Flink ID hashing

2021-01-18 Thread Timo Walther
Hi Rex, for questions like this, I would recommend to checkout the source code as well. Search for subclasses of `StreamPartitioner`. For example, for keyBy Flink uses: https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitio

Re: Flink SQL and checkpoints and savepoints

2021-01-18 Thread Timo Walther
exactly once? I'm inferring what is meant by dry out. Are there any documented patterns for it? E.g. sending data to new kafka topics between releases? On Mon, Jan 18, 2021, 01:04 Timo Walther wrote: Hi Dan, currently, we cannot provide

Re: Updating elements of a window in regular intervals

2021-01-18 Thread Timo Walther
Hi Anton, in many scenarios it might be better to just use a ProcessFunction because you might reach the limit of the built-in window functions very quickly. ProcessFunction gives you full flexibility and you can put into state what you like and set/fire timers when you think the time is appropr
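A rough sketch of that advice, assuming a stream of strings keyed by themselves: elements are buffered in keyed state and flushed by a processing-time timer one minute later:

    stream.keyBy(v -> v)
        .process(new KeyedProcessFunction<String, String, String>() {
            private transient ListState<String> buffer;

            @Override
            public void open(Configuration parameters) {
                buffer = getRuntimeContext().getListState(
                    new ListStateDescriptor<>("buffer", String.class));
            }

            @Override
            public void processElement(String value, Context ctx, Collector<String> out)
                    throws Exception {
                buffer.add(value);
                ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + 60_000L);
            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
                    throws Exception {
                // emit and clear everything buffered for this key
                for (String buffered : buffer.get()) {
                    out.collect(buffered);
                }
                buffer.clear();
            }
        });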

Re: Flink 1.12 Kryo Serialization Error

2021-01-18 Thread Timo Walther
etween the two serializers Flink blows up at runtime saying that the types don't match. On Wed, Jan 13, 2021 at 1:19 PM Timo Walther wrote: Hi Yuval, could you share a reproducible example with us? I see you are using SQL / Table API

Re: Flink 1.12 Kryo Serialization Error

2021-01-18 Thread Timo Walther
I ported the code to the Flink code base because I had issues with SBT and Scala 2.12. Note it uses an older version of circe. I'm just pasting it here in case it helps someone. Regards, Timo On 18.01.21 13:51, Timo Walther wrote: Hi Yuval, thanks for sharing some code with us. I sc

Re: Flink 1.12 Kryo Serialization Error

2021-01-18 Thread Timo Walther
Forgot to add the link: https://github.com/twalthr/flink/tree/kryoBug_ser Regards, Timo On 18.01.21 14:11, Timo Walther wrote: I ported the code to the Flink code base. Because I had issues with SBT and Scala 2.12. Note it uses an older version of circe. I'm just pasting it here in ca

Re: Question about timestamp of StreamRecord

2021-01-18 Thread Timo Walther
Hi, in SQL event time is not part of the StreamRecord but a column in the table. Thus, you need to extract it and specify the column name/location when converting to Table API: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html#during-datastream-to
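A minimal sketch of such a conversion (field names are assumptions; `$("ts").rowtime()` tells the planner to derive the column from the StreamRecord timestamp):

    import static org.apache.flink.table.api.Expressions.$;

    Table table = tEnv.fromDataStream(stream,
        $("user"), $("amount"), $("ts").rowtime());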

Re: Question about timestamp of StreamRecord

2021-01-18 Thread Timo Walther
Forget what I said before, I just tested the behavior and it seems there is a bug in the conversion logic. I opened https://issues.apache.org/jira/browse/FLINK-21013 Thanks for reaching out to us. Regards, Timo On 18.01.21 15:37, Timo Walther wrote: Hi, in SQL event time is not part of the

Re: Why use ListView?

2021-01-19 Thread Timo Walther
18.01.21 18:28, Rex Fenley wrote: Fascinating, do you have an estimate of what qualifies as a lot of data and therefore when this should be used? Thanks On Mon, Jan 18, 2021 at 12:35 AM Timo Walther wrote: Hi Rex, ListView and MapView have been part

Re: Trying to create a generic aggregate UDF

2021-01-20 Thread Timo Walther
Hi Dylan, I'm assuming you are using Flink 1.12 and the Blink planner? Beginning from 1.12 you can use the "new" aggregate functions with a better type inference. So TypeInformation will not be used in this stack. I tried to come up with an example that should explain the rough design. I wi

Re: Trying to create a generic aggregate UDF

2021-01-21 Thread Timo Walther
.outputTypeStrategy { callContext =>
  val outputDataType = callContext.getArgumentDataTypes().get(0);
  Optional.of(outputDataType);
}
.build()
}
}
Regards,

Re: Trying to create a generic aggregate UDF

2021-01-21 Thread Timo Walther
> classOf[LatestNonNull[String]]) don't make a difference. The generics will be type erased in bytecode and only the class name matters. Thanks, Timo On 21.01.21 11:36, Timo Walther wrote: Hi Dylan, thanks for the investigation. I can now also reproduce it in my code. Yes, this is a bug.

Re: Trying to create a generic aggregate UDF

2021-01-21 Thread Timo Walther
://github.com/apache/flink/pull/13787 On 1/21/21, 8:50 AM, "Timo Walther" wrote: I opened a PR. Feel free to try it out. https://github.com/apache/flink/pull/14720 Btw: >> env.createTemporarySystemFunction("LatestNonNullLong", >

Re: Unknown call expression: avg(amount) when use distinct() in Flink,Thanks~!

2021-01-22 Thread Timo Walther
Hi, I'm assuming you are using Flink 1.12? The exception indicates that something is definitely going wrong with the translation from Table API to optimizer nodes. We refactored a lot of the code in this region. I will investigate the issue and come back to you once I have opened a ticket. Thanks f

Re: How to fix deprecation on registerTableSink

2021-01-25 Thread Timo Walther
Hi Flavio, FLIP-129 will update the connect() API with a programmatic way of defining tables. In the API we currently only support the DDL via executeSql. I would recommend implementing the Catalog interface. This interface has a lot of methods, but you only need to implement a couple of met

Re: How to fix deprecation on registerTableSink

2021-01-25 Thread Timo Walther
answer Timo! I think I'll wait for the migration to finish before updating my code. However, does the usage of a catalog solve the problem of CSV override as well? I can't find a way to use INSERT OVERRIDE with a CSV sink using the executeSql. Best, Flavio On Mon, Jan 25, 2021 at 10:

Re: Using double quotes for SQL identifiers

2021-01-26 Thread Timo Walther
Hi Gyula, the TableEnvironment.getConfig offers a setPlannerConfig. And org.apache.flink.table.planner.calcite.CalciteConfigBuilder helps in creating an object that implements this interface. You should be able to influence the Calcite parser config with this. However, I'm not sure how well

Re: HybridMemorySegment index out of bounds exception

2021-01-28 Thread Timo Walther
Hi Yuval, we should definitely find the root cause of this issue. It helps to nail down the problem if the exception happens frequently. Have you tried to replace the JSON object with a regular String? If the exception is gone after this change, I believe it must be the serialization and not

Re: HybridMemorySegment index out of bounds exception

2021-01-28 Thread Timo Walther
t directly related to a Kryo serialization of the specific underlying type (io.circe.Json), but something in the way it interacts with BinaryRawValueData and writing out to the network buffer behind the scenes. On Thu, Jan 28, 2021 at 5:26 PM Timo Walther

Re: HybridMemorySegment index out of bounds exception

2021-01-28 Thread Timo Walther
I don't see any type equality issues, and I see the same serializer being invoked for both serialization and deserialization. On Thu, Jan 28, 2021 at 5:51 PM Timo Walther wrote: This is helpful information. So I guess the problem must be in th

Re: Newbie question: Machine Learning Library of Apache Flink

2021-02-01 Thread Timo Walther
Hi, it is true that there is no dedicated machine learning library for Flink. Flink is a general data processing framework. It allows embedding any available algorithm library within user-defined functions. Flink's focus is on stream processing. There are not many dedicated stream processi

Re: Flink SQL and checkpoints and savepoints

2021-02-01 Thread Timo Walther
milar. On Thu, Jan 28, 2021 at 11:45 PM Dan Hill wrote: Is this savepoint recovery issue also true with the Flink Table API? I'd assume so. Just double-checking. On Mon, Jan 18, 2021 at 1:58 AM Timo Walther

Re: Unknown call expression: avg(amount) when use distinct() in Flink Thanks~!

2021-02-01 Thread Timo Walther
Hi, sorry I forgot to further investigate this issue. It seems the last refactoring of the code base caused this documented feature to break. I opened an issue for it: https://issues.apache.org/jira/browse/FLINK-21225 For now, I would suggest to use SQL for the same behavior. I hope someone

Re: LEAD/LAG functions

2021-02-01 Thread Timo Walther
Hi Patrick, I could imagine that LEAD/LAG are translated into RANK/ROW_NUMBER operations that are not supported in this context. But I will loop in @Jark who might know more about the limitations here. Regards, Timo On 29.01.21 17:37, Patrick Angeles wrote: Another (hopefully newbie) questi

Re: Is Flink able to parse strings into dynamic JSON?

2021-02-01 Thread Timo Walther
Hi Devin, Flink supports arbitrary data types. You can simply read the JSON object as a big string first and process the individual event types in a UDF using e.g. the Jackson library. Are you using SQL or DataStream API? An alternative is to set the "fail-on-missing-field" flag to false. Th

Re: Problem restirng state

2021-02-01 Thread Timo Walther
Hi Shridhar, the exception indicates that something is wrong with the object serialization. Kryo is unable to serialize the given object. It might help to 1) register a custom Kryo serializer in the ExecutionConfig or 2) pass dedicated type information using the types from org.apache.flink
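Sketches of both options (MyPojo and MyPojoKryoSerializer are placeholders):

    // 1) register a custom Kryo serializer for the problematic class
    env.getConfig().registerTypeWithKryoSerializer(MyPojo.class, MyPojoKryoSerializer.class);

    // 2) bypass Kryo by declaring dedicated type information on the operator
    DataStream<MyPojo> typed = input
        .map(value -> value)
        .returns(Types.POJO(MyPojo.class));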

Re: Rocksdb - org.apache.flink.util.SerializedThrowable : bad entry in block

2021-02-01 Thread Timo Walther
Hi Omkar, sorry for the late reply. This sounds like a serious issue. It looks like some of the RocksDB data is corrupt. Are you sure this is not a problem of your storage layer? Otherwise I would investigate whether the serializers work correctly. Maybe Beam did put corrupt data into Flink

Re: Flink sql problem

2021-02-01 Thread Timo Walther
Hi Jiazhi, I think an OVER window might solve your use case. It gives you a rolling aggregation over a period of time. Maybe you need to define a custom aggregate function to emit the final record as you need it. Let me know if you have further questions. Regards, Timo On 27.01.21 15:02, ?g?

Re: Proctime consistency

2021-02-01 Thread Timo Walther
Hi Rex, processing-time gives you no alignment of operators across nodes. Each operation works with its local machine clock that might be interrupted by the OS, Java garbage collector, etc. It is always a best effort timing. Regards, Timo On 27.01.21 18:16, Rex Fenley wrote: Hello, I'm lo

Re: [Flink SQL] CompilerFactory cannot be cast error when executing statement

2021-02-02 Thread Timo Walther
Hi Sebastian, sorry for the late reply. Could you solve the problem in the meantime? It definitely looks like a dependency conflict. Regards, Timo On 22.01.21 18:18, Sebastián Magrí wrote: Thanks a lot Matthias! In the meantime I'm trying out something with the scala quickstart. On Fri,

Re: Proctime consistency

2021-02-02 Thread Timo Walther
pointed with its previous proctime, then it may be misaligned in the next run and drop rows that were in that window. On Mon, Feb 1, 2021 at 6:37 AM Timo Walther wrote: Hi Rex, processing-time gives you no alignment of operators across nodes. Each

Re: Max with retract aggregate function does not support type: ''CHAR''.

2021-02-03 Thread Timo Walther
Hi Yuval, yes this is rather a bug. If we support VARCHAR here we should also support CHAR. Feel free to open an issue. Regards, Timo On 03.02.21 11:46, Yuval Itzchakov wrote: I can understand that in some sense it's nonsensical to MAX on a CHAR, since Blink will only determine a CHAR when t

Re: DynamicTableSink's equivalent of UpsertStreamTableSink.setKeyFields

2021-02-03 Thread Timo Walther
Hi Yuval, we changed this behavior a bit to be more SQL compliant. Currently, sinks must be explicitly defined with a PRIMARY KEY constraint. We had discussions about implicit sinks, but nothing on the roadmap yet. The `CREATE TEMPORARY TABLE LIKE` clause should make it easy to extend the ori
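A sketch of both points (connector and table names are made up): the PRIMARY KEY constraint declares the upsert key explicitly, and LIKE derives a variant without repeating the schema:

    tEnv.executeSql(
        "CREATE TABLE sink_with_pk ("
            + "  id BIGINT,"
            + "  name STRING,"
            + "  PRIMARY KEY (id) NOT ENFORCED"
            + ") WITH ('connector' = 'blackhole')");

    tEnv.executeSql(
        "CREATE TEMPORARY TABLE sink_variant "
            + "WITH ('connector' = 'blackhole') "
            + "LIKE sink_with_pk (EXCLUDING OPTIONS)");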

Re: DynamicTableSink's equivalent of UpsertStreamTableSink.setKeyFields

2021-02-04 Thread Timo Walther
, Timo On 03.02.21 16:24, Yuval Itzchakov wrote: Hi Timo, The problem with this is I would still have to determine the keys manually, which is not really feasible in my case. Is there any internal API that might be of use to extract this information? On Wed, Feb 3, 2021 at 5:19 PM Timo Walther

Re: Question about Scala Case Class and List in Flink

2021-02-05 Thread Timo Walther
Hi Xavier, the Scala API has special implicits in methods such as `DataStream.map()` or `DataStream.keyBy()` to support Scala specifics like case classes. For Scala one needs to use the macro `createTypeInformation[CaseClass]`; for Java we use reflection via `TypeInformation.of()`. But Scala and

Re: Question about Scala Case Class and List in Flink

2021-02-05 Thread Timo Walther
, Thank you for your clarification, it is very useful to me. I am also combining the realization of the map function, trying to do implicit conversion of the case class, so that I can restore state from FS. On Fri, Feb 5, 2021 at 10:38 PM Timo Walther wrote:

Re: Table Cache Problem

2021-02-08 Thread Timo Walther
Hi Yongsong, in newer Flink versions we introduced the concept of statement sets, which are available via `TableEnvironment.createStatementSet()`. They allow you to optimize a branching pipeline as a whole, reusing subplans. In older Flink versions, you can convert the Table to a DataStre

Re: Kafka connector doesn't support consuming update and delete changes in Table SQL API

2021-02-08 Thread Timo Walther
Hi, could the problem be that you are mixing OVER and TUMBLE windows with each other? The TUMBLE is correctly defined over the time attribute `row_ts`, but the OVER window is defined using the regular column `upd_ts`. This might be the reason why the query is not append-only but updating. Maybe you ca

Re: Joining and Grouping Flink Tables with Java API

2021-02-11 Thread Timo Walther
Hi Abdelilah, at first glance your logic seems to be correct. But Arvid is right that your pipeline might not have the optimal performance that Flink can offer due to the 3 groupBy operations. I'm wondering what the optimizer produces out of this plan. Maybe you can share it with us using `

Re: Proctime consistency

2021-02-11 Thread Timo Walther
that will matter for our case, we just want to make sure we don't lose any data or have any gaps between windows. Please confirm if I got this right, and thank you much for your reply! On Tue, Feb 2, 2021 at 3:17 AM Timo Walther wrote: As far as I

Re: Optimizing Flink joins

2021-02-11 Thread Timo Walther
Hi Dan, the order of all joins depends on the order in the SQL query by default. You can also check the following example (not interval joins though) and swap e.g. b and c: env.createTemporaryView("a", env.fromValues(1, 2, 3)); env.createTemporaryView("b", env.fromValues(4, 5, 6)); env.create
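A possible completion of that example under the same assumptions (`env` is the TableEnvironment from the mail; `fromValues` exposes a single column named `f0`):

    env.createTemporaryView("a", env.fromValues(1, 2, 3));
    env.createTemporaryView("b", env.fromValues(4, 5, 6));
    env.createTemporaryView("c", env.fromValues(7, 8, 9));
    // the textual order of the joins determines the initial join order;
    // swapping b and c in the query changes the plan
    Table joined = env.sqlQuery(
        "SELECT a.f0 FROM a JOIN b ON a.f0 = b.f0 JOIN c ON a.f0 = c.f0");
    System.out.println(joined.explain());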

Re: Joining and Grouping Flink Tables with Java API

2021-02-11 Thread Timo Walther
After thinking about this topic again, I think UNION ALL will not solve the problem because you would need to group by brandId and perform the joining within the aggregate function which could also be quite expensive. Regards, Timo On 11.02.21 17:16, Timo Walther wrote: Hi Abdelilah, at a

Re: Optimizing Flink joins

2021-02-12 Thread Timo Walther
QL to DataStream. On Thu, Feb 11, 2021 at 9:11 AM Timo Walther wrote: Hi Dan, the order of all joins depends on the order in the SQL query by default. You can also check the following example (not interval joins though) and swa
