Re: Joining and Grouping Flink Tables with Java API

2021-02-15 Thread Timo Walther
sunderstood the relation between the two streams. The ideal pattern would be a broadcast join imho. [1] I'm not sure how to do it in Table API/SQL though, but I hope Timo can help here as well. [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/br

Re: ConnectedStreams paused until control stream “ready”

2021-02-17 Thread Timo Walther
Hi Kezhu, `InputSelectable` is currently not exposed in the DataStream API because it might have side effects that need to be considered (e.g. do checkpoints still go through?). In any case, we don't have a good story for blocking a control stream yet. The best option is to buffer the other

Re: Using INTERVAL parameters for UDTF

2021-02-18 Thread Timo Walther
Hi Patrick, thanks for reaching out to us and investigating the problem. Could you open an issue in the Calcite project? I think it would be nice to solve it on both the Calcite and Flink side. Thanks, Timo On 18.02.21 06:02, Patrick Angeles wrote: NVM. Found the actual source on Calcite tr

Re: Compile time checking of SQL

2021-02-18 Thread Timo Walther
Hi Sebastián, what do you consider as compile time? If you mean some kind of SQL editor, you could take a look at Ververica platform (the community edition is free): https://www.ververica.com/blog/data-pipelines-with-flink-sql-on-ververica-platform Otherwise Flink SQL is always validated at

Re: [Flink SQL] FLOOR(timestamp TO WEEK) not working

2021-02-18 Thread Timo Walther
Hi Sebastián, which Flink version are you using? And which precision do the timestamps have? This looks clearly like a bug to me. We should open an issue in JIRA. Regards, Timo On 18.02.21 16:17, Sebastián Magrí wrote: While using said function in a query I'm getting a query compilation err

Re: How is proctime represented?

2021-02-19 Thread Timo Walther
Chesnay is right. The PROCTIME() is lazy evaluated and executed when its result is needed as an argument for another expression or function. So within the pipeline the column is NULL but when you want to compute something e.g. CAST(proctime AS TIMESTAMP(3)) it will be materialized into the row.
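
To make this concrete, here is a minimal sketch of the materialization, assuming a hypothetical table `orders` declared with a computed column `proc AS PROCTIME()`:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ProctimeMaterialization {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Within the pipeline the proctime column stays NULL; the CAST below
        // forces it to be materialized into the row as a concrete timestamp.
        tEnv.executeSql("SELECT id, CAST(proc AS TIMESTAMP(3)) AS proc_ts FROM orders").print();
    }
}
```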

Re: Compile time checking of SQL

2021-02-22 Thread Timo Walther
dated when I do `mvn compile` or any target that runs that so that basic syntax checking is performed without having to submit the job to the cluster. On Thu, 18 Feb 2021 at 16:17, Timo Walther <twal...@apache.org> wrote: Hi Sebastián, what do you consider as compil

Re: org.codehaus.janino.CompilerFactory cannot be cast ....

2021-02-26 Thread Timo Walther
If this problem affects multiple people, feel free to open an issue that explains how to easily reproduce the problem. This helps us or contributors to provide a fix. Regards, Timo On 26.02.21 05:08, sofya wrote: What was the actual solution? Did you have to modify pom? -- Sent from: htt

Re: org.codehaus.janino.CompilerFactory cannot be cast ....

2021-02-26 Thread Timo Walther
Until we have more information, maybe this is also helpful: https://ci.apache.org/projects/flink/flink-docs-stable/ops/debugging/debugging_classloading.html#inverted-class-loading-and-classloader-resolution-order On 26.02.21 09:20, Timo Walther wrote: If this problem affects multiple people

Re: Handling Data Separation / Watermarking from Kafka in Flink

2021-02-26 Thread Timo Walther
Hi Rion, I think what David was referring to is that you do the entire time handling yourself in the process function. That means not using the `context.timerService()` or `onTimer()` that Flink provides but calling your own logic based on the timestamps that enter your process function and the st

Re: Best way to implemented non-windowed join

2021-02-26 Thread Timo Walther
Hi Yaroslav, I think your approach is correct. Union is perfect to implement multiway joins if you normalize the type of all streams before. It can simply be a composite type with the key and a member variable for each stream where only one of those variables is not null. A keyed process funct

Re: Is it possible to specify max process memory in flink 1.8.2, similar to what is possible in flink 1.11

2021-02-26 Thread Timo Walther
Hi Barisa, by looking at the 1.8 documentation [1] it was possible to configure the off heap memory as well. Also other memory options were already present. So I don't think that you need an upgrade to 1.11 immediately. Please let us know if you could fix your problem, otherwise we can try to

Re: BackPressure in RowTime Task of FlinkSql Job

2021-02-26 Thread Timo Walther
Hi Aeden, the rowtime task is actually just a simple map function that extracts the event-time timestamp into a field of the row for the next operator. It should not be the problem. Can you share a screenshot of your pipeline? What is your watermarking strategy? Is it possible that you are ge

Re: How to pass PROCTIME through an aggregate

2021-02-26 Thread Timo Walther
Hi Rex, as far as I know, we recently allowed PROCTIME() also at arbitrary locations in the query. So you don't have to pass it through the aggregate but you can call it afterwards again. Does that work in your use case? Something like: SELECT i, COUNT(*) FROM customers GROUP BY i, TUMBLE(PR
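
A hedged completion of the truncated query follows, assuming a one-minute processing-time tumbling window (the window size is not in the original message):

```java
// Assumes tEnv is a TableEnvironment with a registered `customers` table.
Table result = tEnv.sqlQuery(
        "SELECT i, COUNT(*) AS cnt, PROCTIME() AS proc " + // PROCTIME() re-invoked after the aggregate
        "FROM customers " +
        "GROUP BY i, TUMBLE(PROCTIME(), INTERVAL '1' MINUTE)");
```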

Re: [Flink-SQL] FLOORing OR CEILing a DATE or TIMESTAMP to WEEK uses Thursdays as week start

2021-03-02 Thread Timo Walther
Hi Sebastián, it might be the case that some time functions are not correct due to the underlying refactoring of data structures. I will loop in Leonard in CC that currently works on improving this situation as part of FLIP-162 [1]. @Leonard: Is this wrong behavior on your list? Regards, Tim

Re: How to emit after a merge?

2021-03-04 Thread Timo Walther
Hi Yik, if I understand you correctly you would like to avoid the deletions in your stream? You could filter the deletions manually in DataStream API before writing them to Kafka. Semantically the deletions are required to produce a correct result because the runtime is not aware of a key fo

Re: How to emit after a merge?

2021-03-04 Thread Timo Walther
or this use case? Thank you. On Thu, Mar 4, 2021 at 4:41 PM Timo Walther <twal...@apache.org> wrote: Hi Yik, if I understand you correctly you would like to avoid the deletions in your stream? You could filter the deletions manually in DataStream API before

Re: [DISCUSS] Deprecation and removal of the legacy SQL planner

2021-03-04 Thread Timo Walther
's better for us to be more focused on a single planner. Your proposed roadmap looks good to me, +1 from my side and thanks again for all your efforts! Best, Kurt On Thu, Feb 25, 2021 at 5:01 PM Timo Walther wrote: Hi everyone, since Flink 1.9 we have supported two SQL planners. Most of

Re: How to emit after a merge?

2021-03-05 Thread Timo Walther
Chan wrote: Hi Timo, If I understand correctly, the UDF only simplifies the query but does not do anything functionally different. Please correct me if I am wrong, thank you! Best, Yik San On Thu, Mar 4, 2021 at 8:34 PM Timo Walther <twal...@apache.org> wrote: Yes, impl

Re: Need information on latency metrics

2021-03-05 Thread Timo Walther
Hi Suchithra, did you see this section in the docs? https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html#latency-tracking Regards, Timo On 05.03.21 15:31, V N, Suchithra (Nokia - IN/Bangalore) wrote: Hi, I am using flink 1.12.1 version and trying to explore latency metrics

Re: Convert BIGINT to TIMESTAMP in pyflink when using datastream api

2021-03-05 Thread Timo Walther
Hi Shilpa, Shuiqiang is right. Currently, we recommend to use SQL DDL until the connect API is updated. See here: https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/create/#create-table Especially the WATERMARK section shows how to declare a rowtime attribute. Regards,
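
A sketch of such a DDL following the linked docs, assuming a Kafka topic with an epoch-millis BIGINT column; all names and connector options are illustrative:

```java
tEnv.executeSql(
        "CREATE TABLE events (" +
        "  user_id STRING," +
        "  ts_millis BIGINT," +
        // computed column turning BIGINT millis into a TIMESTAMP(3)
        "  ts AS TO_TIMESTAMP(FROM_UNIXTIME(ts_millis / 1000))," +
        // declares `ts` as the rowtime attribute with 5s of out-of-orderness
        "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
        ") WITH (" +
        "  'connector' = 'kafka'," +
        "  'topic' = 'events'," +
        "  'properties.bootstrap.servers' = 'localhost:9092'," +
        "  'format' = 'json'" +
        ")");
```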

Re: LocalWatermarkAssigner causes predicate pushdown to be skipped

2021-03-05 Thread Timo Walther
Hi Yuval, sorry that nobody replied earlier. Somehow your email fell through the cracks. If I understand you correctly, you would like to implement a table source that implements both `SupportsWatermarkPushDown` and `SupportsFilterPushDown`? The current behavior might be on purpose. Filt

Re: Handle late message with flink SQL

2021-03-16 Thread Timo Walther
Hi, your explanation makes sense but I'm wondering what the implementation would look like. This would mean bigger changes in a Flink fork, right? Late data handling in SQL is a frequently asked question. Currently, we don't have a good way of supporting it. Usually, we recommend to use DataS

Re: Editing job graph at runtime

2021-03-16 Thread Timo Walther
Hi Jessy, to be precise, the JobGraph is not used at runtime. It is translated into an ExecutionGraph. But nevertheless such patterns are possible but require a bit of manual implementation. Option 1) You stop the job with a savepoint and restart the application with slightly different par

Re: Find many strange measurements in metrics database of influxdb

2021-03-16 Thread Timo Walther
Hi Tim, "from table1" might be the operator that reads "table1" also known as the table scan operator. Could you share more of the metrics and their values? Most of them should be explained in https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/metrics/#system-metrics Regards, Ti

Re: Time Temporal Join

2021-03-16 Thread Timo Walther
Hi Satyam, first of all your initial join query can also work, you just need to make sure that no time attribute is in the SELECT clause. As the exception indicates, you need to cast all time attributes to TIMESTAMP. The reason for this is some major design issue that is also explained here w
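
A sketch of that workaround with hypothetical tables `orders` and `rates`, each carrying a rowtime attribute; the cast keeps raw time attributes out of the SELECT clause:

```java
Table joined = tEnv.sqlQuery(
        "SELECT o.id, r.rate, CAST(o.o_time AS TIMESTAMP(3)) AS order_ts " + // cast instead of raw attribute
        "FROM orders AS o JOIN rates AS r ON o.currency = r.currency");
```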

Re: [Flink SQL] Leniency of JSON parsing

2021-03-16 Thread Timo Walther
2021 18:50 To: Timo Walther; ro...@apache.org Cc: user Subject: Re: [Flink SQL] Leniency of JSON parsing Hi Roman! Seems like that option is no longer available. Best Regards, Sebastian From: Roman Khachatryan

Re: OrcTableSource in flink 1.12

2021-03-22 Thread Timo Walther
Hi Nikola, the OrcTableSource has not been updated to be used in a SQL DDL. You can define your own table factory [1] that translates properties into an object to create instances, or use `org.apache.flink.table.api.TableEnvironment#fromTableSource`. I recommend the latter option. Please kee

Re: OrcTableSource in flink 1.12

2021-03-23 Thread Timo Walther
re? I would like to read from orc files, run a query and transform the result. I do not necessarily need it to be with the DataSet API. Regards, Nikola On Mon, Mar 22, 2021 at 6:49 PM Timo Walther <twal...@apache.org> wrote: Hi Nikola, the OrcTableSource has not bee

Re: Flink 1.12.2 sql api use parquet format error

2021-04-08 Thread Timo Walther
Hi, can you check the content of the JAR file that you are submitting? There should be a `META-INF/services` directory with a `org.apache.flink.table.factories.Factory` file that should list the Parquet format. See also here: https://ci.apache.org/projects/flink/flink-docs-master/docs/connec
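
For illustration, the service file inside the JAR should look roughly like this (the exact factory class name below is an assumption based on the flink-parquet module):

```
# META-INF/services/org.apache.flink.table.factories.Factory
org.apache.flink.formats.parquet.ParquetFileFormatFactory
```

When building a fat JAR with the Maven Shade plugin, the ServicesResourceTransformer is usually required so that service files from multiple format/connector dependencies are merged rather than overwritten.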

Re: Compression with rocksdb backed state

2021-04-08 Thread Timo Walther
Hi Deepthi, 1. Correct 2. Correct 3. Incremental snapshots simply manage references to RocksDB's sstables. You can find a full explanation here [1]. Thus, the payload is a blackbox for Flink and Flink's compression flag has no impact. So we fully rely what RocksDB offers. 4. Correct I hope t

Re: SingleValueAggFunction received more than one element error with LISTAGG

2021-04-08 Thread Timo Walther
Hi, which Flink version are you using? Could you also share the resulting plan with us using `TableEnvironment.explainSql()`? Thanks, Timo On 07.04.21 17:29, soumoks123 wrote: I receive the following error when trying to use the LISTAGG function in Table API. java.lang.RuntimeException:

Re: Nondeterministic results with SQL job when parallelism is > 1

2021-04-14 Thread Timo Walther
Hi Dylan, streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH); is currently not supported by the Table & SQL API. For now, val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() determines the mode. Thus, I would remove the line again. If you want to u
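
In Java, the equivalent of the settings Timo quotes looks like this; the mode is chosen on the EnvironmentSettings rather than via the DataStream runtime mode:

```java
import org.apache.flink.table.api.EnvironmentSettings;

// Streaming mode, as in the thread; inBatchMode() exists as the analogue
// for the unified TableEnvironment.
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
```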

Re: Udf Performance and Object Creation

2015-08-12 Thread Timo Walther
Hello Michael, every time you code a Java program you should avoid object creation if you want an efficient program, because every created object needs to be garbage collected later (which slows down your program performance). You can have small Pojos, just try to avoid the call "new" in your

Re: Java 8 and type erasure

2015-08-28 Thread Timo Walther
wrote: Exactly, Timo opened the thread. On Tue, Aug 18, 2015 at 2:04 PM, Kristoffer Sjögren <sto...@gmail.com> wrote: Yeah, I think I found the thread already... by Timo Wal

Re: Powered by Flink

2015-10-19 Thread Timo Walther
+1 for adding it to the website instead of the wiki. "Who is using Flink?" is always a difficult question to answer for interested users. On 19.10.2015 15:08, Suneel Marthi wrote: +1 to this. On Mon, Oct 19, 2015 at 3:00 PM, Fabian Hueske wrote: Sounds good +1

Re: Powered by Flink

2015-10-19 Thread Timo Walther
Ah ok, sorry. I think linking to the wiki is also ok. On 19.10.2015 15:18, Fabian Hueske wrote: @Timo: The proposal was to keep the list in the wiki (can be easily extended) but link from the main website to the wiki page. 2015-10-19 15:16 GMT+02:00 Timo Walther: +1 for adding it to the

Re: Type of TypeVariable could not be determined

2016-03-08 Thread Timo Walther
Hi Radu, the exception can have multiple causes. It would be great if you could share some example code. In most cases the problem is the following: public class MapFunction<WhatEverType> { } new MapFunction<WhatEverType>(); The type WhatEverType is type-erasured by Java. The type must not be declared in the "new"
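
A sketch of the two usual fixes, using a hypothetical String-to-Integer mapper:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;

// Variant 1: a named class with concrete type arguments; these survive
// erasure and can be extracted by Flink's TypeExtractor.
public class LengthMapper implements MapFunction<String, Integer> {
    @Override
    public Integer map(String value) {
        return value.length();
    }
}

// Variant 2: keep a generic lambda and supply an explicit type hint instead:
// input.map(s -> s.length()).returns(Types.INT);
```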

Re: Type of TypeVariable could not be determined

2016-03-09 Thread Timo Walther
I think your problem is that you declared "TupleEvent2" as a TypeVariable in your code but I think you want to use a class that you defined, right? If so, this is the correct declaration: MySourceFunction implements SourceFunction<TupleEvent2> On 09.03.2016 09:28, Wang Yangjun wrote: Hello, I think in th

Re: Converting String/boxed-primitive Array columns back to DataStream

2020-04-28 Thread Timo Walther
Hi Gyula, are you coming from DataStream API or are you trying to implement a source/sink? It looks like the array is currently serialized with Kryo. I would recommend to take a look at this class: org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter This is the current mapping

Re: "Fill in" notification messages based on event time watermark

2020-04-28 Thread Timo Walther
Hi Manas, Reg. 1: I would recommend to use a debugger in your IDE and check which watermarks are travelling through your operators. Reg. 2: All event-time operations are only performed once the watermark arrived from all parallel instances. So roughly speaking, in machine time you can assume

Re: Converting String/boxed-primitive Array columns back to DataStream

2020-04-28 Thread Timo Walther
Timo, I am trying to convert simply back to a DataStream. Let's say: DataStream<...> I can convert the DataStream into a table without a problem, the problem is getting a DataStream back. Thanks Gyula On Tue, Apr 28, 2020 at 6:32 PM Timo Walther <twal...@apache.org> wrot

Re: Publishing Sink Task watermarks outside flink

2020-04-28 Thread Timo Walther
Hi Shubham, you can call stream.process(...). The context of ProcessFunction gives you access to TimerService which lets you access the current watermark. I'm assuming you are using the Table API? As far as I remember, watermarks are travelling through the stream even if there is no time-ba
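
A minimal sketch of that access with a hypothetical `Event` type; the watermark value could then be published wherever it is needed:

```java
DataStream<Event> observed = stream.process(new ProcessFunction<Event, Event>() {
    @Override
    public void processElement(Event value, Context ctx, Collector<Event> out) {
        // Current watermark of this subtask; Long.MIN_VALUE until the first
        // watermark has arrived.
        long watermark = ctx.timerService().currentWatermark();
        // e.g. publish `watermark` to an external system here
        out.collect(value);
    }
});
```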

Re: ML/DL via Flink

2020-04-28 Thread Timo Walther
Hi Max, as far as I know a better ML story for Flink is in the making. I will loop in Becket in CC who may give you more information. Regards, Timo On 28.04.20 07:20, m@xi wrote: Hello Flinkers, I am building a *streaming* prototype system on top of Flink and I want ideally to enable ML tra

Re: Publishing Sink Task watermarks outside flink

2020-05-18 Thread Timo Walther
of the box, is it possible to add some extra operator after the sink which will always have a watermark greater than the sink function's watermarks, as it's a downstream operator. Also, does the problem simplify if we have a Kafka sink? On Tue, Apr 28, 2020 at 10:35 PM Timo Walther mail

Re: Infer if a Table will create an AppendStream / RetractStream

2020-05-18 Thread Timo Walther
Hi Yuval, currently there is no API for getting those insights. I guess you need to use internal API for getting this information. Which planner and version are you using? Regards, Timo On 18.05.20 14:16, Yuval Itzchakov wrote: Hi, Is there any way to infer if a Table is going to generate

Re: Properly using ConnectorDescriptor instead of registerTableSource

2020-05-18 Thread Timo Walther
Hi Nikola, the reason for deprecating `registerTableSource` is that we aim to have everything declarative in Table API. A table program should simply declare what it needs and the planner should find a suitable connector, regardless how the underlying class structure looks like. This might al

Re: Writing to SQL server

2020-05-21 Thread Timo Walther
Hi Martin, usually, this error occurs when people forget to add `org.apache.flink.api.scala._` to their imports. It is triggered by the Scala macro that the DataStream API uses for extracting types. Can you try to call `result.toAppendStream[Row]` directly? This should work if you import `or

Re: Incremental state

2020-06-10 Thread Timo Walther
Hi Annemarie, if TTL is what you are looking for and queryable state is what limits you, it might make sense to come up with a custom implementation of queryable state? TTL might be more difficult to implement. As far as I know this feature is more of an experimental feature without any consi

Re: Reading from AVRO files

2020-06-10 Thread Timo Walther
Hi Lorenzo, as far as I know we don't support Avro's logical types in Flink's AvroInputFormat yet. E.g. AvroRowDeserializationSchema [1] supports the 1.8.2 version of logical types but might be incompatible with 1.9.2. Reg 2) Specific record generated with AVRO 1.9.2 plugin: Could you send u

Re: Error reporting for Flink jobs

2020-06-29 Thread Timo Walther
Hi Satyam, I'm not aware of an API to solve all your problems at once. A common pattern for failures in user-code is to catch errors in user-code and define a side output for an operator to pipe the errors to dedicated sinks. However, such a functionality does not exist in SQL yet. For the si

Re: Convert sql table with field of type MULITSET to datastream with field of type java.util.Map[T, java.lang.Integer]

2020-06-29 Thread Timo Walther
Hi YI, not all conversions might be supported in the `toRetractStream` method. Unfortunately, the rework of the type system is still in progress. I hope we can improve the user experience there quite soon. Have you tried to use `Row` instead? `toRetractStream[Row]` should work for all data ty
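
A Java sketch of that `Row` fallback, assuming `tEnv` is a StreamTableEnvironment and the MULTISET column sits at position 0 of the schema:

```java
DataStream<Tuple2<Boolean, Row>> retractStream =
        tEnv.toRetractStream(table, Row.class);

retractStream
        .filter(change -> change.f0) // true = insert/update-after, false = retraction
        // MULTISET<T> arrives as a java.util.Map<T, Integer> inside the Row;
        // a .returns(...) hint may be needed because of the generic lambda.
        .map(change -> (java.util.Map<?, ?>) change.f1.getField(0));
```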

Re: Byte arrays in Avro

2020-07-16 Thread Timo Walther
Hi Lasse, are you using Avro specific records? A look into the code shows that the warnings in the log are generated after the Avro check: https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java#L1741 Somehow your Avro object

Re: Byte arrays in Avro

2020-07-16 Thread Timo Walther
definitely the `AvroSerializer` if the type information is `AvroTypeInfo`. You can check that via `dataStream.getType`. I hope this helps. Regards, Timo On 16.07.20 14:28, Timo Walther wrote: Hi Lasse, are you using Avro specific records? A look into the code shows that the warnings in the

Re: Flink 1.11 Simple pipeline (data stream -> table with agg -> data stream) failed

2020-07-24 Thread Timo Walther
Hi Dmytro, `StreamTableEnvironment` does not support batch mode currently. Only `TableEnvironment` supports the unified story. I saw that you disabled the check in the `create()` method. This check exists for a reason. For batch execution, the planner sets specific properties on the stream g

Re: Kafka connector with PyFlink

2020-07-27 Thread Timo Walther
Hi, the InvalidClassException indicates that you are using different versions of the same class. Are you sure you are using the same Flink minor version (including the Scala suffix) for all dependencies and Kubernetes? Regards, Timo On 27.07.20 09:51, Wojciech Korczyński wrote: Hi, when

Re: Unable to deduce RocksDB api calls in streaming.

2020-07-27 Thread Timo Walther
Hi Aviral, as far as I know we are not calling RocksDB API to perform snapshots. As the Stackoverflow answer also indicates most of the snapshotting is done outside of RocksDB by just dealing with the SST files. Have you checked the available metrics in the web UI? https://ci.apache.org/proj

Re: Flink 1.11 Simple pipeline (data stream -> table with agg -> data stream) failed

2020-07-27 Thread Timo Walther
them, but I'm wondering if there is a better way to do it. It sounds quite strange that, with the Blink planner optimizing pipelines for stream and batch jobs, there is a necessity to write the same things in the DataStream and DataSet APIs. On 24/07/2020, 15:36, "Timo

Re: Kafka connector with PyFlink

2020-07-27 Thread Timo Walther
you check that? @Timo I think we should define the serialVersionUID for all the classes which implement Serializable. What do you think? Regards, Dian On 27 Jul 2020, at 4:38 PM, Timo Walther <twal...@apache.org> wrote: Hi, the InvalidClassException indicates that you are using

Re: problem with build from source flink 1.11

2020-07-27 Thread Timo Walther
Hi Felipe, are you sure that Maven and the TaskManagers are using the JDK version that you mentioned? Usually, a `mvn clean install` in the `.../flink/` directory should succeed without any problems. Also your Maven version seems pretty old. I'm using Apache Maven 3.6.3 for example. The No

Re: Flink 1.11.0 UDAF fails with "No match found for function signature fun()"

2020-07-27 Thread Timo Walther
Hi Dmytro, aggregate functions will support the new type system in Flink 1.12. Until then, they cannot be used with the new `call()` syntax as anonymous functions. In order to use the old type system, you need to register the function explicitly using SQL `CREATE FUNCTION a AS 'myFunc'` and t
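
A sketch of that registration path; the function name and class are illustrative:

```java
// Registers the UDAF by class name (old type system) ...
tEnv.executeSql("CREATE FUNCTION myAgg AS 'com.example.MyAggregateFunction'");
// ... and calls it by name instead of via the new call() syntax.
Table aggregated = tEnv.sqlQuery("SELECT myAgg(amount) FROM orders");
```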

Re: problem with build from source flink 1.11

2020-07-27 Thread Timo Walther
recommended Maven version for building Flink. @Felipe Can you provide us the full stacktrace? This could be a library issue in regards to JDK compatibility. On 27/07/2020 15:23, Timo Walther wrote: Hi Felipe, are you sure that Maven and the TaskManagers are using the JDK version that you ment

Re: Flink 1.11.0 UDAF fails with "No match found for function signature fun()"

2020-07-29 Thread Timo Walther
s about TableFunction, so maybe it is something different, but related. I have created a small github project with both cases: https://github.com/dmytroDragan/flink-1.11-sql-agg-issue/blob/master/src/test/java/lets/test/flink/AggFunTest.java I would appreciate if you could take a look. On 27/07/2

Re: [SQL DDL] How to extract timestamps from Kafka message's metadata

2020-08-11 Thread Timo Walther
Hi Dongwon, another possibility is to use DataStream API before. There you can extract the metadata and use DataStream.assignTimestampsAndWatermarks before converting the stream to a table. Regards, Timo On 11.08.20 09:41, Dongwon Kim wrote: Hi Dawid, I'll try your suggestion [2] and wait
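
A sketch of that DataStream-first approach, assuming a `FlinkKafkaConsumer` source whose records carry the Kafka record timestamp; the 5-second bound and field names are illustrative:

```java
// `$` is org.apache.flink.table.api.Expressions.$ (static import).
DataStream<Event> withTimestamps = kafkaStream.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                // second argument = timestamp attached by the source,
                // i.e. the Kafka record timestamp
                .withTimestampAssigner((event, kafkaTimestamp) -> kafkaTimestamp));

// Declare the stream's timestamps as the table's rowtime attribute.
Table table = tEnv.fromDataStream(withTimestamps, $("userId"), $("rowtime").rowtime());
```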

Re: Event time based disconnection detection logic

2020-08-11 Thread Timo Walther
Hi Manas, at the first glance your code looks correct to me. I would investigate if your keys and watermarks are correct. Esp. the watermark frequency could be an issue. If watermarks are generated at the same time as the heartbeats itself, it might be the case that the timers fire first befo

Re: Batch version of StreamingFileSink.forRowFormat(...)

2020-08-11 Thread Timo Walther
Hi Dan, InputFormats are the connectors of the DataSet API. Yes, you can use either readFile, readCsvFile, readFileOfPrimitives etc. However, I would recommend to also give Table API a try. The unified TableEnvironment is able to perform batch processing and is integrated with a bunch of conn

Re: Proper way to do Integration Testing ?

2020-08-11 Thread Timo Walther
Hi Faye, Flink does not officially provide testing tools at the moment. However, you can use internal Flink tools if they solve your problem. The `flink-end-to-end-tests` module [1] shows some examples how we test Flink together with other systems. Many tests are still using plain bash scrip

Re: Event time based disconnection detection logic

2020-08-11 Thread Timo Walther
is produces the expected output. Also, I will assume that this is the best way to solve my problem - I can't use Flink's session windows. Let me know if anyone has any other ideas though! Thank you for your time and quick response! On Tue, Aug 11, 2020 at 1:45 PM Timo Walther <ma

Re: how to add a new runtime operator

2020-08-12 Thread Timo Walther
Hi Vincent, we don't have a step by step guide for adding new operators. Most of the important operations are exposed via DataStream API. Esp. ProcessFunction [1] fits for most complex use cases with access to the primitives such as time and state. What kind of operator is missing for your u

Re: Using Event Timestamp sink get's back with machine timezone

2020-08-12 Thread Timo Walther
Hi Faye, the problem lies in the wrong design of JDK's java.sql.Timestamp. You can also find a nice summary in the answer here [1]. java.sql.Timestamp is timezone dependent. Internally, we subtract/normalize the timezone and work with the UNIX timestamp. Beginning from Flink 1.9 we are using

Re: Is there a way to start a timer without ever receiving an event?

2020-08-13 Thread Timo Walther
What you can do is creating an initial control stream e.g. using `StreamExecutionEnivronment.fromElements()` and either use `union(controlStream, actualStream)` or use `actualStream.connect(controlStream)`. Regards, Timo On 12.08.20 18:15, Andrey Zagrebin wrote: I do not think so. Each timer
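
A minimal sketch of that pattern with hypothetical types:

```java
// A one-element control stream that is available immediately at startup.
DataStream<String> controlStream = env.fromElements("start");

actualStream
        .connect(controlStream)
        .process(new CoProcessFunction<Event, String, Event>() {
            @Override
            public void processElement1(Event event, Context ctx, Collector<Event> out) {
                out.collect(event); // main data path
            }

            @Override
            public void processElement2(String control, Context ctx, Collector<Event> out) {
                // react to the control element here, e.g. initialize state
            }
        });
```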

Re: Flink 1.11 SQL error: No operators defined in streaming topology. Cannot execute.

2020-08-13 Thread Timo Walther
Hi Lu, `env.execute("table api");` is not necessary after FLIP-84 [1]. Every method that has `execute` in its name will immediately execute a job. Therefore your `env.execute` has an empty pipeline. Regards, Timo [1] https://wiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
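
A short sketch of the consequence; the table/SQL names are illustrative:

```java
// This call alone submits and runs the INSERT job:
TableResult result = tEnv.executeSql(
        "INSERT INTO sink_table SELECT * FROM source_table");

// Not needed afterwards -- it fails with "No operators defined in streaming
// topology" because nothing was ever added to env's own pipeline:
// env.execute("table api");
```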

Re: How to write a customer sink partitioner when using flinksql kafka-connector

2020-08-18 Thread Timo Walther
Hi Lei, you can check how the FlinkFixedPartitioner [1] or Tuple2FlinkPartitioner [2] are implemented. Since you are using SQL connectors of the newest generation, you should receive an instance of org.apache.flink.table.data.RowData in your partitioner. You can create a Maven project with a
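
A minimal sketch of such a partitioner; partitioning by the hash of the first field is an illustrative choice, and it assumes field 0 is a STRING:

```java
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.data.RowData;

public class MyRowDataPartitioner extends FlinkKafkaPartitioner<RowData> {
    @Override
    public int partition(RowData record, byte[] key, byte[] value,
                         String targetTopic, int[] partitions) {
        // Hash the first column and map it onto the available partitions.
        int hash = record.getString(0).hashCode();
        return partitions[Math.abs(hash % partitions.length)];
    }
}
```

The class is then referenced from the DDL via the `sink.partitioner` option using its fully qualified name.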

Re: Flink SQL UDAF com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID

2020-08-18 Thread Timo Walther
Hi Forideal, luckily these problems will belong to the past in Flink 1.12 when UDAF are updated to the new type system [1]. Lists will be natively supported and registering custom KryoSerializers consistently as well. Until then, another workaround is to override getAccumulatorType() and def

Re: UT/IT Table API/SQL streaming job

2020-08-19 Thread Timo Walther
Hi, this might be helpful as well: https://lists.apache.org/thread.html/rfe3b45a10fc58cf19d2f71c6841515eb7175ba731d5055b06f236b3f%40%3Cuser.flink.apache.org%3E First of all, it is important to know if you are interested in end-to-end tests (incl. connectors) or excluding connectors. If you jus

Re: Editing Rowtime for SQL Table

2020-09-02 Thread Timo Walther
m On Tue, Sep 1, 2020 at 4:46 AM Timo Walther <t...@ververica.com> wrote: Hi Satyam, Matthias is right. A rowtime attribute cannot be modified and needs to be passed "as is" through the pipeline. The only exceptions are if a newer rowtime is offered

Re:

2020-09-08 Thread Timo Walther
Hi Violeta, can you share your connector code with us? The plan looks quite complicated given the relatively simple query. Maybe there is some optimization potential. But before we dive deeper, I see a `Map(to: Row)` which indicates that we might work with a legacy sink connector. Did you tr

Re: FLINK DATASTREAM Processing Question

2020-09-08 Thread Timo Walther
Hi Vijay, one comment to add is that the performance might suffer with multiple map() calls. For safety reasons, records between chained operators are serialized and deserialized so that they strictly don't influence each other. If all functions of a pipeline are guaranteed to not modify incomi

Re: Flink alert after database lookUp

2020-09-08 Thread Timo Walther
Hi Sunitha, what you are describing is a typical streaming enrichment. We need to enrich the stream with some data from a database. There are different strategies to handle this: 1) You are querying the database for every record. This is usually not what you want because it would slow down y

Re:

2020-09-08 Thread Timo Walther
You are using the old connectors. The new connectors are available via SQL DDL (and the execute_sql() API) as documented here: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html Maybe this will give you some performance boost, but certainly not eno

Re: Flink alert after database lookUp

2020-09-09 Thread Timo Walther
ossible sample source code for reference to stream database. Please help me badly stuck. In the mail, I see you asked me to register. Are you referring to any training here or any other registration. Regards, Sunitha. On Tuesday, September 8, 2020, 08:19:49 PM GMT+5:30, Timo Walther wrote

Re: Slow Performance inquiry

2020-09-09 Thread Timo Walther
Hi Hazem, I guess your performance is mostly driven by the serialization overhead in this case. How do you declare your state type? Flink comes with different serializers. Not all of them are extracted automatically when using reflective extraction methods: - Note that `Serializable` decla

Re: Slow Performance inquiry

2020-09-10 Thread Timo Walther
r way? Regards, Heidy ---- From: Timo Walther <twal...@apache.org> Sent: Wednesday, September 9, 2020 1:58 PM To: user@flink.apache.org

Re:

2020-09-10 Thread Timo Walther
Hi Violeta, I just noticed that the plan might be generated from Flink's old planner instead of the new, more performant Blink planner. Which planner are you currently using? Regards, Timo On 08.09.20 17:51, Timo Walther wrote: You are using the old connectors. The new connector

Re: Backquote in SQL dialect

2020-09-17 Thread Timo Walther
Hi Satyam, this has historical reasons. In the beginning all SQL queries were embedded in Java programs and thus Java strings. So single quote was handy for declaring SQL strings in a Java string and backticks for escaping keywords. But I agree that we should make this configurable. Feel free

Re: Watermark advancement in late side output

2020-09-21 Thread Timo Walther
Hi Ori, first of all, watermarks are sent to all side outputs (this is tested here [1]). Thus, operators in the side output branch of the pipeline will work similar to operators in the main branch. When calling `assignTimestampsAndWatermarks`, the inserted operator will erase incoming waterm

Re: Problem with zookeeper and flink config

2020-09-21 Thread Timo Walther
Hi Saksham, if I understand you correctly, you are running Zookeeper and Flink locally on your machine? Are you using Docker or is this a bare metal setup? The exception indicates that your paths contain `hdfs:` as URL scheme. Are you sure you want to use HDFS? If yes, you need to add an HDFS

Re: How to disconnect taskmanager via rest api?

2020-09-21 Thread Timo Walther
Hi Luan, this sounds more like a new feature request to me. Maybe you can already open an issue for it. I will loop in Chesnay in CC to check if there is some possibility to achieve this already. Regards, Timo On 21.09.20 06:37, Luan Cooper wrote: Hi We're running flink standalone cluster on k8s whe

Re: Zookeeper connection loss causing checkpoint corruption

2020-09-21 Thread Timo Walther
Hi Arpith, is there a JIRA ticket for this issue already? If not, it would be great if you can report it. This sounds like a critical priority issue to me. Thanks, Timo On 22.09.20 06:25, Arpith P wrote: Hi Peter, I have recently had a similar issue where I could not load from the checkpoi

Re: hourly counter

2020-09-22 Thread Timo Walther
Hi Lian, you are right that timers are not available in a ProcessWindowFunction but the state store can be accessed. So given that your window width is 1 min, you could maintain an additional state value for counting the minutes and updating your counter once this value reached 60. Otherwise
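
A sketch of that bookkeeping, assuming 1-minute windows keyed by a String and a hypothetical `Event` input type:

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class HourlyCounter extends ProcessWindowFunction<Event, Long, String, TimeWindow> {

    private final ValueStateDescriptor<Integer> minutesDesc =
            new ValueStateDescriptor<>("minutes", Integer.class);
    private final ValueStateDescriptor<Long> countDesc =
            new ValueStateDescriptor<>("hourlyCount", Long.class);

    @Override
    public void process(String key, Context ctx, Iterable<Event> elements,
                        Collector<Long> out) throws Exception {
        // globalState() is kept across windows of the same key.
        ValueState<Integer> minutes = ctx.globalState().getState(minutesDesc);
        ValueState<Long> count = ctx.globalState().getState(countDesc);

        long c = count.value() == null ? 0L : count.value();
        for (Event ignored : elements) {
            c++;
        }
        int m = (minutes.value() == null ? 0 : minutes.value()) + 1;

        if (m >= 60) { // an hour's worth of 1-minute windows has been seen
            out.collect(c);
            m = 0;
            c = 0L;
        }
        minutes.update(m);
        count.update(c);
    }
}
```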

Re: Is there a way to avoid submit hive-udf's resources when we submit a job?

2020-09-22 Thread Timo Walther
Hi Husky, I guess https://issues.apache.org/jira/browse/FLINK-14055 is what is needed to make this feature possible. @Rui: Do you know more about this issue and current limitations. Regards, Timo On 18.09.20 09:11, Husky Zeng wrote: When we submit a job which use udf of hive , the job will

Re: RichFunctions in Flink's Table / SQL API

2020-09-23 Thread Timo Walther
Hi Piyush, unfortunately, UDFs have no direct access to Flink's state. Aggregate functions are the only type of functions that can be stateful at the moment. Aggregate functions store their state in an accumulator that is serialized/deserialized on access, but an accumulator field can be back

Re: Back pressure with multiple joins

2020-09-25 Thread Timo Walther
Hi Dan, could you share the plan with us using `TableEnvironment.explainSql()` for both queries? In general, views should not have an impact on the performance. They are a logical concept that gives a bunch of operations a name. The contained operations are inlined into the bigger query duri

Re: Flink SQL - can I force the planner to reuse a temporary view to be reused across multiple queries that access different fields?

2020-09-28 Thread Timo Walther
Hi Dan, unfortunately, it is very difficult to read your plan. Maybe you can share a higher resolution and highlight which part of the pipeline is A, B etc. In general, the planner should be smart enough to reuse subplans where appropriate. Maybe this is a bug or shortcoming in the optimizer r

Re: Flink Batch Processing

2020-09-29 Thread Timo Walther
Hi Sunitha, currently, not every connector can be mixed with every API. I agree that it is confusing from time to time. The HBase connector is an InputFormat. DataSet, DataStream and Table API can work with InputFormats. The current Hbase input format might work best with Table API. If you li

Re: Streaming SQL Job Switches to FINISHED before all records processed

2020-10-05 Thread Timo Walther
es, it should also work for ingestion time. I am not entirely sure whether event time is preserved when converting a Table into a retract stream. It should be possible and if it is not working, then I guess it is a missing feature. But I am sure that @Timo Walther

Re: Streaming SQL Job Switches to FINISHED before all records processed

2020-10-05 Thread Timo Walther
Btw which planner are you using? Regards, Timo On 05.10.20 10:23, Timo Walther wrote: Hi Austin, could you share some details of your SQL query with us? The reason why I'm asking is because I guess that the rowtime field is not inserted into the `StreamRecord` of DataStream API. The ro

Re: Flink SQL - can I force the planner to reuse a temporary view to be reused across multiple queries that access different fields?

2020-10-05 Thread Timo Walther
28, 2020 at 6:41 AM Timo Walther <twal...@apache.org> wrote: Hi Dan, unfortunately, it is very difficult to read your plan. Maybe you can share a higher resolution and highlight which part of the pipeline is A, B etc. In general, the planner should be smart

Re: Streaming SQL Job Switches to FINISHED before all records processed

2020-10-09 Thread Timo Walther
ub.com/austince/flink-1.10-sql-windowing-error On Mon, Oct 5, 2020 at 4:24 AM Timo Walther <twal...@apache.org> wrote: Btw which planner are you using? Regards, Timo On 05.10.20 10:23, Timo Walther wrote: Hi Austin, could you share

Re: sql/table configuration naming guide/style/spec

2020-10-09 Thread Timo Walther
Hi Luan, we haven't updated all config parameters to string-based options. This is still ongoing. The idle state retention will be configurable in 1.12: https://issues.apache.org/jira/browse/FLINK-18555 I hope this helps. Regards, Timo On 09.10.20 15:33, Luan Cooper wrote: Hi I've read
