Running Flink in a Local Execution Environment for Production Workloads

2020-10-23 Thread Joseph Lorenzini




Hi all,
 
I plan to run Flink jobs as Docker containers in AWS Elastic Container Service. I will have checkpointing enabled, with state stored in an S3 bucket. Each deployment will run in per-job mode. Are there any non-obvious downsides to running these jobs with a local execution environment, so that the deployment turns into deploying a single Java application?
 
The obvious downside is that you don’t get any horizontal scalability. That’s a given, and I’d have to scale up rather than out in this mode. I’d like to find out whether there are any other negatives with this approach.
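To make the question concrete, this is roughly the shape I have in mind (just a sketch; the bucket name and the pipeline are placeholders):

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class StandaloneJob {
        public static void main(String[] args) throws Exception {
            // Local execution: JobManager and TaskManager run inside this one JVM,
            // so the deployable artifact is just a jar started by the container.
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

            // Checkpoint every 30s to S3 (bucket name is a placeholder).
            env.enableCheckpointing(30_000);
            env.setStateBackend(new FsStateBackend("s3://my-checkpoint-bucket/checkpoints"));

            env.fromElements("a", "b", "c")   // stand-in for the real sources
               .map(String::toUpperCase)
               .print();

            env.execute("local-execution-job");
        }
    }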
 
Thanks,
Joe  

Privileged/Confidential Information may be contained in this message. If you are not the addressee indicated in this message (or responsible for delivery of the message to such person), you may not copy or deliver this message to anyone. In such case, you should
 destroy this message and kindly notify the sender by reply email. Please advise immediately if you or your employer does not consent to Internet email for messages of this kind. Opinions, conclusions and other information in this message that do not relate
 to the official business of my firm shall be understood as neither given nor endorsed by it.




FlinkKafkaProducer Fails with "Topic not present in metadata"

2020-12-03 Thread Joseph Lorenzini
Hi all,

I have a Flink job that uses FlinkKafkaConsumer to consume messages off a Kafka
topic and a FlinkKafkaProducer to produce records to a Kafka topic. The
consumer works fine. However, the Flink job eventually fails with the following
exception.

Caused by: org.apache.kafka.common.errors.TimeoutException: Topic XXX not 
present in metadata after 6 ms.

I did find this issue, but it didn't have many details, so I am not sure if it's
related or not.

https://issues.apache.org/jira/browse/FLINK-18757

Some details that might be important:

- Yes, I verified the topic exists.
- The Kafka cluster that the Flink job is integrating with is the Confluent
Cloud platform at version 5.5.0, which should be compatible with Apache Kafka
2.5.x. See here for details:
https://docs.confluent.io/platform/current/installation/versions-interoperability.html
- ACLs and SASL_SSL are turned on.
- A Spring Boot app that I wrote (which uses Spring Kafka) is able to write to
this same topic using the same credentials as the Flink job.
- I am on Flink 1.11.2 and using flink-connector-kafka_2.11 at version 1.11.2.
- I turned on trace logging and verified that metadata requests from the Flink
job occur and metadata responses from the Kafka broker are returned.
- I've set producer semantics to NONE and disabled checkpointing.
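
For reference, the producer is wired up roughly like this (a sketch; endpoints, credentials and the serialization schema are placeholders, and the topic is the anonymized XXX from the stack trace):

    import java.nio.charset.StandardCharsets;
    import java.util.Properties;

    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ProducerSetup {
        static FlinkKafkaProducer<String> buildProducer() {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "broker.example.com:9092");   // placeholder
            props.setProperty("security.protocol", "SASL_SSL");
            props.setProperty("sasl.mechanism", "PLAIN");
            props.setProperty("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required "
                            + "username=\"api-key\" password=\"api-secret\";");   // placeholder credentials

            // No explicit partition on the record; the Kafka-side partitioner decides.
            KafkaSerializationSchema<String> schema = (element, timestamp) ->
                    new ProducerRecord<>("XXX", element.getBytes(StandardCharsets.UTF_8));

            return new FlinkKafkaProducer<>(
                    "XXX",                               // default topic
                    schema,
                    props,
                    FlinkKafkaProducer.Semantic.NONE);   // matches the "semantics set to NONE" above
        }
    }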





Re: FlinkKafkaProducer Fails with "Topic not present in metadata"

2020-12-10 Thread Joseph Lorenzini




So I actually discovered what the problem was, after some extensive debugging with Confluent.

 
My Kafka producer was attempting to send a record to a specific topic partition, and that partition did not exist. What the error message should have said is something like “Record sent to partition X of topic Y, but this partition does not exist”.
 
Confluent says they’ll be sending an upstream patch to Apache Kafka to improve the error message.
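 
In case it helps anyone else who hits this, a startup check along these lines (a sketch, not what we run verbatim) would have surfaced the real problem right away:

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.TopicDescription;

    public class PartitionGuard {
        // Fails fast if the partition we intend to write to does not exist, instead of
        // letting it surface later as "Topic not present in metadata".
        static void assertPartitionExists(Properties adminProps, String topic, int partition) throws Exception {
            try (AdminClient admin = AdminClient.create(adminProps)) {
                TopicDescription description =
                        admin.describeTopics(Collections.singleton(topic)).all().get().get(topic);
                int partitionCount = description.partitions().size();
                if (partition >= partitionCount) {
                    throw new IllegalArgumentException("Partition " + partition + " does not exist; topic '"
                            + topic + "' only has " + partitionCount + " partition(s)");
                }
            }
        }
    }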
 
Thanks,
Joe 
 

 
From: Becket Qin 
Date: Thursday, December 10, 2020 at 9:27 AM
To: Joseph Lorenzini 
Cc: "user@flink.apache.org" 
Subject: Re: FlinkKafkaProducer Fails with "Topic not present in metadata"


 


Hi Joseph,

Thanks for the thorough information. Do you happen to have the trace-level logging available? If so, do you mind putting it somewhere so we can take a look?

Thanks,

Jiangjie (Becket) Qin




Normalizing Metric Identifiers for Reporting Systems

2021-01-08 Thread Joseph Lorenzini




Hi all,
 
I am implementing a metric reporter for New Relic. I’d like it to support a job’s operator metrics that come with the Flink framework out of the box. To ensure each metric is unique, you can’t use the metric name alone; you need to use the metric identifier. However, I am not sure of the best way to present metric identifiers to New Relic. Here’s the format for an operator:
 
<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>.<metric_name>
 
Here’s an example:
 
ip.taskmanager.19ebf992ecc26eed8269da120a17c20a.NorvaxWordCount.CHAIN DataSource (at get(TextInput.java:63) (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (TokenizerFlatMap) -> Combine (SumTheFields).0.isBackPressured
 
What’s currently tripping me up is the value of the operator:

 
CHAIN DataSource (at get(TextInput.java:63) (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (TokenizerFlatMap) -> Combine (SumTheFields).
 
I can’t realistically make that part of the New Relic metric name: it wouldn’t be usable in the UI or for querying, and New Relic has length restrictions on metric names. What I am looking for is a way to normalize this part of the metric name so that it’s both useful for a human to consume and something that a system like New Relic can ingest.
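 
One direction I’ve been sketching (the length limit is a guess, and the hashing scheme is just an idea) is to run the identifier through a CharacterFilter and collapse over-long identifiers with a short hash so they stay unique:

    import org.apache.flink.metrics.CharacterFilter;
    import org.apache.flink.metrics.MetricGroup;

    public class NewRelicNamer {
        // Replace anything that is not alphanumeric, dot, dash or underscore.
        private static final CharacterFilter FILTER = input -> input.replaceAll("[^A-Za-z0-9._-]", "_");

        // Hypothetical limit; New Relic's actual metric-name limit may differ.
        private static final int MAX_LENGTH = 255;

        static String normalizedIdentifier(MetricGroup group, String metricName) {
            String identifier = group.getMetricIdentifier(metricName, FILTER);
            if (identifier.length() <= MAX_LENGTH) {
                return identifier;
            }
            // Keep the readable head and tail (host/job and the metric name itself),
            // replacing the middle (usually the chained operator description) with a hash.
            String hash = Integer.toHexString(identifier.hashCode());
            int keep = (MAX_LENGTH - hash.length() - 2) / 2;
            return identifier.substring(0, keep) + "." + hash + "."
                    + identifier.substring(identifier.length() - keep);
        }
    }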
 
Thanks,
Joe 
 
 





Records Are Never Emitted in a Tumbling Event Window When Each Key Only Has One Record

2021-06-08 Thread Joseph Lorenzini




Hi all,
 
I have observed behavior when joining two keyed streams together where events are never emitted. The source of each stream is a different Kafka topic. I am curious to know whether this is expected and whether there’s a way to work around it.
 
I am using a tumbling event-time window. All records across the two Kafka topics occurred within the same 5-second window of time. Each Kafka topic has a single partition. For each Kafka topic, I configured the Flink Kafka consumer like so:
 
    consumer.assignTimestampsAndWatermarks(
        WatermarkStrategy
            .forBoundedOutOfOrderness(Duration.ofSeconds(10))
            .withIdleness(Duration.ofSeconds(10)));
 
The tumbling window has a duration of 60 seconds. Now, it happens to be the case that there is only a single event per key in the join. If I use a tumbling processing-time window, then events are emitted as expected. If I ensure there are multiple events for a key, then the events are also emitted. However, with a single event per key in a tumbling event-time window, no events are emitted.
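 
For reference, the window side of the job looks roughly like this (a sketch; Tuple2<key, payload> stands in for the real record types and the join logic is simplified):

    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class WindowJoinSketch {
        static DataStream<String> join(DataStream<Tuple2<String, String>> left,
                                       DataStream<Tuple2<String, String>> right) {
            return left
                    .join(right)
                    .where(t -> t.f0)
                    .equalTo(t -> t.f0)
                    .window(TumblingEventTimeWindows.of(Time.seconds(60)))
                    .apply(new JoinFunction<Tuple2<String, String>, Tuple2<String, String>, String>() {
                        @Override
                        public String join(Tuple2<String, String> l, Tuple2<String, String> r) {
                            // Stand-in for the real join logic.
                            return l.f0 + ": " + l.f1 + " / " + r.f1;
                        }
                    });
        }
    }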
 
Is this expected, and if so, how do you handle this use case?
 
Thanks,
Joe 





Re: Records Are Never Emitted in a Tumbling Event Window When Each Key Only Has One Record

2021-06-09 Thread Joseph Lorenzini




Hi Arvid,
 
I am on 1.11.2. 
 
The Flink job has the following operators:
 
- Source from Kafka topic one: sent 14 records
- Source from Kafka topic two: sent 6 records
- Map: received 15 records / sent 14 records
- Map: received 6 records / sent 6 records
- Tumbling window to file sink: received 20 records / sent 0 records
 
The watermark is the same for the map operators and the tumbling window, which is to say that between the map and tumbling window the watermark did not advance.

 
Any idea why that might be happening? I did notice that the timestamps for all kafka records are within a fraction of a second of one another. For example:

 

2021-06-09T08:57:00.993-05:00
2021-06-09T08:57:00.997-05:00
 
I also noted that some kafka records in topic A have the exact same timestamp as records in topic B.

 
Could timestamps not being far enough apart (e.g., a millisecond or more), or two records from the two sources having the exact same time, cause the watermarks to not advance?
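 
If it would help, I can also wire in a small probe between the maps and the window to print the watermark per element; a throwaway sketch:

    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    // Pass-through operator that logs each element's timestamp and the current
    // watermark, to see exactly where the watermark stops advancing.
    public class WatermarkProbe<T> extends ProcessFunction<T, T> {
        @Override
        public void processElement(T value, Context ctx, Collector<T> out) {
            System.out.printf("ts=%d watermark=%d%n",
                    ctx.timestamp(), ctx.timerService().currentWatermark());
            out.collect(value);
        }
    }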
 
 
Joe 
 

From: Arvid Heise 
Date: Wednesday, June 9, 2021 at 8:34 AM
To: Joseph Lorenzini 
Cc: "user@flink.apache.org" 
Subject: Re: Records Are Never Emitted in a Tumbling Event Window When Each Key Only Has One Record


 



Hi Joe,

could you please check (in web UI) if the watermark is advancing past the join? The window operator would not trigger if it doesn't advance.

On which Flink version are you running?


 






Re: Records Are Never Emitted in a Tumbling Event Window When Each Key Only Has One Record

2021-06-09 Thread Joseph Lorenzini




Hi Arvid,
 
I may have figured out the problem. 
 
When using a tumbling window on a keyed stream with event time, does time only move forward when there’s an event with a newer timestamp? Said another way: if I have a 5-second tumbling window, the job would need to consume at least two events before a computation would occur: the first event has a timestamp that fits within the 5-second window, and the second event has a timestamp that exceeds the max timestamp of the previous window.
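 
To check my understanding against the actual settings from the original post (60-second window, 10-second bound), here is the arithmetic I’m picturing, as a sketch with made-up timestamps:

    public class WatermarkArithmetic {
        public static void main(String[] args) {
            long windowSize = 60_000;   // TumblingEventTimeWindows.of(Time.seconds(60))
            long bound      = 10_000;   // forBoundedOutOfOrderness(Duration.ofSeconds(10))

            long eventTs    = 1_623_250_825_000L;                          // the lone event for a key
            long windowEnd  = eventTs - (eventTs % windowSize) + windowSize;
            long watermark  = eventTs - bound - 1;                         // what the bounded strategy emits

            // The window fires only once watermark >= windowEnd - 1, so a single event can
            // never trigger it; a later event with timestamp >= windowEnd + bound is needed.
            System.out.println("window end: " + windowEnd + ", watermark: " + watermark
                    + ", fires: " + (watermark >= windowEnd - 1));
        }
    }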

 
Does that sound right?
 
Thanks,
Joe 
 





Support for Microseconds in Avro Deserialization

2021-08-02 Thread Joseph Lorenzini




Hi all,
 
The Avro specification supports microseconds, and the source code in org.apache.avro.LogicalTypes seems to confirm microsecond support. However, the conversion code in Flink (see org.apache.flink.formats.avro.typeutils.AvroSchemaConverter#convertToSchema) has this check:
 
    if (precision <= 3) {
        avroLogicalType = LogicalTypes.timestampMillis();
    } else {
        throw new IllegalArgumentException(
                "Avro does not support TIMESTAMP type "
                        + "with precision: "
                        + precision
                        + ", it only supports precision less than 3.");
    }
 
So it seems that Flink only supports Avro timestamps with at most millisecond precision. Does someone have a brief explanation of why this limitation exists? Depending on how complicated it is, I’d be willing to submit a PR to add that support.
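 
For context, this is the Avro side I’m referring to; plain Avro happily attaches timestamp-micros to a long field (sketch):

    import org.apache.avro.LogicalTypes;
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;

    public class MicrosSchemaSketch {
        public static void main(String[] args) {
            // timestamp-micros is a standard Avro logical type on a long.
            Schema micros = LogicalTypes.timestampMicros()
                    .addToSchema(Schema.create(Schema.Type.LONG));

            Schema record = SchemaBuilder.record("Example")
                    .fields()
                    .name("event_time").type(micros).noDefault()
                    .endRecord();

            System.out.println(record.toString(true));
        }
    }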
 
Thanks,
Joe 





Table API Throws Calcite Exception CannotPlanException When Tumbling Window is Used

2021-08-05 Thread Joseph Lorenzini




Hi all,
 
I am on Flink 1.12.3. I am trying to get a tumbling window to work with the Table API as documented here:

 
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/#tumble
 
I have a Kafka topic as a Flink source. I convert the stream into a table using the StreamTableEnvironment#fromDataStream method. Then, once the table is registered, I attempt to execute this Table API SQL:
 
    SELECT window_start, window_end, avg(state_ts)
    FROM TABLE(TUMBLE(TABLE lead_buffer, DESCRIPTOR(proctime), INTERVAL '1' MINUTES))
    GROUP BY window_start, window_end
 
However, this exception is thrown:
 
Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=STREAM_PHYSICAL, FlinkRelDistributionTraitDef=any, MiniBatchIntervalTraitDef=None: 0, ModifyKindSetTraitDef=[NONE], UpdateKindTraitDef=[NONE]
 
 
Does anyone have any idea about what I might be doing wrong here?
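 
One thing I notice is that the page I linked is the master docs, so the TVF syntax may be newer than 1.12. For comparison, the older group-window syntax I could fall back to would look roughly like this (a sketch, assuming proctime is a proper processing-time attribute on lead_buffer):

    SELECT
      TUMBLE_START(proctime, INTERVAL '1' MINUTE) AS window_start,
      TUMBLE_END(proctime, INTERVAL '1' MINUTE)   AS window_end,
      AVG(state_ts)
    FROM lead_buffer
    GROUP BY TUMBLE(proctime, INTERVAL '1' MINUTE)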

 
Thanks,
Joe 





Support for authenticated schema registry in debezium registry

2021-08-06 Thread Joseph Lorenzini




Hi all,
 
I am on Flink 1.13.2. I created the table like so:
 
CREATE TABLE lead_buffer (
  `id` INT NOT NULL,
  `state` STRING NOT NULL,
   PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector'= 'kafka',
  'topic' = 'buffer',
  'format'= 'debezium-avro-confluent',
  'debezium-avro-confluent.schema-registry.url'= 'http://localhost:8081',
  'scan.startup.mode'= 'earliest-offset',
  'properties.group.id' = 'customers',
  'debezium-avro-confluent.basic-auth.user-info' = 'sr-user:sr-user-password',
  'properties.bootstrap.servers'= 'localhost:9092',
  'value.fields-include'= 'EXCEPT_KEY',
  'properties.ssl.endpoint.identification.algorithm'= 'http',
  'properties.security.protocol'= 'SASL_PLAINTEXT',
  'properties.sasl.mechanism'= 'PLAIN',
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="password";')
 
 
I am looking at the docs here:
 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/debezium/#debezium-avro-confluent-basic-auth-user-info
 
According to the properties table, there is a property for setting auth to a schema registry: debezium-avro-confluent.basic-auth.user-info. However, when I set this in the DDL for creating the table, I get this error:
 
 
Caused by: org.apache.flink.table.api.ValidationException: Unsupported options found for 'kafka'.
 
Unsupported options:
 
debezium-avro-confluent.basic-auth.user-info
 
 
I found “FLINK-22858: avro-confluent doesn't support confluent schema registry that has security enabled”. However, that ticket was closed as a duplicate of FLINK-21229, which has been resolved and fixed in 1.13.2.
 
Does anyone know whether this has in fact been fixed, or whether this is user error on my part?
 
Thanks,
Joe 





Windowed Aggregation With Event Time over a Temporary View

2021-08-16 Thread Joseph Lorenzini




Hi all,
 
I am on Flink 1.12.3. 
 
So here’s the scenario: I have a Kafka topic as a source, where each record represents a change to an append-only audit log. The Kafka record has the following fields:
 

id (unique identifier for that audit log entry)
operation id (is shared across multiple records)operation (string)start_ts (TIMESTAMP(3))end_ts (TIMESTAMP(3))
 
I am trying to calculate the average item count and duration per operation. I first converted the Kafka source to an append-only data stream, and then attempted to run the following SQL:
 
    Table workLogTable = tableEnv.fromDataStream(workLogStream);
    tableEnv.createTemporaryView("work_log", workLogTable);

    Table workLogCntTable = tableEnv.sqlQuery(
        "select operation_id, operation, max(start_ts) as start_ts, max(end_ts) as end_ts, "
            + "count(*) as item_count, max(audit_ts) as audit_ts, max(event_time) as max_event_time"
            + " FROM work_log GROUP BY operation_id, operation");
    tableEnv.createTemporaryView("work_log_cnt", workLogCntTable);

    tableEnv.executeSql(
        "select max(audit_ts), operation, avg(item_count) as average_item_count, "
            + "AVG(end_ts - start_ts) as avg_duration from"
            + " work_log_cnt"
            + " GROUP BY TUMBLE(max_event_time, INTERVAL '1' SECOND), operation").print();
 
The problem I am having is that I am unable to preserve the event time between the first view and the second. I am getting this error:
 
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 139 to line 1, column 181: Cannot apply '$TUMBLE' to arguments of type '$TUMBLE(, )'. Supported form(s): '$TUMBLE(, )'
 
My guess is that the max function in the first query is converting the event time from a DATETIME type to a BIGINT. I am not sure how to apply an aggregate to the event time in the first query such that the event time from the original Kafka stream can be used in the second view. Is there a way to make this work?
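 
One shape I’m considering (a sketch, and I’m not sure it’s the idiomatic fix) is to window the inner query as well and carry the event time forward with TUMBLE_ROWTIME, which is documented to return a rowtime attribute that later queries can window on:

    SELECT
      operation_id,
      operation,
      max(start_ts)  AS start_ts,
      max(end_ts)    AS end_ts,
      count(*)       AS item_count,
      max(audit_ts)  AS audit_ts,
      TUMBLE_ROWTIME(event_time, INTERVAL '1' SECOND) AS rowtime
    FROM work_log
    GROUP BY operation_id, operation, TUMBLE(event_time, INTERVAL '1' SECOND)

The outer query would then group by TUMBLE(rowtime, INTERVAL '1' SECOND), operation instead of referencing max_event_time.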

 
Thanks,
Joe 
 
