[jira] [Updated] (FLINK-35808) Let ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG be overridable by user in KafkaSourceBuilder

2024-07-10 Thread Kevin Lam (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kevin Lam updated FLINK-35808:
--
Description: 
This issue is a follow-up to [this mailing list 
discussion|https://lists.apache.org/thread/spl88o63sjm2dv4l5no0ym632d2yt2o6]. 

I'd like to propose letting the ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG 
be overridable by user in KafkaSourceBuilder, as shown in this DRAFT PR:
 
[https://github.com/apache/flink-connector-kafka/pull/108]
 
From the PR description: 
{quote}This allows users to easily implement the [{{claim check}} large message 
pattern|https://developer.confluent.io/patterns/event-processing/claim-check/] 
without bringing any concerns into the Flink codebase otherwise, by specifying 
a {{value.deserializer}} that handles it, but otherwise passes through the 
bytes.
Note: overriding value.serializer is already supported on the Producer side: 
[https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L82-L83]

 
Other Reading:
[https://www.kai-waehner.de/blog/2020/08/07/apache-kafka-handling-large-messages-and-files-for-image-video-audio-processing/]

[https://www.conduktor.io/kafka/how-to-send-large-messages-in-apache-kafka/#Option-1:-using-an-external-store-(GB-size-messages)-0]
{quote}
 
What do folks think? If it seems reasonable I can follow the steps in the 
[contribution guide|https://flink.apache.org/how-to-contribute/overview/].  
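
To make the proposal concrete, here is a rough sketch of the kind of {{value.deserializer}} a user might supply once the builder accepts an override. {{ClaimCheckDeserializer}}, the {{claim-check-ref}} header name, and {{fetchFromObjectStore}} are hypothetical illustrations, not part of the draft PR; the idea is only that oversized payloads are resolved from external storage while ordinary messages pass through as raw bytes.

{code:java}
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;

// Hypothetical claim-check value deserializer. With the draft PR it could be
// registered on the source via:
//   builder.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
//                       ClaimCheckDeserializer.class.getName());
public class ClaimCheckDeserializer implements Deserializer<byte[]> {

    @Override
    public byte[] deserialize(String topic, byte[] data) {
        // No headers available: pass the bytes through untouched.
        return data;
    }

    @Override
    public byte[] deserialize(String topic, Headers headers, byte[] data) {
        Header claimCheck = headers.lastHeader("claim-check-ref"); // assumed header name
        if (claimCheck != null) {
            // Payload was offloaded; the header carries a reference such as an object-store URL.
            return fetchFromObjectStore(new String(claimCheck.value(), StandardCharsets.UTF_8));
        }
        return data;
    }

    private byte[] fetchFromObjectStore(String reference) {
        // The storage client is left to the user; this sketch only shows the shape.
        throw new UnsupportedOperationException("not implemented in this sketch");
    }
}
{code}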

  was:
This issue is a follow-up to [this mailing list 
discussion|https://lists.apache.org/thread/spl88o63sjm2dv4l5no0ym632d2yt2o6]. 

I'd like to propose letting the ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG 
be overridable by user in KafkaSourceBuilder, as shown in this DRAFT PR:
 
[https://github.com/apache/flink-connector-kafka/pull/108]
 
From the PR description: 
{quote}This allows users to easily implement the [{{claim check}} large message 
pattern|https://developer.confluent.io/patterns/event-processing/claim-check/] 
without bringing any concerns into the Flink codebase otherwise, by specifying 
a {{value.deserializer}} that handles it, but otherwise passes through the 
bytes.
Note: [overriding value.serializer is already supported on the Producer side|
|[https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L82-L83]|

]

 
Other Reading:
[https://www.kai-waehner.de/blog/2020/08/07/apache-kafka-handling-large-messages-and-files-for-image-video-audio-processing/
] 
[https://www.conduktor.io/kafka/how-to-send-large-messages-in-apache-kafka/#Option-1:-using-an-external-store-(GB-size-messages)-0]|
{quote}
 
What do folks think? If it seems reasonable I can follow the steps in the 
[contribution guide|https://flink.apache.org/how-to-contribute/overview/].  


> Let ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG be overridable by user in 
> KafkaSourceBuilder
> ---
>
> Key: FLINK-35808
> URL: https://issues.apache.org/jira/browse/FLINK-35808
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.2.0
>Reporter: Kevin Lam
>Priority: Minor
>
> This issue is a follow-up to [this mailing list 
> discussion|https://lists.apache.org/thread/spl88o63sjm2dv4l5no0ym632d2yt2o6]. 
> I'd like to propose letting the 
> ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG be overridable by user in 
> KafkaSourceBuilder, as shown in this DRAFT PR:
>  
> [https://github.com/apache/flink-connector-kafka/pull/108]
>  
> From the PR description: 
> {quote}This allows users to easily implement the [{{claim check}} large 
> message 
> pattern|https://developer.confluent.io/patterns/event-processing/claim-check/]
>  without bringing any concerns into the Flink codebase otherwise, by 
> specifying a {{value.deserializer}} that handles it, but otherwise passes 
> through the bytes.
> Note: overriding value.serializer is already supported on the Producer side: 
> [https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L82-L83]
>  
> Other Reading:
> [https://www.kai-waehner.de/blog/2020/08/07/apache-kafka-handling-large-messages-and-files-for-image-video-audio-processing/]
> [https://www.conduktor.io/kafka/how-to-send-large-messages-in-apache-kafka/#Option-1:-using-an-external-store-(GB-size-messages)-0]
> {quote}
>  
> What do folks think? If it seems reasonable I can follow the steps in the 
> [contribution 

[jira] [Updated] (FLINK-35808) Let ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG be overridable by user in KafkaSourceBuilder

2024-07-10 Thread Kevin Lam (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kevin Lam updated FLINK-35808:
--
Description: 
This issue is a follow-up to [this mailing list 
discussion|https://lists.apache.org/thread/spl88o63sjm2dv4l5no0ym632d2yt2o6]. 

I'd like to propose letting the ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG 
be overridable by user in KafkaSourceBuilder, as shown in this DRAFT PR:
 
[https://github.com/apache/flink-connector-kafka/pull/108]
 
From the PR description: 
{quote}This allows users to easily implement the [{{claim check}} large message 
pattern|https://developer.confluent.io/patterns/event-processing/claim-check/] 
without bringing any concerns into the Flink codebase otherwise, by specifying 
a {{value.deserializer}} that handles it, but otherwise passes through the 
bytes.
Note: [overriding value.serializer is already supported on the Producer side|
|[https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L82-L83]|

]

 
Other Reading:
[https://www.kai-waehner.de/blog/2020/08/07/apache-kafka-handling-large-messages-and-files-for-image-video-audio-processing/
] 
[https://www.conduktor.io/kafka/how-to-send-large-messages-in-apache-kafka/#Option-1:-using-an-external-store-(GB-size-messages)-0]|
{quote}
 
What do folks think? If it seems reasonable I can follow the steps in the 
[contribution guide|https://flink.apache.org/how-to-contribute/overview/].  

  was:
This issue is a follow-up to [this mailing list 
discussion|https://lists.apache.org/thread/spl88o63sjm2dv4l5no0ym632d2yt2o6]. 

I'd like to propose letting the ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG 
be overridable by user in KafkaSourceBuilder, as shown in this DRAFT PR:
 
[https://github.com/apache/flink-connector-kafka/pull/108]
 
From the PR description: 
{quote}This allows users to easily implement the [{{claim check}} large message 
pattern|https://developer.confluent.io/patterns/event-processing/claim-check/] 
without bringing any concerns into the Flink codebase otherwise, by specifying 
a {{value.deserializer}} that handles it, but otherwise passes through the 
bytes.
Note: [overriding value.serializer is already supported on the Producer side](
|[https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L82-L83])|


 
Other Reading:
[https://www.kai-waehner.de/blog/2020/08/07/apache-kafka-handling-large-messages-and-files-for-image-video-audio-processing/
] 
[https://www.conduktor.io/kafka/how-to-send-large-messages-in-apache-kafka/#Option-1:-using-an-external-store-(GB-size-messages)-0]|
{quote}
 
What do folks think? If it seems reasonable I can follow the steps in the 
[contribution guide|https://flink.apache.org/how-to-contribute/overview/].  


> Let ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG be overridable by user in 
> KafkaSourceBuilder
> ---
>
> Key: FLINK-35808
> URL: https://issues.apache.org/jira/browse/FLINK-35808
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.2.0
>Reporter: Kevin Lam
>Priority: Minor
>
> This issue is a follow-up to [this mailing list 
> discussion|https://lists.apache.org/thread/spl88o63sjm2dv4l5no0ym632d2yt2o6]. 
> I'd like to propose letting the 
> ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG be overridable by user in 
> KafkaSourceBuilder, as shown in this DRAFT PR:
>  
> [https://github.com/apache/flink-connector-kafka/pull/108]
>  
> From the PR description: 
> {quote}This allows users to easily implement the [{{claim check}} large 
> message 
> pattern|https://developer.confluent.io/patterns/event-processing/claim-check/]
>  without bringing any concerns into the Flink codebase otherwise, by 
> specifying a {{value.deserializer}} that handles it, but otherwise passes 
> through the bytes.
> Note: [overriding value.serializer is already supported on the Producer side|
> |[https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L82-L83]|
> ]
>  
> Other Reading:
> [https://www.kai-waehner.de/blog/2020/08/07/apache-kafka-handling-large-messages-and-files-for-image-video-audio-processing/
> ] 
> [https://www.conduktor.io/kafka/how-to-send-large-messages-in-apache-kafka/#Option-1:-using-an-external-store-(GB-size-messages)-0]|
> {quote}
>  
> What do folks think? If it seems reasonable I can follow the steps in the 
> [contribution 

[jira] [Updated] (FLINK-35808) Let ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG be overridable by user in KafkaSourceBuilder

2024-07-10 Thread Kevin Lam (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kevin Lam updated FLINK-35808:
--
Description: 
This issue is a follow-up to [this mailing list 
discussion|https://lists.apache.org/thread/spl88o63sjm2dv4l5no0ym632d2yt2o6]. 

I'd like to propose letting the ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG 
be overridable by user in KafkaSourceBuilder, as shown in this DRAFT PR:
 
[https://github.com/apache/flink-connector-kafka/pull/108]
 
From the PR description: 
{quote}This allows users to easily implement the [{{claim check}} large message 
pattern|https://developer.confluent.io/patterns/event-processing/claim-check/] 
without bringing any concerns into the Flink codebase otherwise, by specifying 
a {{value.deserializer}} that handles it, but otherwise passes through the 
bytes.
Note: [overriding value.serializer is already supported on the Producer side](
|[https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L82-L83])|


 
Other Reading:
[https://www.kai-waehner.de/blog/2020/08/07/apache-kafka-handling-large-messages-and-files-for-image-video-audio-processing/
] 
[https://www.conduktor.io/kafka/how-to-send-large-messages-in-apache-kafka/#Option-1:-using-an-external-store-(GB-size-messages)-0]|
{quote}
 
What do folks think? If it seems reasonable I can follow the steps in the 
[contribution guide|https://flink.apache.org/how-to-contribute/overview/].  

  was:
This issue is a follow-up to [this mailing list 
discussion|https://lists.apache.org/thread/spl88o63sjm2dv4l5no0ym632d2yt2o6]. 

I'd like to propose letting the ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG 
be overridable by user in KafkaSourceBuilder, as shown in this DRAFT PR:
 
[https://github.com/apache/flink-connector-kafka/pull/108]
 
From the PR description: 
{quote}This allows users to easily implement the [{{claim check}} large message 
pattern|https://developer.confluent.io/patterns/event-processing/claim-check/] 
without bringing any concerns into the Flink codebase otherwise, by specifying 
a {{value.deserializer}} that handles it, but otherwise passes through the 
bytes.
Note: [overriding {{value.serializer}} is already supported on the Producer 
side.](
|[https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L82-L83|https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L82-L83]])|
|
 
Other Reading:
[https://www.kai-waehner.de/blog/2020/08/07/apache-kafka-handling-large-messages-and-files-for-image-video-audio-processing/
] 
[https://www.conduktor.io/kafka/how-to-send-large-messages-in-apache-kafka/#Option-1:-using-an-external-store-(GB-size-messages)-0]|
{quote}
 
What do folks think? If it seems reasonable I can follow the steps in the 
[contribution guide|https://flink.apache.org/how-to-contribute/overview/].  


> Let ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG be overridable by user in 
> KafkaSourceBuilder
> ---
>
> Key: FLINK-35808
> URL: https://issues.apache.org/jira/browse/FLINK-35808
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.2.0
>Reporter: Kevin Lam
>Priority: Minor
>
> This issue is a follow-up to [this mailing list 
> discussion|https://lists.apache.org/thread/spl88o63sjm2dv4l5no0ym632d2yt2o6]. 
> I'd like to propose letting the 
> ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG be overridable by user in 
> KafkaSourceBuilder, as shown in this DRAFT PR:
>  
> [https://github.com/apache/flink-connector-kafka/pull/108]
>  
> From the PR description: 
> {quote}This allows users to easily implement the [{{claim check}} large 
> message 
> pattern|https://developer.confluent.io/patterns/event-processing/claim-check/]
>  without bringing any concerns into the Flink codebase otherwise, by 
> specifying a {{value.deserializer}} that handles it, but otherwise passes 
> through the bytes.
> Note: [overriding value.serializer is already supported on the Producer side](
> |[https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L82-L83])|
>  
> Other Reading:
> [https://www.kai-waehner.de/blog/2020/08/07/apache-kafka-handling-large-messages-and-files-for-image-video-audio-processing/
> ] 
> 

[jira] [Updated] (FLINK-35808) Let ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG be overridable by user in KafkaSourceBuilder

2024-07-10 Thread Kevin Lam (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kevin Lam updated FLINK-35808:
--
Description: 
This issue is a follow-up to [this mailing list 
discussion|https://lists.apache.org/thread/spl88o63sjm2dv4l5no0ym632d2yt2o6]. 

I'd like to propose letting the ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG 
be overridable by user in KafkaSourceBuilder, as shown in this DRAFT PR:
 
[https://github.com/apache/flink-connector-kafka/pull/108]
 
From the PR description: 
{quote}This allows users to easily implement the [{{claim check}} large message 
pattern|https://developer.confluent.io/patterns/event-processing/claim-check/] 
without bringing any concerns into the Flink codebase otherwise, by specifying 
a {{value.deserializer}} that handles it, but otherwise passes through the 
bytes.
Note: [overriding {{value.serializer}} is already supported on the Producer 
side.](
|[https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L82-L83|https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L82-L83]])|
|
 
Other Reading:
[https://www.kai-waehner.de/blog/2020/08/07/apache-kafka-handling-large-messages-and-files-for-image-video-audio-processing/
] 
[https://www.conduktor.io/kafka/how-to-send-large-messages-in-apache-kafka/#Option-1:-using-an-external-store-(GB-size-messages)-0]|
{quote}
 
What do folks think? If it seems reasonable I can follow the steps in the 
[contribution guide|https://flink.apache.org/how-to-contribute/overview/].  

  was:
This issue is a follow-up to [this mailing list 
discussion|https://lists.apache.org/thread/spl88o63sjm2dv4l5no0ym632d2yt2o6]. 

I'd like to propose letting the ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG 
be overridable by user in KafkaSourceBuilder, as shown in this DRAFT PR:
 
https://github.com/apache/flink-connector-kafka/pull/108
 
From the PR description: 
{quote}This allows users to easily implement the [{{claim check}} large message 
pattern|https://developer.confluent.io/patterns/event-processing/claim-check/] 
without bringing any concerns into the Flink codebase otherwise, by specifying 
a {{value.deserializer}} that handles it, but otherwise passes through the 
bytes.
Note: [overriding {{value.serializer}} is already supported on the Producer 
side.
|https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L82-L83]Other
 Reading:
[https://www.kai-waehner.de/blog/2020/08/07/apache-kafka-handling-large-messages-and-files-for-image-video-audio-processing/
] 
[https://www.conduktor.io/kafka/how-to-send-large-messages-in-apache-kafka/#Option-1:-using-an-external-store-(GB-size-messages)-0]{quote}
 
What do folks think? If it seems reasonable I can follow the steps in the 
[contribution guide|https://flink.apache.org/how-to-contribute/overview/].  


> Let ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG be overridable by user in 
> KafkaSourceBuilder
> ---
>
> Key: FLINK-35808
> URL: https://issues.apache.org/jira/browse/FLINK-35808
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.2.0
>Reporter: Kevin Lam
>Priority: Minor
>
> This issue is a follow-up to [this mailing list 
> discussion|https://lists.apache.org/thread/spl88o63sjm2dv4l5no0ym632d2yt2o6]. 
> I'd like to propose letting the 
> ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG be overridable by user in 
> KafkaSourceBuilder, as shown in this DRAFT PR:
>  
> [https://github.com/apache/flink-connector-kafka/pull/108]
>  
> From the PR description: 
> {quote}This allows users to easily implement the [{{claim check}} large 
> message 
> pattern|https://developer.confluent.io/patterns/event-processing/claim-check/]
>  without bringing any concerns into the Flink codebase otherwise, by 
> specifying a {{value.deserializer}} that handles it, but otherwise passes 
> through the bytes.
> Note: [overriding {{value.serializer}} is already supported on the Producer 
> side.](
> |[https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L82-L83|https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L82-L83]])|
> |
>  

[jira] [Created] (FLINK-35808) Let ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG be overridable by user in KafkaSourceBuilder

2024-07-10 Thread Kevin Lam (Jira)
Kevin Lam created FLINK-35808:
-

 Summary: Let ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG be 
overridable by user in KafkaSourceBuilder
 Key: FLINK-35808
 URL: https://issues.apache.org/jira/browse/FLINK-35808
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: kafka-3.2.0
Reporter: Kevin Lam


This issue is a follow-up to [this mailing list 
discussion|https://lists.apache.org/thread/spl88o63sjm2dv4l5no0ym632d2yt2o6]. 

I'd like to propose letting the ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG 
be overridable by user in KafkaSourceBuilder, as shown in this DRAFT PR:
 
https://github.com/apache/flink-connector-kafka/pull/108
 
From the PR description: 
{quote}This allows users to easily implement the [{{claim check}} large message 
pattern|https://developer.confluent.io/patterns/event-processing/claim-check/] 
without bringing any concerns into the Flink codebase otherwise, by specifying 
a {{value.deserializer}} that handles it, but otherwise passes through the 
bytes.
Note: [overriding {{value.serializer}} is already supported on the Producer 
side.|https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L82-L83]

Other Reading:
[https://www.kai-waehner.de/blog/2020/08/07/apache-kafka-handling-large-messages-and-files-for-image-video-audio-processing/
] 
[https://www.conduktor.io/kafka/how-to-send-large-messages-in-apache-kafka/#Option-1:-using-an-external-store-(GB-size-messages)-0]{quote}
 
What do folks think? If it seems reasonable I can follow the steps in the 
[contribution guide|https://flink.apache.org/how-to-contribute/overview/].  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31762) Subscribe to multiple Kafka topics may cause partition assignment skew

2024-06-05 Thread Kevin Lam (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17852543#comment-17852543
 ] 

Kevin Lam commented on FLINK-31762:
---

Hi, I'm also experiencing this issue. I added some more details in 
[https://lists.apache.org/thread/96qct8n92pqbrsc8h8xq146o5cmjjhd7] before 
finding this issue; your write-up describes it well. 

> Subscribe to multiple Kafka topics may cause partition assignment skew
> --
>
> Key: FLINK-31762
> URL: https://issues.apache.org/jira/browse/FLINK-31762
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.13.0, 1.18.0
>Reporter: Liam
>Priority: Major
> Attachments: image-2023-04-11-08-00-16-054.png, 
> image-2023-04-11-08-12-24-115.png
>
>
> To simplify the demonstration, let us assume that there are two topics, and 
> each topic has four partitions. We have set the parallelism to eight to 
> consume these two topics. However, the current partition assignment method 
> may lead to some subtasks being assigned two partitions while others are left 
> with none.
> !image-2023-04-11-08-00-16-054.png|width=500,height=143!
> In my case, the situation is even worse as I have ten topics, each with 100 
> partitions. If I set the parallelism to 1000, some slots may be assigned 
> seven partitions while others remain unassigned.
> To address this issue, I propose a new partition assignment solution. In this 
> approach, round-robin assignment takes place between all topics, not just one.
> For example, the ideal assignment for the case mentioned above is presented 
> below:
>  
> !https://imgr.whimsical.com/object/A4jSJwgQNrc5mgpGddhghq|width=513,height=134!
> This new solution can also handle cases where each topic has more partitions.
> !image-2023-04-11-08-12-24-115.png|width=444,height=127!
> Let us work together to reach a consensus on this proposal. Thank you!
>  
> FYI: how partitions are currently assigned:
> {code:java}
> public class KafkaTopicPartitionAssigner {
>     public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
>         return assign(partition.getTopic(), partition.getPartition(), numParallelSubtasks);
>     }
>
>     public static int assign(String topic, int partition, int numParallelSubtasks) {
>         int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
>         // here, the assumption is that the id of Kafka partitions are always ascending
>         // starting from 0, and therefore can be used directly as the offset clockwise from the
>         // start index
>         return (startIndex + partition) % numParallelSubtasks;
>     }
> }
> {code}
> for Kafka Source, it's implemented in the KafkaSourceEnumerator as below
> {code:java}
>     static int getSplitOwner(TopicPartition tp, int numReaders) {
>         int startIndex = ((tp.topic().hashCode() * 31) & 0x7FFFFFFF) % numReaders;
>         // here, the assumption is that the id of Kafka partitions are always ascending
>         // starting from 0, and therefore can be used directly as the offset clockwise from the
>         // start index
>         return (startIndex + tp.partition()) % numReaders;
>     }
> {code}
>  
>  
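
For illustration, a minimal sketch (not the actual Flink implementation; the class name is made up) of the round-robin idea proposed above: derive owners from a global index over all subscribed topic-partitions, so partitions of different topics spread evenly across subtasks. Unlike the hash-based formula, this needs the full partition set up front.

{code:java}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;

public class GlobalRoundRobinAssigner {

    // Order all topic-partitions deterministically, then assign reader i % numReaders,
    // which balances the total number of partitions per subtask across topics.
    public static Map<TopicPartition, Integer> assign(
            List<TopicPartition> partitions, int numReaders) {
        List<TopicPartition> ordered = new ArrayList<>(partitions);
        ordered.sort(
                Comparator.comparing(TopicPartition::topic)
                        .thenComparingInt(TopicPartition::partition));
        Map<TopicPartition, Integer> owners = new HashMap<>();
        for (int i = 0; i < ordered.size(); i++) {
            owners.put(ordered.get(i), i % numReaders);
        }
        return owners;
    }
}
{code}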



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31860) FlinkDeployments never finalize when namespace is deleted

2024-03-11 Thread Kevin Lam (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825409#comment-17825409
 ] 

Kevin Lam commented on FLINK-31860:
---

Thanks [~gyfora]! Is there any consideration for putting a solution in place in 
the Flink Kubernetes Operator, if there will not be one from the kubernetes / 
josdk side? 

> FlinkDeployments never finalize when namespace is deleted
> -
>
> Key: FLINK-31860
> URL: https://issues.apache.org/jira/browse/FLINK-31860
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.3.1
> Environment: Apache Flink Kubernetes Operator 1.3.1
> Kubernetes 1.24.9
>Reporter: Jayme Howard
>Assignee: Jayme Howard
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>
> This appears to be a pretty straightforward issue, but I don't know the 
> codebase well enough to propose a fix.  When a FlinkDeployment is present in 
> a namespace, and the namespace is deleted, the FlinkDeployment never 
> reconciles and fails to complete its finalizer.  This leads to the namespace 
> being blocked from deletion indefinitely, requiring manual manipulation to 
> remove the finalizer on the FlinkDeployment.
>  
> Namespace conditions:
> {code:java}
> conditions:
> - lastTransitionTime: '2023-04-18T22:17:48Z'
>   message: All resources successfully discovered
>   reason: ResourcesDiscovered
>   status: 'False'
>   type: NamespaceDeletionDiscoveryFailure
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: All legacy kube types successfully parsed
>   reason: ParsedGroupVersions
>   status: 'False'
>   type: NamespaceDeletionGroupVersionParsingFailure
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: All content successfully deleted, may be waiting on finalization
>   reason: ContentDeleted
>   status: 'False'
>   type: NamespaceDeletionContentFailure
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: 'Some resources are remaining: flinkdeployments.flink.apache.org 
> has 2
> resource instances'
>   reason: SomeResourcesRemain
>   status: 'True'
>   type: NamespaceContentRemaining
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: 'Some content in the namespace has finalizers remaining: 
> flinkdeployments.flink.apache.org/finalizer
> in 2 resource instances'
>   reason: SomeFinalizersRemain
>   status: 'True'
>   type: NamespaceFinalizersRemaining
> phase: Terminating {code}
> FlinkDeployment example (some fields redacted):
> {code:java}
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   creationTimestamp: '2023-03-23T18:27:02Z'
>   deletionGracePeriodSeconds: 0
>   deletionTimestamp: '2023-03-23T18:27:35Z'
>   finalizers:
>   - flinkdeployments.flink.apache.org/finalizer
>   generation: 3
>   name: 
>   namespace: 
>   resourceVersion: '10565277081'
>   uid: e50d2683-6c0c-467e-b10c-fe0f4e404692
> spec:
>   flinkConfiguration:
>     taskmanager.numberOfTaskSlots: '2'
>   flinkVersion: v1_16
>   image: 
>   job:
>     args: []
>     entryClass: 
>     jarURI: 
>     parallelism: 2
>     state: running
>     upgradeMode: stateless
>   jobManager:
>     replicas: 1
>     resource:
>       cpu: 1
>       memory: 2048m
>   logConfiguration:
>     log4j-console.properties: '# This affects logging for both user code and 
> Flink      rootLogger.level = INFO      rootLogger.appenderRef.console.ref = 
> ConsoleAppender      rootLogger.appenderRef.rolling.ref = RollingFileAppender 
>      # Uncomment this if you want to _only_ change Flink''s logging      
> #logger.flink.name = org.apache.flink      #logger.flink.level = INFO      # 
> The following lines keep the log level of common libraries/connectors on      
> # log level INFO. The root logger does not override this. You have to 
> manually      # change the log levels here.      logger.akka.name = akka      
> logger.akka.level = INFO      logger.kafka.name= org.apache.kafka      
> logger.kafka.level = INFO      logger.hadoop.name = org.apache.hadoop      
> logger.hadoop.level = INFO      logger.zookeeper.name = org.apache.zookeeper  
>     logger.zookeeper.level = INFO      # Log all infos to the console      
> appender.console.name = ConsoleAppender      appender.console.type = CONSOLE  
>     appender.console.layout.type = PatternLayout      
> appender.console.layout.pattern = %d{-MM-dd HH:mm:ss,SSS} %-5p %-60c %x
>       - %m%n      # Log all infos in the given rolling file      
> appender.rolling.name = RollingFileAppender      appender.rolling.type = 
> RollingFile      appender.rolling.append = false      
> appender.rolling.fileName = ${sys:log.file}      appender.rolling.filePattern 
> = ${sys:log.file}.%i      appender.rolling.layout.type 

[jira] [Commented] (FLINK-31860) FlinkDeployments never finalize when namespace is deleted

2024-03-06 Thread Kevin Lam (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17824083#comment-17824083
 ] 

Kevin Lam commented on FLINK-31860:
---

Hi, any updates on this issue? 

It seems that [https://github.com/kubernetes/kubernetes/issues/115070] was 
closed without any changes made.

> FlinkDeployments never finalize when namespace is deleted
> -
>
> Key: FLINK-31860
> URL: https://issues.apache.org/jira/browse/FLINK-31860
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.3.1
> Environment: Apache Flink Kubernetes Operator 1.3.1
> Kubernetes 1.24.9
>Reporter: Jayme Howard
>Assignee: Jayme Howard
>Priority: Blocker
>  Labels: pull-request-available, stale-assigned
>
> This appears to be a pretty straightforward issue, but I don't know the 
> codebase well enough to propose a fix.  When a FlinkDeployment is present in 
> a namespace, and the namespace is deleted, the FlinkDeployment never 
> reconciles and fails to complete its finalizer.  This leads to the namespace 
> being blocked from deletion indefinitely, requiring manual manipulation to 
> remove the finalizer on the FlinkDeployment.
>  
> Namespace conditions:
> {code:java}
> conditions:
> - lastTransitionTime: '2023-04-18T22:17:48Z'
>   message: All resources successfully discovered
>   reason: ResourcesDiscovered
>   status: 'False'
>   type: NamespaceDeletionDiscoveryFailure
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: All legacy kube types successfully parsed
>   reason: ParsedGroupVersions
>   status: 'False'
>   type: NamespaceDeletionGroupVersionParsingFailure
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: All content successfully deleted, may be waiting on finalization
>   reason: ContentDeleted
>   status: 'False'
>   type: NamespaceDeletionContentFailure
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: 'Some resources are remaining: flinkdeployments.flink.apache.org 
> has 2
> resource instances'
>   reason: SomeResourcesRemain
>   status: 'True'
>   type: NamespaceContentRemaining
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: 'Some content in the namespace has finalizers remaining: 
> flinkdeployments.flink.apache.org/finalizer
> in 2 resource instances'
>   reason: SomeFinalizersRemain
>   status: 'True'
>   type: NamespaceFinalizersRemaining
> phase: Terminating {code}
> FlinkDeployment example (some fields redacted):
> {code:java}
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   creationTimestamp: '2023-03-23T18:27:02Z'
>   deletionGracePeriodSeconds: 0
>   deletionTimestamp: '2023-03-23T18:27:35Z'
>   finalizers:
>   - flinkdeployments.flink.apache.org/finalizer
>   generation: 3
>   name: 
>   namespace: 
>   resourceVersion: '10565277081'
>   uid: e50d2683-6c0c-467e-b10c-fe0f4e404692
> spec:
>   flinkConfiguration:
>     taskmanager.numberOfTaskSlots: '2'
>   flinkVersion: v1_16
>   image: 
>   job:
>     args: []
>     entryClass: 
>     jarURI: 
>     parallelism: 2
>     state: running
>     upgradeMode: stateless
>   jobManager:
>     replicas: 1
>     resource:
>       cpu: 1
>       memory: 2048m
>   logConfiguration:
>     log4j-console.properties: '# This affects logging for both user code and 
> Flink      rootLogger.level = INFO      rootLogger.appenderRef.console.ref = 
> ConsoleAppender      rootLogger.appenderRef.rolling.ref = RollingFileAppender 
>      # Uncomment this if you want to _only_ change Flink''s logging      
> #logger.flink.name = org.apache.flink      #logger.flink.level = INFO      # 
> The following lines keep the log level of common libraries/connectors on      
> # log level INFO. The root logger does not override this. You have to 
> manually      # change the log levels here.      logger.akka.name = akka      
> logger.akka.level = INFO      logger.kafka.name= org.apache.kafka      
> logger.kafka.level = INFO      logger.hadoop.name = org.apache.hadoop      
> logger.hadoop.level = INFO      logger.zookeeper.name = org.apache.zookeeper  
>     logger.zookeeper.level = INFO      # Log all infos to the console      
> appender.console.name = ConsoleAppender      appender.console.type = CONSOLE  
>     appender.console.layout.type = PatternLayout      
> appender.console.layout.pattern = %d{-MM-dd HH:mm:ss,SSS} %-5p %-60c %x
>       - %m%n      # Log all infos in the given rolling file      
> appender.rolling.name = RollingFileAppender      appender.rolling.type = 
> RollingFile      appender.rolling.append = false      
> appender.rolling.fileName = ${sys:log.file}      appender.rolling.filePattern 
> = ${sys:log.file}.%i      appender.rolling.layout.type = PatternLayout      
> 

[jira] [Comment Edited] (FLINK-34440) Support Debezium Protobuf Confluent Format

2024-02-13 Thread Kevin Lam (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817076#comment-17817076
 ] 

Kevin Lam edited comment on FLINK-34440 at 2/13/24 5:00 PM:


Hello! I just added this Jira issue, following the [Contribution 
Guide.|https://flink.apache.org/how-to-contribute/contribute-code/]

If this ticket warrants a dev@ discussion, I'm happy to open one, please let me 
know.

Also, I am happy to work on contributing the code to complete this issue.

Looking forward to hearing others' thoughts!


was (Author: klam-shop):
I just added this Jira issue, following the [Contribution 
Guide.|https://flink.apache.org/how-to-contribute/contribute-code/]

If this ticket warrants a dev@ discussion, I'm happy to open one.

I am happy to work on contributing the code to complete this issue.

Looking forward to hearing others' thoughts!

> Support Debezium Protobuf Confluent Format
> --
>
> Key: FLINK-34440
> URL: https://issues.apache.org/jira/browse/FLINK-34440
> Project: Flink
>  Issue Type: New Feature
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.19.0, 1.18.1
>Reporter: Kevin Lam
>Priority: Minor
>
> *Motivation*
> Debezium and the Confluent Schema registry can be used to emit Protobuf 
> Encoded messages to Kafka, but Flink does not easily support consuming these 
> messages through a connector.
> *Definition of Done*
> Add a format `debezium-protobuf-confluent` provided by 
> DebeziumProtobufFormatFactory that supports Debezium messages encoded using 
> Protocol Buffer and the Confluent Schema Registry. 
> To consider
>  * Mirror the implementation of the `debezium-avro-confluent` format. First 
> implement a `protobuf-confluent` format similar to the existing [Confluent 
> Avro|https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/formats/avro-confluent/]
>  format that's provided today, which allows reading/writing protobuf using 
> the Confluent Schema Registry



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34440) Support Debezium Protobuf Confluent Format

2024-02-13 Thread Kevin Lam (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817076#comment-17817076
 ] 

Kevin Lam commented on FLINK-34440:
---

I just added this Jira issue, following the [Contribution 
Guide.|https://flink.apache.org/how-to-contribute/contribute-code/]

If this ticket warrants a dev@ discussion, I'm happy to open one.

I am happy to work on contributing the code to complete this issue.

Looking forward to hearing others' thoughts!

> Support Debezium Protobuf Confluent Format
> --
>
> Key: FLINK-34440
> URL: https://issues.apache.org/jira/browse/FLINK-34440
> Project: Flink
>  Issue Type: New Feature
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.19.0, 1.18.1
>Reporter: Kevin Lam
>Priority: Minor
>
> *Motivation*
> Debezium and the Confluent Schema registry can be used to emit Protobuf 
> Encoded messages to Kafka, but Flink does not easily support consuming these 
> messages through a connector.
> *Definition of Done*
> Add a format `debezium-protobuf-confluent` provided by 
> DebeziumProtobufFormatFactory that supports Debezium messages encoded using 
> Protocol Buffer and the Confluent Schema Registry. 
> To consider
>  * Mirror the implementation of the `debezium-avro-confluent` format. First 
> implement a `protobuf-confluent` format similar to the existing [Confluent 
> Avro|https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/formats/avro-confluent/]
>  format that's provided today, which allows reading/writing protobuf using 
> the Confluent Schema Registry



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34440) Support Debezium Protobuf Confluent Format

2024-02-13 Thread Kevin Lam (Jira)
Kevin Lam created FLINK-34440:
-

 Summary: Support Debezium Protobuf Confluent Format
 Key: FLINK-34440
 URL: https://issues.apache.org/jira/browse/FLINK-34440
 Project: Flink
  Issue Type: New Feature
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.18.1, 1.19.0
Reporter: Kevin Lam


*Motivation*

Debezium and the Confluent Schema registry can be used to emit Protobuf Encoded 
messages to Kafka, but Flink does not easily support consuming these messages 
through a connector.

*Definition of Done*

Add a format `debezium-protobuf-confluent` provided by 
DebeziumProtobufFormatFactory that supports Debezium messages encoded using 
Protocol Buffer and the Confluent Schema Registry. 

To consider
 * Mirror the implementation of the `debezium-avro-confluent` format. First 
implement a `protobuf-confluent` format similar to the existing [Confluent 
Avro|https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/formats/avro-confluent/]
 format that's provided today, which allows reading/writing protobuf using the 
Confluent Schema Registry
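
As a rough illustration of the intended user experience, a table using the proposed format could be declared much like today's {{debezium-avro-confluent}} tables. The format name and the option keys below are assumptions taken from this proposal (mirroring the Avro variant), not an existing Flink API.

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DebeziumProtobufConfluentSketch {

    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hypothetical DDL: 'debezium-protobuf-confluent' does not exist yet; the
        // option names below simply mirror the existing debezium-avro-confluent format.
        tEnv.executeSql(
                "CREATE TABLE orders (\n"
                        + "  order_id BIGINT,\n"
                        + "  amount DECIMAL(10, 2)\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'orders',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'format' = 'debezium-protobuf-confluent',\n"
                        + "  'debezium-protobuf-confluent.url' = 'http://schema-registry:8081'\n"
                        + ")");
    }
}
{code}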



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-24257) Using Stateful Blue/Green Deployment to handle Application Evolvement

2023-05-24 Thread Kevin Lam (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725851#comment-17725851
 ] 

Kevin Lam commented on FLINK-24257:
---

Hi! Interested in this, and blue-green deployments generally for the Flink 
Operator. Has there been any recent work or discussions?

> Using Stateful Blue/Green Deployment to handle Application Evolvement
> -
>
> Key: FLINK-24257
> URL: https://issues.apache.org/jira/browse/FLINK-24257
> Project: Flink
>  Issue Type: Improvement
>Reporter: Zhihao Wang
>Priority: Major
>
> Application Evolvement is an open problem in streaming processing. We present 
> a *stateful blue/green deployment* to address it. To our knowledge, this is 
> the first general way to handle streaming application evolvement.
> Here is the design doc: 
> [https://docs.google.com/document/d/1QCnhtrgcMRUWCw_SO7vmfj6dyzbdjIAxWkJzKK4d508|https://docs.google.com/document/d/1QCnhtrgcMRUWCw_SO7vmfj6dyzbdjIAxWkJzKK4d508/edit?usp=sharing]
>  
> The design requires a backfill requirement supported by Flink. Please help to 
> review the design and to see Flink community can support such a feature. 
>  
> P.S. [A related ticket|https://issues.apache.org/jira/browse/KAFKA-13291] is 
> also sent to Kafka community for suggestions.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-25416) Build unified Parquet BulkFormat for both Table API and DataStream API

2022-03-10 Thread Kevin Lam (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17504393#comment-17504393
 ] 

Kevin Lam commented on FLINK-25416:
---

Hi Jing Ge! Thanks for the quick response. 

As for our requirements, I think these are relevant:
 # BulkFormat supports splitting, so that large parquet files can be handled 
easily. I see AvroParquetRecordFormat currently does not support this.
 # BulkFormat can be extended to return T where T is a Scala case class, in 
addition to GenericRecord. The reason for this is we have other Kafka based 
Sources that emit case classes and we want to be able to combine our Parquet 
FileSources with them using HybridSource.
 ## Put another way, it should be easy to extend the BulkFormat to emit other 
types.

I think those are the main ones for now, I will comment again if I think of 
anything else. 
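
A minimal sketch of the DataStream-side usage discussed here, assuming Flink 1.15+ with flink-parquet and flink-avro on the classpath: read GenericRecords from Parquet with the new FileSource and map them to a user-defined type so the stream can later be combined with other sources. Note this path goes through a StreamFormat and, per point 1 above, does not split large files; {{MyEvent}}, the schema, and the field names are hypothetical.

{code:java}
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetReaders;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParquetToDomainTypeSketch {

    // Hypothetical domain type standing in for a case class from the other sources.
    public static class MyEvent {
        public String id;
        public long ts;

        public MyEvent() {}

        public MyEvent(String id, long ts) {
            this.id = id;
            this.ts = ts;
        }
    }

    public static void main(String[] args) throws Exception {
        Schema schema =
                new Schema.Parser()
                        .parse(
                                "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
                                        + "{\"name\":\"id\",\"type\":\"string\"},"
                                        + "{\"name\":\"ts\",\"type\":\"long\"}]}");

        // Read Avro GenericRecords from Parquet files with the new unified Source API.
        FileSource<GenericRecord> source =
                FileSource.forRecordStreamFormat(
                                AvroParquetReaders.forGenericRecord(schema),
                                new Path("s3://bucket/events/"))
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<MyEvent> events =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "parquet-events")
                        .map(r -> new MyEvent(r.get("id").toString(), (Long) r.get("ts")))
                        .returns(MyEvent.class);

        events.print();
        env.execute("parquet-to-domain-type");
    }
}
{code}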

> Build unified Parquet BulkFormat for both Table API and DataStream API
> --
>
> Key: FLINK-25416
> URL: https://issues.apache.org/jira/browse/FLINK-25416
> Project: Flink
>  Issue Type: New Feature
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Jing Ge
>Assignee: Jing Ge
>Priority: Major
>  Labels: Umbrella
>
> *Background information*
> Current AvroParquet implementation AvroParquetRecordFormat uses the high 
> level API ParquetReader that does not provide offset information, which turns 
> out the restoreReader logic has big room to improve.
> Beyond AvroParquetRecordFormat there is another format implementation 
> ParquetVectorizedInputFormat w.r.t. the parquet which is coupled tightly with 
> the Table API.
> It would be better to provide a unified Parquet BulkFormat with one 
> implementation that can support both Table API and DataStream API.
>  
> *Some thoughts*
> Use the low level API {{ParquetFileReader}} with {{BulkFormat}} directly like 
> 'ParquetVectorizedInputFormat' did instead of with {{StreamFormat}} for the 
> following reasons:
>  * the read logic is built in the internal low level class 
> {{InternalParquetRecordReader}} with package private visibility in 
> parquet-hadoop lib which uses another low level class {{ParquetFileReader}} 
> internally. This makes the implementation of StreamFormat very complicated. I 
> think the design idea of StreamFormat is to simplify the implementation. They 
> do not seem to work together.
>  * {{{}ParquetFileReader{}}}reads data in batch mode, i.e. {{{}PageReadStore 
> pages = reader.readNextFilteredRowGroup();{}}}. If we build these logic into 
> StreamFormat({{{}AvroParquetRecordFormat{}}} in this case), 
> {{AvroParquetRecordFormat}} has to take over the role 
> {{InternalParquetRecordReader}} does, including but not limited to
>  # 
>  ## read {{PageReadStore}} in batch mode.
>  ## manage {{{}PageReadStore{}}}, i.e. read next page when all records in the 
> current page have been consumed and cache it.
>  ## manage the read index within the current {{PageReadStore}} because 
> StreamFormat has its own setting for read size, etc.
> All of these make {{AvroParquetRecordFormat}} become the {{BulkFormat}} 
> instead of {{StreamFormat}}
>  * {{StreamFormat}} can only be used via {{{}StreamFormatAdapter{}}}, which 
> means everything we will do with the low level APIs for parquet-hadoop lib 
> should have no conflict with the built-in logic provided by 
> {{{}StreamFormatAdapter{}}}.
> Now we could see if we build these logics into a {{StreamFormat}} 
> implementation, i.e. {{{}AvroParquetRecordFormat{}}}, all convenient built-in 
> logic provided by the {{StreamFormatAdapter}} turns into obstacles. There is 
> also a violation of single responsibility principle, i.e. 
> {{{}AvroParquetRecordFormat }}will take some responsibility of 
> \{{{}BulkFormat{}}}. These might be the reasons why 
> 'ParquetVectorizedInputFormat' implemented {{BulkFormat}} instead of 
> {{{}StreamFormat{}}}.
> In order to build a unified parquet implementation for both Table API and 
> DataStream API, it makes more sense to consider building these code into a 
> {{BulkFormat}} implementation class. Since the output data types are 
> different, {{RowData}} vs. {{{}Avro{}}}, extra converter logic should be 
> introduced into the architecture design. Depending on how complicated the 
> issue will be and how big the impact it will have on the current code base, a 
> new FLIP might be required. 
> The following code piece was suggested by Arvid Heise for the next optimized 
> AvroParquetReader:
> {code:java}
> // Injected
> GenericData model = GenericData.get();
> org.apache.hadoop.conf.Configuration conf = new 
> org.apache.hadoop.conf.Configuration();
> // Low level reader - fetch metadata
> ParquetFileReader reader 

[jira] [Commented] (FLINK-21406) Add AvroParquetFileRecordFormat

2022-03-10 Thread Kevin Lam (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17504379#comment-17504379
 ] 

Kevin Lam commented on FLINK-21406:
---

Hi, is there a ticket for someone to implement splitting functionality for 
AvroParquetFileRecordFormat? And if so, is someone working on it? 

> Add AvroParquetFileRecordFormat
> ---
>
> Key: FLINK-21406
> URL: https://issues.apache.org/jira/browse/FLINK-21406
> Project: Flink
>  Issue Type: New Feature
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Chesnay Schepler
>Assignee: Jing Ge
>Priority: Minor
>  Labels: auto-deprioritized-major, pull-request-available
> Fix For: 1.15.0
>
>
> There is currently no easy way to read avro GenericRecords from parquet via 
> the new {{FileSource}}.
> While helping out a user I started writing FileRecordFormat that could do 
> that, but it requires some finalization.
> The implementation is similar to our ParquetAvroWriters class, in that we 
> just wrap some parquet classes and bridge our FileSystem with their IO 
> abstraction.
> The main goal was to have a format that reads data through our FileSystems, 
> and not work directly against Hadoop to prevent a ClassLoader leak from the 
> S3AFileSystem (threads in a thread pool can keep references to the user 
> classloader).
> According to the user it appears to be working, but it will need some 
> cleanup, ideally support for specific records, support for checkpointing 
> (which should be fairly easy I believe), maybe splitting files (not sure 
> whether this works properly with Parquet) and of course tests + documentation.
> {code}
> public class ParquetAvroFileRecordFormat implements FileRecordFormat<GenericRecord> {
>     private final transient Schema schema;
>
>     public ParquetAvroFileRecordFormat(Schema schema) {
>         this.schema = schema;
>     }
>
>     @Override
>     public Reader<GenericRecord> createReader(
>             Configuration config, Path filePath, long splitOffset, long splitLength)
>             throws IOException {
>         final FileSystem fs = filePath.getFileSystem();
>         final FileStatus status = fs.getFileStatus(filePath);
>         final FSDataInputStream in = fs.open(filePath);
>
>         return new MyReader(
>                 AvroParquetReader.<GenericRecord>builder(new InputFileWrapper(in, status.getLen()))
>                         .withDataModel(GenericData.get())
>                         .build());
>     }
>
>     @Override
>     public Reader<GenericRecord> restoreReader(
>             Configuration config,
>             Path filePath,
>             long restoredOffset,
>             long splitOffset,
>             long splitLength) {
>         // not called if checkpointing isn't used
>         return null;
>     }
>
>     @Override
>     public boolean isSplittable() {
>         // let's not worry about this for now
>         return false;
>     }
>
>     @Override
>     public TypeInformation<GenericRecord> getProducedType() {
>         return new GenericRecordAvroTypeInfo(schema);
>     }
>
>     private static class MyReader implements FileRecordFormat.Reader<GenericRecord> {
>         private final ParquetReader<GenericRecord> parquetReader;
>
>         private MyReader(ParquetReader<GenericRecord> parquetReader) {
>             this.parquetReader = parquetReader;
>         }
>
>         @Nullable
>         @Override
>         public GenericRecord read() throws IOException {
>             return parquetReader.read();
>         }
>
>         @Override
>         public void close() throws IOException {
>             parquetReader.close();
>         }
>     }
>
>     private static class InputFileWrapper implements InputFile {
>         private final FSDataInputStream inputStream;
>         private final long length;
>
>         private InputFileWrapper(FSDataInputStream inputStream, long length) {
>             this.inputStream = inputStream;
>             this.length = length;
>         }
>
>         @Override
>         public long getLength() {
>             return length;
>         }
>
>         @Override
>         public SeekableInputStream newStream() {
>             return new SeekableInputStreamAdapter(inputStream);
>         }
>     }
>
>     private static class SeekableInputStreamAdapter extends DelegatingSeekableInputStream {
>         private final FSDataInputStream inputStream;
>
>         private SeekableInputStreamAdapter(FSDataInputStream inputStream) {
>             super(inputStream);
>             this.inputStream = inputStream;
>         }
>
>         @Override
>         public long getPos() throws IOException {
>             return inputStream.getPos();
>         }
>
>         @Override
>         public void seek(long newPos) throws IOException {
>             inputStream.seek(newPos);
>         }
>     }
> }
> {code}



--
This message was sent by Atlassian Jira

[jira] [Commented] (FLINK-25416) Build unified Parquet BulkFormat for both Table API and DataStream API

2022-03-09 Thread Kevin Lam (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503710#comment-17503710
 ] 

Kevin Lam commented on FLINK-25416:
---

Hi, has there been any updates to this issue? I'm interested in using the new 
Source API with Parquet and the Datastream API

> Build unified Parquet BulkFormat for both Table API and DataStream API
> --
>
> Key: FLINK-25416
> URL: https://issues.apache.org/jira/browse/FLINK-25416
> Project: Flink
>  Issue Type: New Feature
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Jing Ge
>Assignee: Jing Ge
>Priority: Major
>  Labels: Umbrella
>
> *Background information*
> Current AvroParquet implementation AvroParquetRecordFormat uses the high 
> level API ParquetReader that does not provide offset information, which turns 
> out the restoreReader logic has big room to improve.
> Beyond AvroParquetRecordFormat there is another format implementation 
> ParquetVectorizedInputFormat w.r.t. the parquet which is coupled tightly with 
> the Table API.
> It would be better to provide a unified Parquet BulkFormat with one 
> implementation that can support both Table API and DataStream API.
>  
> *Some thoughts*
> Use the low level API {{ParquetFileReader}} with {{BulkFormat}} directly like 
> 'ParquetVectorizedInputFormat' did instead of with {{StreamFormat}} for the 
> following reasons:
>  * the read logic is built in the internal low level class 
> {{InternalParquetRecordReader}} with package private visibility in 
> parquet-hadoop lib which uses another low level class {{ParquetFileReader}} 
> internally. This makes the implementation of StreamFormat very complicated. I 
> think the design idea of StreamFormat is to simplify the implementation. They 
> do not seem to work together.
>  * {{{}ParquetFileReader{}}}reads data in batch mode, i.e. {{{}PageReadStore 
> pages = reader.readNextFilteredRowGroup();{}}}. If we build these logic into 
> StreamFormat({{{}AvroParquetRecordFormat{}}} in this case), 
> {{AvroParquetRecordFormat}} has to take over the role 
> {{InternalParquetRecordReader}} does, including but not limited to
>  # 
>  ## read {{PageReadStore}} in batch mode.
>  ## manage {{{}PageReadStore{}}}, i.e. read next page when all records in the 
> current page have been consumed and cache it.
>  ## manage the read index within the current {{PageReadStore}} because 
> StreamFormat has its own setting for read size, etc.
> All of these make {{AvroParquetRecordFormat}} become the {{BulkFormat}} 
> instead of {{StreamFormat}}
>  * {{StreamFormat}} can only be used via {{{}StreamFormatAdapter{}}}, which 
> means everything we will do with the low level APIs for parquet-hadoop lib 
> should have no conflict with the built-in logic provided by 
> {{{}StreamFormatAdapter{}}}.
> Now we could see if we build these logics into a {{StreamFormat}} 
> implementation, i.e. {{{}AvroParquetRecordFormat{}}}, all convenient built-in 
> logic provided by the {{StreamFormatAdapter}} turns into obstacles. There is 
> also a violation of single responsibility principle, i.e. 
> {{{}AvroParquetRecordFormat }}will take some responsibility of 
> \{{{}BulkFormat{}}}. These might be the reasons why 
> 'ParquetVectorizedInputFormat' implemented {{BulkFormat}} instead of 
> {{{}StreamFormat{}}}.
> In order to build a unified parquet implementation for both Table API and 
> DataStream API, it makes more sense to consider building these code into a 
> {{BulkFormat}} implementation class. Since the output data types are 
> different, {{RowData}} vs. {{{}Avro{}}}, extra converter logic should be 
> introduced into the architecture design. Depending on how complicated the 
> issue will be and how big the impact it will have on the current code base, a 
> new FLIP might be required. 
> The following code piece was suggested by Arvid Heise for the next optimized 
> AvroParquetReader:
> {code:java}
> // Injected
> GenericData model = GenericData.get();
> org.apache.hadoop.conf.Configuration conf = new 
> org.apache.hadoop.conf.Configuration();
> // Low level reader - fetch metadata
> ParquetFileReader reader = null;
> MessageType fileSchema = reader.getFileMetaData().getSchema();
> Map metaData = 
> reader.getFileMetaData().getKeyValueMetaData();
> // init Avro specific things
> AvroReadSupport readSupport = new AvroReadSupport<>(model);
> ReadSupport.ReadContext readContext =
> readSupport.init(
> new InitContext(
>   conf,
> metaData.entrySet().stream()
>