Re: Is State TTL possible with event-time characteristics ?

2022-01-10 Thread jinzhong li
Hi Dan, Currently, event-time TTL hasn't been supported due to implementation complexity and ambiguous semantics which need more discussion[1][2]. So people usually use state TTL with processing-time characteristics[3]. Could the processing-time ttl state meet your requirements in your case? I
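
To make the processing-time semantics mentioned above concrete, here is a minimal toy sketch in plain Python. It is not Flink's implementation or API: the class `ProcessingTimeTtlState` and its methods are hypothetical names invented for illustration. It only shows the idea that an entry expires once a fixed amount of wall-clock time has passed since its last write, regardless of any timestamps carried by the events themselves.

```python
import time
from typing import Callable, Dict, Generic, Optional, Tuple, TypeVar

V = TypeVar("V")


class ProcessingTimeTtlState(Generic[V]):
    """Toy keyed state whose entries expire by wall-clock (processing) time.

    Hypothetical illustration only; this is not a Flink class.
    """

    def __init__(self, ttl_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so the behaviour can be tested
        self._entries: Dict[str, Tuple[V, float]] = {}

    def update(self, key: str, value: V) -> None:
        # Writing refreshes the timestamp; reading (below) does not.
        self._entries[key] = (value, self._clock())

    def value(self, key: str) -> Optional[V]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        stored, written_at = entry
        if self._clock() - written_at >= self._ttl:
            # Expired: drop the entry lazily on access and hide it.
            del self._entries[key]
            return None
        return stored
```

Because expiry depends only on the local clock, replaying old events does not change when an entry disappears, which is part of why event-time TTL needs separate semantics.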

Is there a way to know how long a Flink app takes to finish resuming from Savepoint?

2022-01-10 Thread Chen-Che Huang
Hi all, I'm trying to speed up the process of resuming from a savepoint by adjusting some configuration. I wonder whether there exists a way to know how much time our Flink app spends resuming from a savepoint? From the logs, I can see only the starting time of the resuming (as shown below) but c

Re: unaligned checkpoint for job with large start delay

2022-01-10 Thread Piotr Nowojski
Hi Mason, Sorry for a late reply, but I was OoO. I think you could confirm it with more custom metrics. Counting how many windows have been registered/fired and plotting that over time. I think it would be more helpful in this case to check how long a task has been blocked being "busy" processin

Re: [ANNOUNCE] Apache Flink ML 2.0.0 released

2022-01-10 Thread Till Rohrmann
This is really great news. Thanks a lot for all the work Dong, Yun, Zhipeng and others! Cheers, Till On Fri, Jan 7, 2022 at 2:36 PM David Morávek wrote: > Great job! <3 Thanks Dong and Yun for managing the release and big thanks > to everyone who has contributed! > > Best, > D. > > On Fri, Jan

Re: Regarding Connector Options - value.deserializer

2022-01-10 Thread Hang Ruan
Hi, Ronak, I think you should implement a custom format yourself instead of overriding. The 'value.format' is a required table option. Best, Hang On Mon, Jan 10, 2022 at 17:09, Ronak Beejawat (rbeejawa) wrote: > Hi Team, > > Is there any way we use value.deserializer in Connector Options from kafka > via

Compatible alternative for ParquetInputFormat in Flink > 1.14.0

2022-01-10 Thread Meghajit Mazumdar
In flink-parquet_2.12 version 1.13.0, there used to be a class called *org.apache.flink.formats.parquet.ParquetInputFormat*. This class's constructor used to accept

Re: Request: Java 17 Support?

2022-01-10 Thread Chesnay Schepler
See https://issues.apache.org/jira/browse/FLINK-15736 Java 17 support is currently *not* expected for 1.15. On 09/01/2022 23:21, Clayton Wohl wrote: Are there any plans for Flink to support Java 17 and provide Java 17-based Docker images? There are a variety of new language/VM features we'd l

Re: Flink rest api to start a job

2022-01-10 Thread Chesnay Schepler
This is expected behavior. Since jar A is on the classpath you are able to access the entry-class of said jar. When you specify the jar id all that does is put another jar on the classpath; it is not enforced that the entry-class is loaded from said jar. On 08/01/2022 16:45, Qihua Yang wrote:

Re: How to reduce interval between Uptime Metric measurements?

2022-01-10 Thread Chesnay Schepler
I'd suggest double-checking whether you're actually scraping every second, or every minute. To my knowledge the uptime metric is not periodically updated but always reflects the latest state when polled. On 31/12/2021 16:37, Geldenhuys, Morgan Karl wrote: Thanks for the hint, however I am n

Re: Regarding Connector Options - value.deserializer

2022-01-10 Thread Hang Ruan
Hi, Ronak, We cannot set a specific 'value.deserializer' in the table options. 'key.deserializer' and 'value.deserializer' are always set to 'org.apache.kafka.common.serialization.ByteArrayDeserializer'. If you want to implement a format, you could take a look at the code JsonFormatFactory.java in flink

Re: Job stuck in savePoint - entire topic replayed on restart.

2022-01-10 Thread Piotr Nowojski
Hi Basil, 1. What do you mean by: > The only way we could stop these stuck jobs was to patch the finalizers. ? 2. Do you mean that your job is stuck when doing stop-with-savepoint? 3. What Flink version are you using? Have you tried upgrading to the most recent version, or at least the most recent

Re: pyflink mixed with Java operators

2022-01-10 Thread Dian Fu
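Hi, You could try the following method:

```
from pyflink.datastream import DataStream  # wrapper class used below
from pyflink.java_gateway import get_gateway

jvm = get_gateway().jvm
# Apply the Java MapFunction on the underlying Java stream, then re-wrap it.
ds = (
    DataStream(ds._j_data_stream.map(jvm.com.example.MyJavaMapFunction()))
)
```

Regards, Dian On Fri, Jan 7, 2022 at 1:00 PM Francis Conroy wrote: > Hi all, > > Does anyon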

Re: RichMapFunction to convert tuple of strings to DataStream[(String,String)]

2022-01-10 Thread Piotr Nowojski
Hi Sid, I don't see an explanation on Stack Overflow of what you are trying to do here (no mention of MapFunction or a tuple). If you want to create a `DataStream` from some pre-existing/static Tuple of Strings, the easiest thing would be to convert the tuple to a collection/iterator and use `

Re: Plans to update StreamExecutionEnvironment.readFiles to use the FLIP-27 compatible FileSource?

2022-01-10 Thread Fabian Paul
Hi Kevin, I created a ticket to track the effort [1]. Unfortunately, we are already in the last few weeks of the release cycle for 1.15 so I cannot guarantee that someone can implement it until then. Best, Fabian [1] https://issues.apache.org/jira/browse/FLINK-25591 On Fri, Jan 7, 2022 at 5:07

Re: Custom Kafka Keystore on Amazon Kinesis Analytics

2022-01-10 Thread Piotr Nowojski
Hi Clayton, I think in principle this example should still be valid, however instead of providing a `CustomFlinkKafkaConsumer` and overriding its `open` method, you would probably need to override `org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#start`. So you would most likely n

Re: Flink native k8s integration vs. operator

2022-01-10 Thread Gyula Fóra
Hi All! This is a very interesting discussion. I think many users find it confusing what deployment mode to choose when considering a new production application on Kubernetes. With all the options of native, standalone and different operators this can get tricky :) I really like the idea that Th

Re: Uploading jar to s3 for persistence

2022-01-10 Thread Piotr Nowojski
Hi Puneet, Have you seen this thread before? [1]. It looks like the same issue and especially this part might be the key: > Be aware that the filesystem used by the FileUploadHandler > is java.nio.file.FileSystem and not > Flink's org.apache.flink.core.fs.FileSystem for which we provide different

ParquetColumnarRowInputFormat - parameter description

2022-01-10 Thread Krzysztof Chmielewski
Hi, I would like to ask for some more details regarding three ParquetColumnarRowInputFormat construction parameters. The parameters are: batchSize, isUtcTimestamp, isCaseSensitive. The parameter names give some hint about their purpose but there is no description in docs (java, flink page). Could

Re: Is there a way to know how long a Flink app takes to finish resuming from Savepoint?

2022-01-10 Thread Piotr Nowojski
Hi, Unfortunately there is no such metric. Regarding the logs, I'm not sure what Flink version you are using, but since Flink 1.13.0 [1][2], you could rely on the tasks/subtasks switch from `INITIALIZING` to `RUNNING` to check when the task/subtask has finished recovering its state. Best, Piotr
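
The log-based approach described above could be sketched roughly as follows, in plain Python. This is an illustration under assumptions rather than an official tool: the function name `recovery_seconds` is hypothetical, and the sketch assumes each relevant log line starts with a `yyyy-MM-dd HH:mm:ss,SSS` timestamp and contains the phrase `switched from <OLD> to <NEW>`. Adjust the pattern to whatever your logging configuration actually emits.

```python
import re
from datetime import datetime
from typing import Iterable, Optional

# Assumed line shape: "<timestamp> ... switched from <OLD> to <NEW>."
_TRANSITION = re.compile(
    r"^(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}).*"
    r"switched from (?P<old>[A-Z_]+) to (?P<new>[A-Z_]+)"
)


def recovery_seconds(lines: Iterable[str]) -> Optional[float]:
    """Seconds from the first switch into INITIALIZING to the last
    INITIALIZING -> RUNNING switch, or None if either is missing."""
    started = None
    finished = None
    for line in lines:
        match = _TRANSITION.match(line)
        if match is None:
            continue
        ts = datetime.strptime(match["ts"], "%Y-%m-%d %H:%M:%S,%f")
        if match["new"] == "INITIALIZING" and started is None:
            started = ts  # first subtask begins restoring its state
        elif match["old"] == "INITIALIZING" and match["new"] == "RUNNING":
            finished = ts  # keep the latest one: the slowest subtask
    if started is None or finished is None:
        return None
    return (finished - started).total_seconds()
```

Taking the last `INITIALIZING` to `RUNNING` switch reflects the slowest subtask, which is usually what bounds the overall restore time; per-subtask durations would need the lines grouped by task name first.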

Re: RowType for complex types in Parquet File

2022-01-10 Thread Krzysztof Chmielewski
Hi, Isn't this actually already implemented and planned for version 1.15? https://issues.apache.org/jira/browse/FLINK-17782 Regards, Krzysztof Chmielewski On Fri, Jan 7, 2022 at 16:20, Jing Ge wrote: > Hi Meghajit, > > like the exception described, parquet schema with nested columns is not > supp

'Initial segment may not be null' error

2022-01-10 Thread Egor Ryashin
Hey, I use Flink 1.14 and run this query with sql-client: SET 'sql-client.execution.result-mode' = 'tableau'; SET 'execution.runtime-mode' = 'batch’; create table data( id STRING, account_id STRING, a_id STRING, `timestamp` STRING ) with ( 'connector'='filesystem', 'path'='/Volumes/mobiled

Re: Plans to update StreamExecutionEnvironment.readFiles to use the FLIP-27 compatible FileSource?

2022-01-10 Thread Kevin Lam
Hi Fabian, Thanks for creating and sharing that ticket. I noticed the clause "The FileSource can already read the state of the previous version", a little off-topic from the original topic of this thread but I was wondering if you could elaborate on that. Can the new FileSource interoperate with t

Re: Custom Kafka Keystore on Amazon Kinesis Analytics

2022-01-10 Thread Clayton Wohl
Custom code can create subclasses of FlinkKafkaConsumer, because the constructors are public. Custom code can't create subclasses of KafkaSource because the constructors are package private. So the same solution of creating code subclasses won't work for KafkaSource. Thank you for the response :)

Re: Uploading jar to s3 for persistence

2022-01-10 Thread Puneet Duggal
Hi Piotr, Thank you for your immediate reply. I went through this thread and it was also mentioned that flink required s3-filesystem related jars which are present in my HA flink cluster. Also as mentioned in Apache Flink Documentation for Amazon S3 integration , https://nightlies.apache.org/f

Re: Uploading jar to s3 for persistence

2022-01-10 Thread Puneet Duggal
Hi, Ignore above reply. Got your point. Just one doubt. So is using java.nio.file.FileSystem an expectation instead of Flink’s org.apache.flink.core.fs.FileSystem. I mean can we raise it as an issue to use flink filesystem instead as it allows us to use distributed filesystem as persistent sto

Re: Uploading jar to s3 for persistence

2022-01-10 Thread David Morávek
Hi Puneet, this is a known limitation and unfortunately `web.upload.dir` currently works only with the local system :( There are multiple issues covering this already, I guess FLINK-16544 [1] summarizes the current state well. This is something we want to address with the future releases. We've b

Re: Custom Kafka Keystore on Amazon Kinesis Analytics

2022-01-10 Thread Piotr Nowojski
Ah, I see. Pity. You could always use reflection if you really had to, but that's of course not a long term solution. I will raise this issue to the KafkaSource/AWS contributors. Best, Piotr Nowojski On Mon, Jan 10, 2022 at 16:55, Clayton Wohl wrote: > Custom code can create subclasses of Fli

Re: Uploading jar to s3 for persistence

2022-01-10 Thread David Morávek
I understand the issue. We currently don't have a good mechanism for this kind of external file management (we need to avoid leaking resources) :( Even right now, we kind of rely on upload directory being cleaned up by the cluster manager (yarn, k8s), because it's tied with a container lifecycle.

Re: question about Statefun/Flink version compatibility

2022-01-10 Thread Igal Shilman
Hello Galen, StateFun is using some internal APIs so they might or might not stay compatible between versions. You can try bumping the version; if it compiles cleanly, it will most likely work. We will be porting the main branch to Flink 1.14 this or next week. Cheers, Igal. On Mon, Jan 10, 2022 a

Re: RichMapFunction to convert tuple of strings to DataStream[(String,String)]

2022-01-10 Thread Siddhesh Kalgaonkar
Hi Piotr, Thanks for the reply. I was looking for how to create a DataStream under a process function since using that I had to call something else but I came across one of Fabian's posts where he mentioned that this way of creating DS is not "encouraged and tested". So, I figured out an alternate

Re: Serving Machine Learning models

2022-01-10 Thread David Anderson
Another approach that I find quite natural is to use Flink's Stateful Functions API [1] for model serving, and this has some nice advantages, such as zero-downtime deployments of new models, and the ease with which you can use Python. [2] is an example of this approach. [1] https://flink.apache.or

Re: Avro BulkFormat for the new FileSource API?

2022-01-10 Thread Kevin Lam
Hi David, Awesome, wasn't aware of FLINK-24565. That's the kind of thing we were looking for and will take a look at it. Thanks for sharing that! On Fri, Jan 7, 2022 at 2:05 PM David Morávek wrote: > Hi Kevin, > > I'm not as familiar with initiatives around the new sources, but it seems > tha

Re: pyflink mixed with Java operators

2022-01-10 Thread Francis Conroy
Thanks for this Dian. I'll give that a try. On Mon, 10 Jan 2022 at 22:51, Dian Fu wrote: > Hi, > > You could try the following method: > > ``` > from pyflink.java_gateway import get_gateway > > jvm = get_gateway().jvm > ds = ( > DataStream(ds._j_data_stream.map(jvm.com.example.MyJavaMapFunct

Cannot load user class: avro GenericRecord

2022-01-10 Thread Jason Politis
Good evening all, I'm working on a project for a client. We are trying to execute Flink SQL using Table API in java. We are going to pull their data from oracle -> debezium -> kafka -> flink. Here is a sample of our java code: package carrera.build; import org.apache.flink.table.api.Environme

Orphaned job files in HDFS

2022-01-10 Thread David Clutter
I'm seeing files orphaned in HDFS and wondering how to clean them up when the job is completed. The directory is /user/yarn/.flink so I am assuming this is created by flink? The HDFS in my cluster eventually fills up. Here is my setup: - Flink 1.13.1 on AWS EMR - Executing flink in per-jo

Re: Cannot load user class: avro GenericRecord

2022-01-10 Thread Caizhi Weng
Hi! Could you share the pom.xml file of your user project? Did you include the flink-avro dependency? Also did you add the avro format jar to the lib directory of your Flink distribution? On Tue, Jan 11, 2022 at 08:42, Jason Politis wrote: > Good evening all, > > I'm working on a project for a client. We ar

Re: 'Initial segment may not be null' error

2022-01-10 Thread Caizhi Weng
Hi! This seems like a bug to me. Could you share the whole exception stack (if the whole stack is not displayed in the terminal, you can search in the logs) so that others can diagnose this problem? On Mon, Jan 10, 2022 at 22:54, Egor Ryashin wrote: > Hey, > > I use Flink 1.14 and run this query with sql-clie

Re: Orphaned job files in HDFS

2022-01-10 Thread Yang Wang
IIRC, the staging directory (/user/{name}/.flink/application_xxx) will be deleted automatically if the Flink job reaches a globally terminal state (e.g. FINISHED, CANCELED, FAILED). So I assume you have stopped the yarn application via "yarn application -kill", not via "bin/flink cancel". If it is the ca

Parallelism of Flink SQL LookupTableSource in 1.14 ..

2022-01-10 Thread Jonathan Weaver
I'm attempting to do a proof of concept conversion of a DataStream based Flink program over to using almost entirely Table SQL. I have a primary CDC stream (an unbounded scan table source) that does two joins to LookupTableSource tables and then on to a sink. In the datastream program the only wa

Re: RichMapFunction to convert tuple of strings to DataStream[(String,String)]

2022-01-10 Thread Piotr Nowojski
Glad to hear it. Best, Piotrek On Mon, Jan 10, 2022 at 20:08, Siddhesh Kalgaonkar wrote: > Hi Piotr, > > Thanks for the reply. I was looking for how to create a DataStream under a > process function since using that I had to call something else but I came > across one of Fabian's posts where he