Extremely long startup time for exactly_once kafka sink

2023-01-31 Thread Bobby Richard
When enabling exactly_once on my Kafka sink, I am seeing extremely long
initialization times (over 1 hour), especially after restoring from a
savepoint. In the logs I see the job constantly initializing thousands of
Kafka producers, like this:

2023-01-31 14:39:58,150 INFO org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=producer-common-event-enrichment-mutable-enriched-events-5-1, transactionalId=common-event-enrichment-mutable-enriched-events-5-1] ProducerId set to 847642 with epoch 14
2023-01-31 14:39:58,150 INFO org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=producer-common-event-enrichment-mutable-enriched-events-5-1, transactionalId=common-event-enrichment-mutable-enriched-events-5-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-01-31 14:39:58,151 INFO org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=producer-common-event-enrichment-common-enriched-events-5-1, transactionalId=common-event-enrichment-common-enriched-events-5-1] ProducerId set to 2496758 with epoch 25
2023-01-31 14:39:58,151 INFO org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=producer-common-event-enrichment-immutable-enriched-events-5-1, transactionalId=common-event-enrichment-immutable-enriched-events-5-1] ProducerId set to 886210 with epoch 16
2023-01-31 14:39:58,151 INFO org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=producer-common-event-enrichment-mutable-enriched-events-5-1, transactionalId=common-event-enrichment-mutable-enriched-events-5-1] Discovered transaction coordinator kafka-broker-2:9092 (id: 2 rack: null)
2023-01-31 14:39:58,151 INFO org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=producer-common-event-enrichment-immutable-enriched-events-5-1, transactionalId=common-event-enrichment-immutable-enriched-events-5-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-01-31 14:39:58,151 INFO org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=producer-common-event-enrichment-common-enriched-events-5-1, transactionalId=common-event-enrichment-common-enriched-events-5-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-01-31 14:39:58,152 INFO org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=producer-common-event-enrichment-immutable-enriched-events-5-1, transactionalId=common-event-enrichment-immutable-enriched-events-5-1] Discovered transaction coordinator kafka-broker-0:9092 (id: 0 rack: null)
2023-01-31 14:39:58,152 INFO org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=producer-common-event-enrichment-common-enriched-events-5-1, transactionalId=common-event-enrichment-common-enriched-events-5-1] Discovered transaction coordinator kafka-broker-2:9092 (id: 2 rack: null)

Does the transaction timeout impact the startup time? How can I reduce the
initialization time? Without exactly_once, the job starts up very quickly.
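
For reference, here is roughly how the sink is configured. Treat it as a
minimal sketch written against the 1.15+/1.16-era KafkaSink builder API, not
my exact code: the topic name, transactional id prefix, and timeout value are
simplified placeholders.

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.connector.base.DeliveryGuarantee
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
import org.apache.flink.connector.kafka.sink.KafkaSink
import org.apache.flink.streaming.api.datastream.DataStream

// Builds an exactly-once Kafka sink; topic, prefix, and timeout below are placeholders.
fun buildEnrichedEventsSink(params: ParameterTool): KafkaSink<String> =
    KafkaSink.builder<String>()
        .setBootstrapServers(params.get("bootstrapServers"))
        .setRecordSerializer(
            KafkaRecordSerializationSchema.builder<String>()
                .setTopic("enriched-events") // placeholder topic name
                .setValueSerializationSchema(SimpleStringSchema())
                .build()
        )
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("common-event-enrichment") // placeholder prefix
        // Must stay below the broker's transaction.max.timeout.ms.
        .setProperty("transaction.timeout.ms", "900000") // example: 15 minutes
        .build()

fun attachSink(stream: DataStream<String>, params: ParameterTool) {
    stream.sinkTo(buildEnrichedEventsSink(params))
}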



Debezium format with MongoDB

2022-10-07 Thread Bobby Richard
Is it possible to use the Flink Debezium format with the changelogs
generated by the Debezium MongoDB Connector? I have tried multiple
configurations (with and without the JSON schema included), and I always
receive a java.io.IOException: Corrupt Debezium JSON message. Could it be
related to the fact that the MongoDB connector sends the record as a JSON
string instead of regular JSON?

Example MongoDB Debezium changelog (notice the JSON string for
payload.after):
{ "schema": { ... }, "payload": { "after": "{\"_id\" : {\"$numberLong\" :
\"1004\"},\"first_name\" : \"Anne\",\"last_name\" : \"Kretchmar\",\"email\"
: \"an...@noanswer.org\"}", "patch": null, "source": { "version":
"1.9.6.Final", "connector": "mongodb", "name": "fulfillment", "ts_ms":
1558965508000, "snapshot": false, "db": "inventory", "rs": "rs0",
"collection": "customers", "ord": 31, "h": 1546547425148721999 }, "op": "c",
"ts_ms": 1558965515240 } }

Example Postgres Debezium changelog:
{
  "schema": { ... },
  "payload": {
    "before": null,
    "after": {
      "id": 1,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "an...@noanswer.org"
    },
    "source": {
      "version": "1.9.6.Final",
      "connector": "postgresql",
      "name": "PostgreSQL_server",
      "ts_ms": 1559033904863,
      "snapshot": true,
      "db": "postgres",
      "sequence": "[\"24023119\",\"24023128\"]",
      "schema": "public",
      "table": "customers",
      "txId": 555,
      "lsn": 24023128,
      "xmin": null
    },
    "op": "c",
    "ts_ms": 1559033904863
  }
}



Re: FileSink to GCS

2022-03-14 Thread Bobby Richard
Just to confirm, I would need to use writeAsText instead of the
FileSink?
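
I.e., something along these lines (a hypothetical stand-in; the bucket path is
just a placeholder):

import org.apache.flink.streaming.api.datastream.DataStream

// Write with the legacy writeAsText API instead of the FileSink (path is a placeholder).
fun writeWithLegacyApi(stream: DataStream<String>) {
    stream.writeAsText("gs://my-bucket/output")
}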

On Mon, Mar 14, 2022 at 11:21 AM Martijn Visser 
wrote:

> Hi Bobby,
>
> You can already use it for writing in Flink 1.14 [1] as long as you're
> aware of the implications in case of a failure / need to recover.
>
> Best regards,
>
> Martijn
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/filesystems/gcs/
>
> On Mon, 14 Mar 2022 at 16:00, Bobby Richard 
> wrote:
>
>> Thanks Martijn, are there any alternatives to write to GCS using Flink
>> 1.14? Recoverability isn't important for my current use case.
>>
>> On Mon, Mar 14, 2022 at 10:58 AM Martijn Visser 
>> wrote:
>>
>>> Hi Bobby,
>>>
>>> That's because Flink 1.14 currently doesn't support the
>>> RecoverableWriter for GCS. This will be supported as of Flink 1.15, which
>>> you can see in the relevant Flink Jira ticket [1]. For more details on why
>>> RecoverableWriter is important, the JavaDoc [2] is probably the best source
>>> of information.
>>>
>>> Best regards,
>>>
>>> Martijn Visser
>>> https://twitter.com/MartijnVisser82
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-11838
>>> [2]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/core/fs/RecoverableWriter.html
>>>
>>>
>>> On Mon, 14 Mar 2022 at 15:21, Bobby Richard 
>>> wrote:
>>>
>>>> I am receiving the following exception when attempting to write to GCS
>>>> with the FileSink in Flink 1.14.3. I am using the shaded Flink Hadoop
>>>> dependency 2.8.3-10.0 and the GCS connector hadoop2-2.1.1.
>>>>
>>>>
>>>> java.lang.UnsupportedOperationException: Recoverable writers on Hadoop
>>>> are only supported for HDFS
>>>>
>>>> I am able to write checkpoints and savepoints to GCS successfully.
>>>>
>>>
>>>
>
>



Re: FileSink to GCS

2022-03-14 Thread Bobby Richard
Thanks Martijn, are there any alternatives to write to GCS using Flink
1.14? Recoverability isn't important for my current use case.

On Mon, Mar 14, 2022 at 10:58 AM Martijn Visser 
wrote:

> Hi Bobby,
>
> That's because Flink 1.14 currently doesn't support the RecoverableWriter
> for GCS. This will be supported as of Flink 1.15, which you can see in the
> relevant Flink Jira ticket [1]. For more details on why RecoverableWriter
> is important, the JavaDoc [2] is probably the best source of information.
>
> Best regards,
>
> Martijn Visser
> https://twitter.com/MartijnVisser82
>
> [1] https://issues.apache.org/jira/browse/FLINK-11838
> [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/core/fs/RecoverableWriter.html
>
>
> On Mon, 14 Mar 2022 at 15:21, Bobby Richard 
> wrote:
>
>> I am receiving the following exception when attempting to write to GCS
>> with the FileSink in Flink 1.14.3. I am using the shaded Flink Hadoop
>> dependency 2.8.3-10.0 and the GCS connector hadoop2-2.1.1.
>>
>>
>> java.lang.UnsupportedOperationException: Recoverable writers on Hadoop
>> are only supported for HDFS
>>
>> I am able to write checkpoints and savepoints to GCS successfully.
>>
>
>



FileSink to GCS

2022-03-14 Thread Bobby Richard
I am receiving the following exception when attempting to write to GCS with
the FileSink in Flink 1.14.3. I am using the shaded Flink Hadoop dependency
2.8.3-10.0 and the GCS connector hadoop2-2.1.1.


java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are
only supported for HDFS

I am able to write checkpoints and savepoints to GCS successfully.
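
For completeness, the sink is set up roughly like this. It is a minimal
sketch rather than my exact code; the bucket path is a placeholder.

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.connector.file.sink.FileSink
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.datastream.DataStream

// Row-encoded FileSink writing directly to a GCS bucket (path is a placeholder).
fun writeToGcs(stream: DataStream<String>) {
    val fileSink = FileSink
        .forRowFormat(Path("gs://my-bucket/output"), SimpleStringEncoder<String>("UTF-8"))
        .build()
    stream.sinkTo(fileSink)
}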



Re: KafkaSource Problem

2021-03-09 Thread Bobby Richard
Great, thanks. I was able to work around the issue by implementing my own
KafkaRecordDeserializer. I will take a stab at a PR to fix the bug; it should
be an easy fix.
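
In case anyone hits the same NullPointerException, the workaround looks
roughly like the sketch below. It is written from memory against the 1.12.x
connector API, so treat the interface and method names as approximate.

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer
import org.apache.flink.util.Collector
import org.apache.kafka.clients.consumer.ConsumerRecord
import java.nio.charset.StandardCharsets

// Deserializes only the record value as a UTF-8 string and reports its produced
// type directly, so getProducedType() never touches an uninitialized delegate.
class StringValueDeserializer : KafkaRecordDeserializer<String> {

    override fun deserialize(record: ConsumerRecord<ByteArray, ByteArray>, collector: Collector<String>) {
        // Skip tombstones (null values) instead of failing on them.
        record.value()?.let { collector.collect(String(it, StandardCharsets.UTF_8)) }
    }

    override fun getProducedType(): TypeInformation<String> = Types.STRING
}

The source is then built with .setDeserializer(StringValueDeserializer())
instead of the valueOnly(...) helper.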

On Tue, Mar 9, 2021 at 9:26 AM Till Rohrmann  wrote:

> Hi Bobby,
>
> This is most likely a bug in Flink. Thanks a lot for reporting the issue
> and analyzing it. I have created an issue for tracking it [1].
>
> cc Becket.
>
> [1] https://issues.apache.org/jira/browse/FLINK-21691
>
> Cheers,
> Till
>
> On Mon, Mar 8, 2021 at 3:35 PM Bobby Richard 
> wrote:
>
>> I'm receiving the following exception when trying to use a KafkaSource
>> from the new DataSource API.
>>
>> Exception in thread "main" java.lang.NullPointerException
>> at
>> org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79)
>> at
>> org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715)
>>
>> Here is my code (Kotlin):
>>
>> val kafkaSource = buildKafkaSource(params)
>> val datastream = env.fromSource(kafkaSource,
>>     WatermarkStrategy.noWatermarks(), "kafka")
>>
>> private fun buildKafkaSource(params: ParameterTool): KafkaSource<String> {
>>     val builder = KafkaSource.builder<String>()
>>         .setBootstrapServers(params.get("bootstrapServers"))
>>         .setGroupId(params.get("groupId"))
>>         .setStartingOffsets(OffsetsInitializer.earliest())
>>         .setTopics("topic")
>>         .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java))
>>
>>     if (params.getBoolean("boundedSource", false)) {
>>         builder.setBounded(OffsetsInitializer.latest())
>>     }
>>
>>     return builder.build()
>> }
>>
>>
>>
>>
>> I'm setting the deserializer using the ValueDeserializerWrapper as
>> described in the KafkaSourceBuilder javadoc example
>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.html
>>
>> Looking at the code for the ValueDeserializerWrapper, it appears that the
>> deserializer isn't actually set until the deserialize method is called, but
>> getProducedType is actually called first resulting in the
>> NullPointerException. What am I missing?
>>
>> Thanks,
>> Bobby
>>
>
>

-- 

*Bobby Richard*
R Software Engineer   | Information Security Group   | Symantec
Enterprise Division
Broadcom

mobile: 337.794.2128

Atlanta, GA (USA)
bobby.rich...@broadcom.com   | broadcom.com



KafkaSource Problem

2021-03-08 Thread Bobby Richard
I'm receiving the following exception when trying to use a KafkaSource from
the new DataSource API.

Exception in thread "main" java.lang.NullPointerException
at
org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79)
at
org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715)

Here is my code (Kotlin):

val kafkaSource = buildKafkaSource(params)
val datastream = env.fromSource(kafkaSource,
    WatermarkStrategy.noWatermarks(), "kafka")

private fun buildKafkaSource(params: ParameterTool): KafkaSource<String> {
    val builder = KafkaSource.builder<String>()
        .setBootstrapServers(params.get("bootstrapServers"))
        .setGroupId(params.get("groupId"))
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setTopics("topic")
        .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java))

    if (params.getBoolean("boundedSource", false)) {
        builder.setBounded(OffsetsInitializer.latest())
    }

    return builder.build()
}




I'm setting the deserializer using the ValueDeserializerWrapper as
described in the KafkaSourceBuilder javadoc example
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.html

Looking at the code for ValueDeserializerWrapper, it appears that the
deserializer isn't actually set until the deserialize method is called, but
getProducedType is called first, resulting in the
NullPointerException. What am I missing?

Thanks,
Bobby
