Running Flink in a Local Execution Environment for Production Workloads
Hi all,

I plan to run Flink jobs as Docker containers in AWS Elastic Container Service. Checkpointing will be enabled, with state stored in an S3 bucket, and each deployment will run in per-job mode.

Are there any non-obvious downsides to running these jobs with a local execution environment, so that each deployment becomes a single Java application? The obvious downside is that you don't get any horizontal scalability. That's a given: in this mode I'd have to scale up, not out. I'd like to find out whether there are any other negatives to this approach.

Thanks,
Joe

Privileged/Confidential Information may be contained in this message. If you are not the addressee indicated in this message (or responsible for delivery of the message to such person), you may not copy or deliver this message to anyone. In such case, you should destroy this message and kindly notify the sender by reply email. Please advise immediately if you or your employer does not consent to Internet email for messages of this kind. Opinions, conclusions and other information in this message that do not relate to the official business of my firm shall be understood as neither given nor endorsed by it.
FlinkKafkaProducer Fails with "Topic not present in metadata"
Hi all,

I have a Flink job that uses FlinkKafkaConsumer to consume messages from a Kafka topic and FlinkKafkaProducer to produce records to a Kafka topic. The consumer works fine. However, the Flink job eventually fails with the following exception:

Caused by: org.apache.kafka.common.errors.TimeoutException: Topic XXX not present in metadata after 6 ms.

I did find this issue, but it didn't have any details, so I am not sure whether it's related: https://issues.apache.org/jira/browse/FLINK-18757

Some details that might be important:
- Yes, I verified that the topic exists.
- The Kafka cluster that the Flink job integrates with is Confluent Cloud platform version 5.5.0, which should be compatible with Apache Kafka 2.5.x. See here for details: https://docs.confluent.io/platform/current/installation/versions-interoperability.html
- ACLs and SASL SSL are turned on.
- A Spring Boot app that I wrote (using Spring Kafka) is able to write to this same topic with the same credentials the Flink job uses.
- I am on Flink 1.11.2, using flink-connector-kafka_2.11 version 1.11.2.
- I turned on trace logs and verified that the Flink job sends metadata requests and that the Kafka broker returns metadata responses.
- I've set the producer semantic to NONE and disabled checkpointing.
Re: FlinkKafkaProducer Fails with "Topic not present in metadata"
So I actually discovered the problem after some extensive debugging with Confluent. My Kafka producer was attempting to send a record to a specific topic partition, and that partition did not exist. What the error message should have said is something like "Record sent to partition <partition> in topic <topic>, but this partition does not exist." Confluent says they'll send an upstream patch to Apache Kafka to improve the error message.

Thanks,
Joe

From: Becket Qin
Date: Thursday, December 10, 2020 at 9:27 AM
To: Joseph Lorenzini
Cc: "user@flink.apache.org"
Subject: Re: FlinkKafkaProducer Fails with "Topic not present in metadata"

Hi Joseph,

Thanks for the thorough information. Do you happen to have the trace-level logging available? If so, would you mind putting it somewhere so we can take a look?

Thanks,
Jiangjie (Becket) Qin

On Thu, Dec 3, 2020 at 8:55 PM Joseph Lorenzini <jlorenz...@gohealth.com> wrote:
[...]
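The root cause described here, sending a record to a partition index the topic does not have, can be caught on the client side before the producer blocks waiting for metadata. Below is a minimal stdlib-only sketch, assuming the partition count has already been obtained from topic metadata (for example the size of the list returned by the Kafka client's `Producer#partitionsFor(topic)`). `PartitionGuard` and `checkPartition` are hypothetical names, and the message text only mirrors the wording suggested in the reply; it is not something Kafka emits.

```java
// Hypothetical guard: validate an explicit partition index before a record is
// handed to a Kafka producer. partitionCount is assumed to come from topic
// metadata, e.g. producer.partitionsFor(topic).size().
final class PartitionGuard {

    private PartitionGuard() {}

    /** Returns the partition unchanged if it exists, otherwise throws a descriptive error. */
    static int checkPartition(String topic, int partition, int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalStateException("No partition metadata available for topic " + topic);
        }
        if (partition < 0 || partition >= partitionCount) {
            throw new IllegalArgumentException(
                    "Record sent to partition " + partition + " in topic " + topic
                            + " but this partition does not exist (topic has "
                            + partitionCount + " partition(s))");
        }
        return partition;
    }

    public static void main(String[] args) {
        System.out.println(checkPartition("XXX", 0, 1)); // valid index, printed unchanged
        try {
            checkPartition("XXX", 3, 1); // index 3 cannot exist in a 1-partition topic
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In a real job, a check like this would sit wherever the job picks an explicit partition index, such as a custom partitioner or serialization schema.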
Normalizing Metric Identifiers for Reporting Systems
Hi all,

I am implementing a metric reporter for New Relic, and I'd like it to support the operator metrics that Flink provides for each job out of the box. To ensure each metric is unique, you can't use the metric name; you need to use the metric identifier. However, I am not sure of the best way to present metric identifiers to New Relic.

Here's the format for an operator:

<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>

Here's an example:

ip.taskmanager.19ebf992ecc26eed8269da120a17c20a.NorvaxWordCount.CHAIN DataSource (at get(TextInput.java:63) (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (TokenizerFlatMap) -> Combine (SumTheFields).0.isBackPressured

What's currently tripping me up is the value of the operator name: CHAIN DataSource (at get(TextInput.java:63) (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (TokenizerFlatMap) -> Combine (SumTheFields). I can't realistically make that part of the New Relic metric name: it wouldn't be usable in the UI or for querying, and NR has length restrictions on metric names.

What I am looking for is a way to normalize this as part of the metric name so that it's both useful for a human to read and something a system like New Relic can consume.

Thanks,
Joe
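One possible normalization, sketched under assumptions: collapse every run of characters outside `[A-Za-z0-9_]` into a single underscore, and if the result is still too long, keep a readable prefix and append a short SHA-256 hash of the original operator name so that two long names sharing a prefix stay distinct. The 64-character limit below is a hypothetical placeholder, not New Relic's documented limit, and `MetricNameNormalizer` is an illustrative name.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper: turn a raw Flink operator/task name into a short,
// query-friendly metric-name segment.
final class MetricNameNormalizer {

    private static final int HASH_CHARS = 8;

    private MetricNameNormalizer() {}

    static String normalize(String operatorName, int maxLength) {
        if (maxLength < HASH_CHARS + 1) {
            throw new IllegalArgumentException("maxLength must be at least " + (HASH_CHARS + 1));
        }
        // Collapse any run of disallowed characters into one underscore,
        // then trim underscores left over from brackets, spaces, arrows, etc.
        String cleaned = operatorName.replaceAll("[^A-Za-z0-9_]+", "_").replaceAll("^_+|_+$", "");
        if (cleaned.length() <= maxLength) {
            return cleaned;
        }
        // Too long: readable prefix + hash of the ORIGINAL name to avoid collisions.
        String hash = sha256Hex(operatorName).substring(0, HASH_CHARS);
        String prefix = cleaned.substring(0, maxLength - HASH_CHARS - 1).replaceAll("_+$", "");
        return prefix.isEmpty() ? hash : prefix + "_" + hash;
    }

    private static String sha256Hex(String s) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(s.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder();
            for (byte b : digest) {
                sb.append(String.format("%02x", b));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is required on every JVM", e);
        }
    }

    public static void main(String[] args) {
        String op = "CHAIN DataSource (at get(TextInput.java:63) "
                + "(org.apache.flink.api.java.io.CollectionInputFormat)) "
                + "-> FlatMap (TokenizerFlatMap) -> Combine (SumTheFields)";
        System.out.println(normalize(op, 64));
    }
}
```

A complementary option is to keep the name short and send the individual scope parts as attributes instead; a reporter can read them from Flink's `MetricGroup#getAllVariables()`.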
Records Are Never Emitted in a Tumbling Event Window When Each Key Only Has One Record
Hi all,

I have observed behavior when joining two keyed streams where events are never emitted. The source of each stream is a different Kafka topic. I am curious whether this is expected and whether there's a way to work around it.

I am using a tumbling event-time window. All records across the two Kafka topics occurred within the same 5-second span of time. Each Kafka topic has a single partition. For each Kafka topic, I configured the Flink Kafka consumer like so:

consumer.assignTimestampsAndWatermarks(
    WatermarkStrategy
        .forBoundedOutOfOrderness(Duration.ofSeconds(10))
        .withIdleness(Duration.ofSeconds(10))
);

The tumbling window has a duration of 60 seconds.

It happens to be the case that there is only a single event per join key. If I use a tumbling processing-time window, events are emitted as expected. If I ensure there are multiple events for a key, events are also emitted. However, with a single event per key in a tumbling event-time window, no events are emitted.

Is this expected, and if so, how do you handle this use case?

Thanks,
Joe
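For reference, which tumbling event-time window a record belongs to depends only on its timestamp. The arithmetic below is a stdlib-only sketch that is meant to match Flink's `TimeWindow.getWindowStartWithOffset` with an offset of 0 (it uses `Math.floorMod` so pre-epoch timestamps also land correctly). A window covers `[start, end)`, and its last inclusive timestamp, the one an event-time trigger waits for, is `end - 1`. `TumblingWindowMath` is a hypothetical name.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;

// Stdlib-only sketch of tumbling-window assignment (offset 0).
final class TumblingWindowMath {

    private TumblingWindowMath() {}

    /** Start (inclusive, epoch millis) of the tumbling window containing the timestamp. */
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    /** End (exclusive) of that window; the window's max timestamp is end - 1. */
    static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    public static void main(String[] args) {
        long size = Duration.ofSeconds(60).toMillis();
        long ts = OffsetDateTime.parse("2021-06-09T08:57:00.993-05:00").toInstant().toEpochMilli();
        System.out.println("window start: " + Instant.ofEpochMilli(windowStart(ts, size))); // 2021-06-09T13:57:00Z
        System.out.println("window end:   " + Instant.ofEpochMilli(windowEnd(ts, size)));   // 2021-06-09T13:58:00Z
    }
}
```

With a 60-second window, all records within a single 5-second span usually fall into one window, which therefore has only one chance to fire.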
Re: Records Are Never Emitted in a Tumbling Event Window When Each Key Only Has One Record
Hi Arvid,

I am on 1.11.2. The Flink job has four operators:
- Source from Kafka topic one: sent 14 records
- Source from Kafka topic two: sent 6 records
- Map: received 15 records / sent 14 records
- Map: received 6 records / sent 6 records
- Tumbling Window to FileSink: received 20 records / sent 0 records

The watermark is the same for the map operators and the tumbling window; that is, the watermark did not advance between the map and the tumbling window. Any idea why that might be happening?

I did notice that the timestamps of all Kafka records are within a fraction of a second of one another. For example:

2021-06-09T08:57:00.993-05:00
2021-06-09T08:57:00.997-05:00

I also noticed that some Kafka records in topic A have exactly the same timestamp as records in topic B. Could timestamps not being far enough apart (e.g. a millisecond or more), or two records from two sources having exactly the same time, cause the watermarks not to advance?

Joe

From: Arvid Heise
Date: Wednesday, June 9, 2021 at 8:34 AM
To: Joseph Lorenzini
Cc: "user@flink.apache.org"
Subject: Re: Records Are Never Emitted in a Tumbling Event Window When Each Key Only Has One Record

Hi Joe,

could you please check (in the web UI) whether the watermark is advancing past the join? The window operator would not trigger if it doesn't advance. Which Flink version are you running?

On Tue, Jun 8, 2021 at 10:13 PM Joseph Lorenzini <jlorenz...@gohealth.com> wrote:
[...]
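To check the timestamp question numerically, here is a sketch of the arithmetic behind `WatermarkStrategy.forBoundedOutOfOrderness`. It assumes the behavior of Flink's `BoundedOutOfOrdernessWatermarks`: the watermark is the largest timestamp seen so far, minus the out-of-orderness bound, minus 1 ms. That value depends only on the maximum, so closely spaced or identical timestamps neither help nor hurt. The sketch applies this to the two example timestamps with the 10-second bound from the consumer configuration and compares the result with the 60-second window's max timestamp. `BoundedWatermark` is a hypothetical, stdlib-only stand-in, not Flink's class.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.List;

// Stdlib-only model of a bounded-out-of-orderness watermark generator.
final class BoundedWatermark {

    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedWatermark(Duration outOfOrderness) {
        this.outOfOrdernessMillis = outOfOrderness.toMillis();
        // Chosen so that the initial watermark is Long.MIN_VALUE ("no progress yet").
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    void onEvent(long timestampMillis) {
        // Only the maximum matters; equal or closely spaced timestamps change nothing.
        maxTimestamp = Math.max(maxTimestamp, timestampMillis);
    }

    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedWatermark wm = new BoundedWatermark(Duration.ofSeconds(10));
        long maxSeen = Long.MIN_VALUE;
        for (String ts : List.of("2021-06-09T08:57:00.993-05:00", "2021-06-09T08:57:00.997-05:00")) {
            long millis = OffsetDateTime.parse(ts).toInstant().toEpochMilli();
            wm.onEvent(millis);
            wm.onEvent(millis); // a duplicate timestamp (as across topics A and B) changes nothing
            maxSeen = Math.max(maxSeen, millis);
        }
        long windowSize = Duration.ofSeconds(60).toMillis();
        long windowMax = maxSeen - Math.floorMod(maxSeen, windowSize) + windowSize - 1;
        System.out.println("watermark:       " + Instant.ofEpochMilli(wm.currentWatermark())); // 2021-06-09T13:56:50.996Z
        System.out.println("window max ts:   " + Instant.ofEpochMilli(windowMax));              // 2021-06-09T13:57:59.999Z
        System.out.println("window can fire: " + (wm.currentWatermark() >= windowMax));         // false
    }
}
```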
Re: Records Are Never Emitted in a Tumbling Event Window When Each Key Only Has One Record
Hi Arvid,

I may have figured out the problem. When using a tumbling window on a keyed stream with event time, does time only move forward when there's an event with a newer timestamp? Put another way: with a 5-second tumbling window, the job would need to consume at least two events before a computation occurs. The first event has a timestamp that fits within the 5-second window, and the second event has a timestamp that exceeds the max timestamp of the previous window. Does that sound right?

Thanks,
Joe
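The question above can be explored with a small simulation. This is a sketch under stated assumptions, not Flink code. It assumes an `EventTimeTrigger`-style rule (a window fires once the watermark reaches its max timestamp, `end - 1`), a bounded-out-of-orderness watermark of "max timestamp seen minus the bound minus 1 ms" that is treated as updated right after every event, and zero allowed lateness, so events for already-fired windows are dropped. The model has no processing-time component, so only newer events move the watermark. As far as I know, `withIdleness` only marks a quiet source as idle and does not push the watermark forward on its own. `EventTimeFiringSimulation` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Illustrative model: which tumbling event-time windows fire for a sequence of
// events, given a bounded-out-of-orderness watermark updated after each event.
final class EventTimeFiringSimulation {

    private EventTimeFiringSimulation() {}

    /** Returns the start timestamps (millis) of the windows that fired, in firing order. */
    static List<Long> firedWindows(long[] eventTimestamps, long windowSize, long outOfOrderness) {
        TreeSet<Long> pending = new TreeSet<>(); // starts of windows holding unfired data
        List<Long> fired = new ArrayList<>();
        long maxTimestamp = Long.MIN_VALUE + outOfOrderness + 1;
        long watermark = Long.MIN_VALUE;
        for (long ts : eventTimestamps) {
            long start = ts - Math.floorMod(ts, windowSize);
            // Late event: its window's max timestamp is already behind the watermark,
            // so with zero allowed lateness it is dropped rather than buffered.
            if (start + windowSize - 1 <= watermark) {
                continue;
            }
            pending.add(start);
            maxTimestamp = Math.max(maxTimestamp, ts);
            watermark = maxTimestamp - outOfOrderness - 1;
            // Fire every buffered window whose max timestamp the watermark has reached.
            while (!pending.isEmpty() && pending.first() + windowSize - 1 <= watermark) {
                fired.add(pending.pollFirst());
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long size = 5_000L; // the 5-second window from the question
        // A single event: its window never fires.
        System.out.println(firedWindows(new long[] {1_000L}, size, 0L));
        // No bound: a second event past the first window's end fires it.
        System.out.println(firedWindows(new long[] {1_000L, 6_000L}, size, 0L));
        // 10 s bound: the later event must be at least 10 s past the window end.
        System.out.println(firedWindows(new long[] {1_000L, 6_000L}, size, 10_000L));
        System.out.println(firedWindows(new long[] {1_000L, 15_000L}, size, 10_000L));
    }
}
```

Under these assumptions, the second event needs a timestamp of at least the first window's end plus the out-of-orderness bound. With the 10-second bound from the configuration, that is 10 seconds later than the two-event description suggests.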
Support for Microseconds in Avro Deserialization
Hi all,

The Avro specification supports microseconds, and the source code in org.apache.avro.LogicalTypes seems to indicate microsecond support. However, the conversion code in Flink (see org.apache.flink.formats.avro.typeutils.AvroSchemaConverter#convertToSchema) has this check:

if (precision <= 3) {
    avroLogicalType = LogicalTypes.timestampMillis();
} else {
    throw new IllegalArgumentException(
        "Avro does not support TIMESTAMP type "
            + "with precision: "
            + precision
            + ", it only supports precision less than 3.");
}

So it seems that Flink only supports Avro timestamps with at most millisecond precision. Does someone have a brief explanation of why this limitation exists? Depending on how complicated it is, I'd be willing to submit a PR to add that support.

Thanks,
Joe
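For context on what microsecond support involves at the data level: Avro's `timestamp-micros` logical type annotates a `long` holding microseconds since the Unix epoch, while `timestamp-millis` holds milliseconds. Below is a stdlib-only sketch of encoding and decoding with `java.time.Instant`, showing that the millisecond encoding drops the sub-millisecond digits a `TIMESTAMP(6)` would carry. `AvroMicros` is a hypothetical helper, not Flink or Avro code.

```java
import java.time.Instant;

// Hypothetical helper: Instant <-> Avro timestamp-micros (long micros since epoch).
final class AvroMicros {

    private AvroMicros() {}

    /** Encodes an Instant as microseconds since the epoch (sub-microsecond nanos are truncated). */
    static long toEpochMicros(Instant instant) {
        // getEpochSecond() is floored and getNano() is always in [0, 999_999_999],
        // so this is correct for instants before 1970 as well.
        return Math.addExact(
                Math.multiplyExact(instant.getEpochSecond(), 1_000_000L),
                instant.getNano() / 1_000L);
    }

    /** Decodes microseconds since the epoch back into an Instant. */
    static Instant fromEpochMicros(long micros) {
        long seconds = Math.floorDiv(micros, 1_000_000L);
        long microOfSecond = Math.floorMod(micros, 1_000_000L);
        return Instant.ofEpochSecond(seconds, microOfSecond * 1_000L);
    }

    public static void main(String[] args) {
        Instant ts = Instant.parse("2021-06-09T13:57:00.993456Z");
        long micros = toEpochMicros(ts);
        long millis = ts.toEpochMilli(); // what timestamp-millis would store
        System.out.println("micros round trip: " + fromEpochMicros(micros));  // 2021-06-09T13:57:00.993456Z
        System.out.println("millis round trip: " + Instant.ofEpochMilli(millis)); // 2021-06-09T13:57:00.993Z
    }
}
```

On the schema side, Avro's Java API already provides `LogicalTypes.timestampMicros()` alongside `timestampMillis()`, so a branch for precision 4 to 6 could presumably map to it. Whether Flink's Avro row converters preserve the extra precision is a separate question.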
Table API Throws Calcite Exception CannotPlanException When Tumbling Window is Used
Hi all,

I am on Flink 1.12.3. I am trying to get a tumbling window to work with the Table API as documented here: https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/#tumble

I have a Kafka topic as a Flink source. I convert the stream into a table using the StreamTableEnvironment#fromDataStream method. Then, once the table is registered, I attempt to execute this SQL:

SELECT window_start, window_end, avg(state_ts)
FROM TABLE(TUMBLE(TABLE lead_buffer, DESCRIPTOR(proctime), INTERVAL '1' MINUTES))
GROUP BY window_start, window_end

However, this exception is thrown:

Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=STREAM_PHYSICAL, FlinkRelDistributionTraitDef=any, MiniBatchIntervalTraitDef=None: 0, ModifyKindSetTraitDef=[NONE], UpdateKindTraitDef=[NONE]

Does anyone have any idea what I might be doing wrong here?

Thanks,
Joe
Support for authenticated schema registry in debezium registry
Hi all,

I am on Flink 1.13.2. I set up the table like so:

CREATE TABLE lead_buffer (
  `id` INT NOT NULL,
  `state` STRING NOT NULL,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'buffer',
  'format' = 'debezium-avro-confluent',
  'debezium-avro-confluent.schema-registry.url' = 'http://localhost:8081',
  'scan.startup.mode' = 'earliest-offset',
  'properties.group.id' = 'customers',
  'debezium-avro-confluent.basic-auth.user-info' = 'sr-user:sr-user-password',
  'properties.bootstrap.servers' = 'localhost:9092',
  'value.fields-include' = 'EXCEPT_KEY',
  'properties.ssl.endpoint.identification.algorithm' = 'http',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'PLAIN',
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="password";'
)

I am looking at the docs here: https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/debezium/#debezium-avro-confluent-basic-auth-user-info

According to the properties table, there is a property for authenticating to a schema registry: debezium-avro-confluent.basic-auth.user-info. However, when I set it in the DDL for creating the table, I get this error:

Caused by: org.apache.flink.table.api.ValidationException: Unsupported options found for 'kafka'. Unsupported options: debezium-avro-confluent.basic-auth.user-info

I found "FLINK-22858: avro-confluent doesn't support confluent schema registry that has security enabled". However, that ticket was closed as a duplicate of FLINK-21229, which has been resolved and fixed in 1.13.2. Does anyone know whether this has in fact been fixed, or whether this is user error on my part?

Thanks,
Joe
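For context on what the `basic-auth.user-info` value is for: the Confluent schema registry client uses it for HTTP Basic authentication. The `user:password` string is Base64-encoded into an `Authorization: Basic ...` header, as described in RFC 7617. The stdlib-only sketch below builds that header from the value in the DDL above. This can help check the credentials directly against the registry (for example its `/subjects` endpoint) independently of Flink's option validation. `BasicAuthHeader` is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper: build an HTTP Basic Authorization header value from user-info.
final class BasicAuthHeader {

    private BasicAuthHeader() {}

    /** Builds "Basic <base64(user:password)>" from a "user:password" string. */
    static String fromUserInfo(String userInfo) {
        if (userInfo.indexOf(':') < 0) {
            throw new IllegalArgumentException("user-info must have the form user:password");
        }
        String encoded = Base64.getEncoder()
                .encodeToString(userInfo.getBytes(StandardCharsets.UTF_8));
        return "Basic " + encoded;
    }

    public static void main(String[] args) {
        // Same placeholder credentials as in the DDL; substitute real ones.
        System.out.println("Authorization: " + fromUserInfo("sr-user:sr-user-password"));
    }
}
```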
Windowed Aggregation With Event Time over a Temporary View
Hi all,

I am on Flink 1.12.3. Here's the scenario: I have a Kafka topic as a source, where each record represents a change to an append-only audit log. The Kafka record has the following fields:
- id: unique identifier for that audit log entry
- operation_id: shared across multiple records
- operation: STRING
- start_ts: TIMESTAMP(3)
- end_ts: TIMESTAMP(3)

I am trying to calculate the average item count and duration per operation. I first converted the Kafka source to an append-only data stream, and then I attempted to run the following SQL:

Table workLogTable = tableEnv.fromDataStream(workLogStream);
tableEnv.createTemporaryView("work_log", workLogTable);

Table workLogCntTable = tableEnv.sqlQuery(
    "select operation_id, operation, max(start_ts) as start_ts, max(end_ts) as end_ts, "
        + "count(*) as item_count, max(audit_ts) as audit_ts, max(event_time) as max_event_time"
        + " FROM work_log GROUP BY operation_id, operation");
tableEnv.createTemporaryView("work_log_cnt", workLogCntTable);

tableEnv.executeSql(
    "select max(audit_ts), operation, avg(item_count) as average_item_count, "
        + "AVG(end_ts - start_ts) as avg_duration from"
        + " work_log_cnt"
        + " GROUP BY TUMBLE(max_event_time, INTERVAL '1' SECOND), operation").print();

The problem I am having is that I am unable to preserve the event time between the first view and the second. I am getting this error:

Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 139 to line 1, column 181: Cannot apply '$TUMBLE' to arguments of type '$TUMBLE(, )'. Supported form(s): '$TUMBLE(, )'

My guess is that the max function in the first query converts the event time from a DATETIME type to a BIGINT. I am not sure how to apply an aggregate to the event time in the first query so that the event time from the original Kafka stream can be used in the second view. Is there a way to make this work?

Thanks,
Joe
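To make the intended computation concrete, independent of Flink's time-attribute rules, here is a stdlib-only sketch of the same two-stage aggregation over an in-memory list. First, audit records are grouped by (operation_id, operation) to get an item count and the max start/end times, as the work_log_cnt view does. Then the average item count and average duration are computed per operation. The record fields follow the list above. The class names and sample data are hypothetical, and the 1-second tumbling window is deliberately left out, since that is the part that depends on keeping a time attribute.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical in-memory model of the two queries (requires Java 16+ for records).
final class WorkLogStats {

    private WorkLogStats() {}

    record AuditRecord(long id, String operationId, String operation, Instant startTs, Instant endTs) {}

    // Stage 1 row: one per (operation_id, operation), like the work_log_cnt view.
    record OperationRun(String operationId, String operation, Instant startTs, Instant endTs, long itemCount) {}

    // Stage 2 row: averages per operation.
    record OperationStats(String operation, double averageItemCount, Duration averageDuration) {}

    static List<OperationRun> perOperationRun(List<AuditRecord> records) {
        Map<List<String>, List<AuditRecord>> groups = records.stream().collect(Collectors.groupingBy(
                r -> List.of(r.operationId(), r.operation()), LinkedHashMap::new, Collectors.toList()));
        List<OperationRun> runs = new ArrayList<>();
        for (List<AuditRecord> group : groups.values()) {
            AuditRecord first = group.get(0);
            Instant maxStart = group.stream().map(AuditRecord::startTs).max(Comparator.naturalOrder()).orElseThrow();
            Instant maxEnd = group.stream().map(AuditRecord::endTs).max(Comparator.naturalOrder()).orElseThrow();
            runs.add(new OperationRun(first.operationId(), first.operation(), maxStart, maxEnd, group.size()));
        }
        return runs;
    }

    static Map<String, OperationStats> perOperationStats(List<OperationRun> runs) {
        Map<String, List<OperationRun>> byOperation = runs.stream()
                .collect(Collectors.groupingBy(OperationRun::operation, TreeMap::new, Collectors.toList()));
        Map<String, OperationStats> result = new TreeMap<>();
        byOperation.forEach((operation, list) -> {
            double avgItems = list.stream().mapToLong(OperationRun::itemCount).average().orElse(0);
            long avgMillis = (long) list.stream()
                    .mapToLong(r -> Duration.between(r.startTs(), r.endTs()).toMillis())
                    .average().orElse(0);
            result.put(operation, new OperationStats(operation, avgItems, Duration.ofMillis(avgMillis)));
        });
        return result;
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2021-06-09T13:57:00Z");
        List<AuditRecord> log = List.of(
                new AuditRecord(1, "op-1", "import", t0, t0.plusSeconds(4)),
                new AuditRecord(2, "op-1", "import", t0, t0.plusSeconds(4)),
                new AuditRecord(3, "op-2", "import", t0, t0.plusSeconds(2)),
                new AuditRecord(4, "op-3", "export", t0, t0.plusSeconds(1)));
        perOperationStats(perOperationRun(log)).values().forEach(System.out::println);
    }
}
```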