how to add a new runtime operator

2020-08-11 Thread Vincent Cai
Hi All, Flink is an amazing streaming platform; meanwhile it is also complicated. Do we have a "step by step" guideline for adding new runtime operators to Flink? I got one page via Google but it seems to be out of date. Thanks.  https://ci.apache.org/projects/flink/flink-docs-release-1.1/int

Is there a way to start a timer without ever receiving an event?

2020-08-11 Thread Marco Villalobos
In the Stream API KeyedProcessFunction, is there a way to start a timer without ever receiving a stream event?
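
As far as I know, keyed timers can only be registered from processElement() or onTimer(), since they need the per-key context those methods provide, so some first record per key is required. A minimal sketch of the registration pattern (class name, types, and the 60s delay are illustrative):

    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class TimeoutFunction extends KeyedProcessFunction<String, String, String> {
        @Override
        public void processElement(String value, Context ctx, Collector<String> out) {
            // registration needs the keyed context, so it can only happen here or in
            // onTimer(); assumes event-time timestamps are assigned upstream
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 60_000L);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
            out.collect("timer fired for key " + ctx.getCurrentKey());
        }
    }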

Re: [SQL DDL] How to extract timestamps from Kafka message's metadata

2020-08-11 Thread Jark Wu
You can also watch this issue: https://issues.apache.org/jira/browse/FLINK-15869 On Tue, 11 Aug 2020 at 16:08, Dongwon Kim wrote: > Hi Timo, > > Thanks for your input. > We've been considering that as well, but this time I just wanted to solely > use TableEnvironment without DataStream APIs. > >

JM & TM readiness probe

2020-08-11 Thread Alexey Trenikhun
Hello, how can I define a readiness probe for k8s job cluster deployments? I think for the job manager I can use the REST API and check the job status, but what about the task manager? Is there a way to ask a task manager Pod whether it is ready or not? Thanks, Alexey

Flink Parquet Streaming FileSink with scala case class with optional fields error

2020-08-11 Thread Vikash Dat
I have defined a streaming file sink for parquet to store my scala case class. StreamingFileSink.forBulkFormat(new Path(appArgs.datalakeBucket), ParquetAvroWriters.forReflectRecord(classOf[Log])).withBucketAssigner(new TransactionLogHiveBucketAssigner(
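
For reference, a minimal bulk-format Parquet sink in Java (a sketch under the assumption that the error stems from Avro reflection not understanding Scala Option fields; Log and its nullable field are stand-ins, not the thread's actual class):

    import org.apache.avro.reflect.Nullable;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

    public class ParquetSinkSketch {
        // stand-in for the case class: Avro reflection maps plain fields, but
        // (as far as I know) not Scala Option, so optional values are modeled
        // as @Nullable fields instead
        public static class Log {
            public String message;
            @Nullable public String optionalField;
        }

        public static StreamingFileSink<Log> build(String outputPath) {
            return StreamingFileSink
                .forBulkFormat(new Path(outputPath), ParquetAvroWriters.forReflectRecord(Log.class))
                .build();
        }
    }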

Re: Question about ParameterTool

2020-08-11 Thread Chesnay Schepler
The benefit of the ParameterTool is that you do not increase your dependency footprint by using it. When using another CLI library you will generally package it within your user-jar, which may or may not increase the risk of dependency conflicts. Whether, and how large this risk is, depends na

Re: Question about ParameterTool

2020-08-11 Thread Marco Villalobos
Thank you for the clarification. But does it offer any additional benefits that are not clearly documented? On Tue, Aug 11, 2020 at 12:22 PM Robert Metzger wrote: > Hi, > there are absolutely no dangers in not using ParameterTool. > It is used by the Flink examples, and as a showcase for global

Using Event Timestamp, sink gets values back in the machine timezone

2020-08-11 Thread Faye Pressly
Hello, I am having an issue with event timestamps and timezones with Flink 1.8 (1.8 because I need it to work on AWS Kinesis). I have a very simple pipeline that reads events from a stream, transforms to a Table, does a small window (Tumbling 1 min) aggregation and groupBy, transforms back to a stre

Re: Question about ParameterTool

2020-08-11 Thread Robert Metzger
Hi, there are absolutely no dangers in not using ParameterTool. It is used by the Flink examples, and as a showcase for global job parameters: https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/best_practices.html#register-the-parameters-globally On Tue, Aug 11, 2020 at 7:13 PM Ma
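
For context, the ParameterTool usage that best-practices page describes, as a minimal sketch (key name and default value are illustrative):

    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ParamsExample {
        public static void main(String[] args) throws Exception {
            ParameterTool params = ParameterTool.fromArgs(args);      // parses --key value pairs
            String input = params.get("input", "default-topic");     // lookup with a default
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // the "register the parameters globally" part: makes them visible in
            // the web UI and retrievable inside RichFunctions
            env.getConfig().setGlobalJobParameters(params);
            System.out.println("input = " + input);
        }
    }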

Re: [Flink-KAFKA-KEYTAB] Kafkaconsumer error Kerberos

2020-08-11 Thread Vijayendra Yadav
Dawid, I was able to resolve the keytab issue by passing the service name, but now I am facing the KRB5 issue. Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to create SaslClient with mechanism GSSAPI Caused by: javax.security.sasl.SaslException: Failure to initializ

Re: Flink CPU load metrics in K8s

2020-08-11 Thread Bajaj, Abhinav
Hi, Reaching out to folks running Flink on K8s. ~ Abhinav Bajaj From: "Bajaj, Abhinav" Date: Wednesday, August 5, 2020 at 1:46 PM To: Roman Grebennikov , "user@flink.apache.org" Subject: Re: Flink CPU load metrics in K8s Thanks Roman for providing the details. I also made more observations

Question about ParameterTool

2020-08-11 Thread Marco Villalobos
What are the dangers of not using the ParameterTool for parsing command line parameters? I have been using Picocli (https://picocli.info/). Will this be a mistake? Are there any side-effects that I should be aware of?

Re: Missing metrics when using metric reporter on high parallelism

2020-08-11 Thread Chesnay Schepler
IIRC this can be caused by the Carbon MAX_CREATES_PER_MINUTE setting. I would deem it unlikely that the reporter thread is busy for 30 seconds. On 11/08/2020 16:57, Nikola Hrusov wrote: Hello, I am doing some tests with flink 1.11.1 and I have noticed something strange/wrong going on with the

Missing metrics when using metric reporter on high parallelism

2020-08-11 Thread Nikola Hrusov
Hello, I am doing some tests with flink 1.11.1 and I have noticed something strange/wrong going on with the exported metrics. I have a configuration like such:
metrics.reporter.graphite.class: org.apache.flink.metrics.graphite.GraphiteReporterFactory
metrics.reporter.graphite.host: graphitem

Re: Flink cluster deployment strategy

2020-08-11 Thread Till Rohrmann
Hi Sidhant, see the inline comments for answers On Tue, Aug 11, 2020 at 3:10 PM sidhant gupta wrote: > Hi Till, > > Thanks for your response. > I have few queries though as mentioned below: > (1) Can flink be used in map-reduce fashion with data streaming api ? > What do you understand as map-

Re: Flink cluster deployment strategy

2020-08-11 Thread sidhant gupta
Hi Till, Thanks for your response. I have a few queries, as mentioned below: (1) Can flink be used in map-reduce fashion with the data streaming api? (2) Does it make sense to use aws EMR if we are not using flink in map-reduce fashion with the streaming api? (3) Can flink cluster be auto scaled usi

Re: GroupBy with count on a joint table only let met write using toRetractStream

2020-08-11 Thread godfrey he
Hi Faye, 1) In your sql, different events are for different groups, so it seems hard to extract a global Filter into DataStream. 2) AFAIK, you can just drop the retract messages (the flag is false), and then convert the retract stream to an append stream. The downstream job needs to de-duplicate the records
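
A sketch of the drop-retractions step in Java (assuming the recent table/DataStream bridge API and a Row result type; whether dropping retractions is acceptable depends on the query):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    public class DropRetractions {
        public static DataStream<Row> appendOnly(StreamTableEnvironment tEnv, Table result) {
            return tEnv
                .toRetractStream(result, Row.class)   // Tuple2<Boolean, Row>: true = add, false = retract
                .filter(change -> change.f0)          // keep only the add messages
                .map(change -> change.f1)             // unwrap the Row
                .returns(Row.class);                  // explicit hint, since the lambda erases the type
        }
    }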

RE: BucketingSink & StreamingFileSink

2020-08-11 Thread Mariano González Núñez
Hi Robert, Thanks for the answer... From: Robert Metzger Sent: Tuesday, 11 August 2020 3:46 To: Mariano González Núñez Cc: user@flink.apache.org Subject: Re: BucketingSink & StreamingFileSink Hi Mariano, thanks a lot for your question. The resolution

Re: Flink cluster deployment strategy

2020-08-11 Thread Till Rohrmann
Hi Sidhant, I am not an expert on AWS services but I believe that EMR might be a bit easier to start with since AWS EMR comes with Flink support out of the box [1]. On ECS I believe that you would have to set up the containers yourself. Another interesting deployment option could be to use Flink's

Re: Kafka source, committing and retries

2020-08-11 Thread Till Rohrmann
Hi Jack, if your records are already partitioned wrt the individual topics and you don't need to compute some global values, then you could create for every topic a separate Flink pipeline (separate FlinkKafkaConsumer) which runs independently. That way if one of the APIs degrades it will automati
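
A sketch of the one-pipeline-per-topic idea (broker address, group id, and topic names are illustrative):

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class PerTopicPipelines {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "broker:9092");
            props.setProperty("group.id", "per-topic-demo");
            // one independent source (and downstream chain) per topic, so a degraded
            // topic or API only backpressures its own pipeline
            for (String topic : Arrays.asList("topic-a", "topic-b")) {
                env.addSource(new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props))
                   .name("source-" + topic)
                   .print();
            }
            env.execute("per-topic pipelines");
        }
    }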

Re: Event time based disconnection detection logic

2020-08-11 Thread Timo Walther
Sometimes it's not easy to spot the obvious ;-) Great that it works now. Let us know if you have further questions. Regards, Timo On 11.08.20 10:51, Manas Kale wrote: Hi Timo, I got it, the issue was a (silly) mistake on my part. I unnecessarily put all the processElement() logic inside the i

Re: Two Queries and a Kafka Topic

2020-08-11 Thread Danny Chan
Hi, Marco ~ It seems what you need is a temporal join from the SQL side, you can define 2 Flink tables for your PostgreSQL ones and join your Kafka stream with them [1][3]. Flink 1.10 also supports this. There is some difference with the DDL compared to 1.11 [2] [1]  https://ci.apache.org/pro

Re: Event time based disconnection detection logic

2020-08-11 Thread Manas Kale
Hi Timo, I got it, the issue was a (silly) mistake on my part. I unnecessarily put all the processElement() logic inside the if condition. The if() condition is there because I want to emit a disconnected STOPPED message only once. So the correct code is: @Override public void processElemen
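
The emit-once idea as a generic sketch (this is not Manas's actual code; names, types, and the timeout are illustrative, and only the most recently scheduled timer is honored so stale timers don't emit spurious STOPPED messages):

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class DisconnectionDetector extends KeyedProcessFunction<String, String, String> {
        private transient ValueState<Long> lastTimer;   // the only timer considered valid

        @Override
        public void open(Configuration parameters) {
            lastTimer = getRuntimeContext().getState(
                new ValueStateDescriptor<>("last-timer", Long.class));
        }

        @Override
        public void processElement(String heartbeat, Context ctx, Collector<String> out) throws Exception {
            long fireAt = ctx.timestamp() + 60_000L;    // assumes event-time timestamps are set
            lastTimer.update(fireAt);
            ctx.timerService().registerEventTimeTimer(fireAt);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // only the latest timer fires the message, so STOPPED is emitted
            // once per disconnection and earlier, stale timers are ignored
            if (lastTimer.value() != null && timestamp == lastTimer.value()) {
                out.collect(ctx.getCurrentKey() + " STOPPED");
            }
        }
    }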

Re: JobManager refusing connections when running many jobs in parallel?

2020-08-11 Thread Robert Metzger
Thanks for checking. Your analysis sounds correct. The JM is busy processing job submissions, resulting in other submissions not being accepted. Increasing rest.connection-timeout should resolve your problem. On Fri, Aug 7, 2020 at 1:59 AM Hailu, Andreas wrote: > Thanks for pointing this out.

Re: Proper way to do Integration Testing ?

2020-08-11 Thread Timo Walther
Hi Faye, Flink does not officially provide testing tools at the moment. However, you can use internal Flink tools if they solve your problem. The `flink-end-to-end-tests` module [1] shows some examples how we test Flink together with other systems. Many tests are still using plain bash scrip

Re: Batch version of StreamingFileSink.forRowFormat(...)

2020-08-11 Thread Timo Walther
Hi Dan, InputFormats are the connectors of the DataSet API. Yes, you can use either readFile, readCsvFile, readFileOfPrimitives etc. However, I would recommend to also give Table API a try. The unified TableEnvironment is able to perform batch processing and is integrated with a bunch of conn
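
A minimal DataSet-API sketch of those InputFormat-backed readers (path and field types are illustrative):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class BatchRead {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // readCsvFile is one of the readFile/readCsvFile/readFileOfPrimitives
            // family, all backed by InputFormats
            DataSet<Tuple2<String, Integer>> rows = env
                .readCsvFile("s3://bucket/input.csv")
                .types(String.class, Integer.class);
            rows.first(10).print();
        }
    }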

Re: [Flink-KAFKA-KEYTAB] Kafkaconsumer error Kerberos

2020-08-11 Thread Dawid Wysakowicz
Hi, As far as I know the approach 2) is the supported way of setting up Kerberos authentication in Flink. In the second approach have you tried setting the `sasl.kerberos.service.name` in the configuration of your KafkaConsumer/Producer[1]? I think this might be the issue. Best, Dawid [1] https
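
The consumer properties in question, sketched in Java (values are illustrative; the service name must match the primary of the broker's Kerberos principal, commonly "kafka"):

    import java.util.Properties;

    public class KerberosKafkaProps {
        public static Properties build() {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "broker:9092");      // illustrative address
            props.setProperty("security.protocol", "SASL_PLAINTEXT");   // or SASL_SSL
            props.setProperty("sasl.kerberos.service.name", "kafka");   // must match the broker principal
            return props;   // pass to the FlinkKafkaConsumer/Producer constructor
        }
    }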

Re: Flink job percentage

2020-08-11 Thread Robert Metzger
Hi Flavio, I'm not aware of such a heuristic being implemented anywhere. You need to come up with something yourself. On Fri, Aug 7, 2020 at 12:55 PM Flavio Pompermaier wrote: > Hi to all, > one of our customers asked us to see a percentage of completion of a Flink > Batch job. Is there any alr

Re: Flink maxrecordcount increase causing a few task manager throughput drops

2020-08-11 Thread Robert Metzger
Hi Terry, Why did you change the configuration? It is indeed not intuitive for the throughput to drop. Maybe some internal throttling or rate limits on AWS side are causing this problem. Best, Robert On Fri, Aug 7, 2020 at 10:48 AM Terry Chia-Wei Wu wrote: > hi, > > I change the following co

Re: S3 file source parallelism reverting to 1

2020-08-11 Thread Dmytro Dragan
Hi Steve, When you call env.readFile(…), internally env creates:
ContinuousFileMonitoringFunction monitoringFunction =
    new ContinuousFileMonitoringFunction<>(inputFormat, monitoringMode, getParallelism(), interval);
ContinuousFileReaderOperator reader = new ContinuousFileReaderOperator<>(

Re: Event time based disconnection detection logic

2020-08-11 Thread Timo Walther
Hi Manas, at the first glance your code looks correct to me. I would investigate if your keys and watermarks are correct. Esp. the watermark frequency could be an issue. If watermarks are generated at the same time as the heartbeats itself, it might be the case that the timers fire first befo

Re: [SQL DDL] How to extract timestamps from Kafka message's metadata

2020-08-11 Thread Dongwon Kim
Hi Timo, Thanks for your input. We've been considering that as well, but this time I just wanted to solely use TableEnvironment without DataStream APIs. But that would be the most straightforward solution this time around. Thanks and regards, Dongwon On Tue, Aug 11, 2020 at 4:50 PM Timo Walth

Re: Matching largest event pattern without duplicates

2020-08-11 Thread Dawid Wysakowicz
Hi James, I think it is not easy to achieve with the CEP library. Adding the consecutive quantifier to the oneOrMore strategy should eliminate a few of the unwanted cases from your example (`b:c`, `b`, `a`, `c`), but it would not eliminate the `c:a`. The problem is you need to skip to the first du

Re: [SQL DDL] How to extract timestamps from Kafka message's metadata

2020-08-11 Thread Timo Walther
Hi Dongwon, another possibility is to use DataStream API before. There you can extract the metadata and use DataStream.assignTimestampsAndWatermarks before converting the stream to a table. Regards, Timo On 11.08.20 09:41, Dongwon Kim wrote: Hi Dawid, I'll try your suggestion [2] and wait
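
A sketch of that pre-processing step (assuming a Flink 1.11-style WatermarkStrategy; the Event type, field names, and the 10s bound are illustrative, and the Kafka timestamp is assumed to have been extracted into the event upstream, e.g. by a KafkaDeserializationSchema):

    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import static org.apache.flink.table.api.Expressions.$;

    public class MetadataRowtime {
        public static class Event {
            public String user;
            public long kafkaTimestamp;   // extracted from the record's metadata upstream
        }

        public static Table toTable(StreamTableEnvironment tEnv, DataStream<Event> events) {
            DataStream<Event> withTs = events.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                    .withTimestampAssigner((e, ts) -> e.kafkaTimestamp));
            // the stream's timestamp becomes a rowtime attribute in the table
            return tEnv.fromDataStream(withTs, $("user"), $("rowtime").rowtime());
        }
    }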

Re: Does Flink automatically apply the rebalance operator ?

2020-08-11 Thread Robert Metzger
Hi Suraj, yes, this is the expected and desired behavior: a rebalance is introduced so that all 6 process functions are performing work. On Thu, Jul 30, 2020 at 8:53 PM Suraj Puvvada wrote: > Hello > > We are testing a simple use case where we read from kafka -> process and > write to kafka. >

Re: [SQL DDL] How to extract timestamps from Kafka message's metadata

2020-08-11 Thread Fabian Hueske
Hi Dongwon, Maybe you can add your use case to the FLIP-107 discussion thread [1] and thereby support the proposal (after checking that it would solve your problem). It's always helpful to learn about the requirements of users when designing new features. It also helps to prioritize which feature

Re: BucketingSink & StreamingFileSink

2020-08-11 Thread Robert Metzger
Hi Mariano, thanks a lot for your question. The resolution on StackOverflow seems to be that Azure Data Lake is not yet supported by the StreamingFileSink (https://issues.apache.org/jira/browse/FLINK-18568). On Thu, Jul 30, 2020 at 5:34 PM Mariano González Núñez <mariano@hotmail.com> wrote:

Re: [SQL DDL] How to extract timestamps from Kafka message's metadata

2020-08-11 Thread Dongwon Kim
Hi Dawid, I'll try your suggestion [2] and wait for [1] to be supported in next versions. Thanks, p.s. It's not easy to insert the timestamp into the body because it will affect other applications. In this regard, I hope [1] is going to be available soon. Dongwon On Tue, Aug 11, 2020 at 4:31

Re: [SQL DDL] How to extract timestamps from Kafka message's metadata

2020-08-11 Thread Dawid Wysakowicz
I'm afraid it is not supported yet. The discussion[1] to support it started in the past, but unfortunately it has not concluded yet. One approach I can think of to work around this limitation is to provide your own Format[2]. Unfortunately it is not the most straightforward solution. Be

[SQL DDL] How to extract timestamps from Kafka message's metadata

2020-08-11 Thread Dongwon Kim
Hi, I'm working on a Kafka topic where timestamps are not in the message body but in the message's metadata. I want to declare a table from the topic with DDL, but "rowtime_column_name" in the below definition seems to accept only existing columns. > WATERMARK FOR rowtime_column_name A

Flink cluster deployment strategy

2020-08-11 Thread sidhant gupta
Hi all, I'm kind of new to flink cluster deployment. I wanted to know which flink cluster deployment and which job mode in aws is better in terms of ease of deployment, maintenance, HA, cost, etc. As of now I am considering aws EMR vs ECS (docker containers). We have a usecase of setting up a data