Hey AJ,

I’m not familiar with the stock parquet sink, but if it requires a schema on 
creation you won’t be able to change the output schema without restarting the 
job. I’m using a custom sink that can update the schema it uses. The problem 
I’m facing is how to communicate those updates in an efficient way. Currently 
I’m checking every record for differences with the stored schema, which both 
adds a lot of overhead and creates discrepancies between when partitions update 
the schema they use to write and when a schema can be marked as “in use”.

For your use case, the analogous approach would be to replace the Parquet sink 
with a custom sink that manages the lifecycle of the underlying Parquet writer 
itself. Then you could control closing the current writer and creating a new 
one with an updated schema yourself, and thus do it in code instead of via a 
restart.
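
If it helps, here’s a very rough sketch of the idea (not our actual code): the 
sink owns the ParquetWriter and rolls the file whenever it sees a record whose 
schema differs from the one it is currently writing with. Bucketing, 
checkpointing and error handling are left out, and newPartPath() is just a 
placeholder for whatever file-naming scheme you use.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;

public class SchemaAwareParquetSink extends RichSinkFunction<GenericRecord> {

    private transient ParquetWriter<GenericRecord> writer;
    private transient Schema currentSchema;

    @Override
    public void invoke(GenericRecord record, Context context) throws Exception {
        Schema recordSchema = record.getSchema();
        if (writer == null || !recordSchema.equals(currentSchema)) {
            // Roll the file: close the old writer, open a new one with the new schema.
            if (writer != null) {
                writer.close();
            }
            currentSchema = recordSchema;
            writer = AvroParquetWriter.<GenericRecord>builder(newPartPath())
                    .withSchema(currentSchema)
                    .build();
        }
        writer.write(record);
    }

    @Override
    public void close() throws Exception {
        if (writer != null) {
            writer.close();
        }
    }

    // Placeholder: derive the next part file path from your bucketing scheme.
    private Path newPartPath() {
        return new Path("/tmp/output/part-" + System.nanoTime() + ".parquet");
    }
}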

Julian

From: aj <ajainje...@gmail.com>
Date: Thursday, October 15, 2020 at 4:12 AM
To: "Jaffe, Julian" <julianja...@activision.com>
Cc: Piotr Nowojski <pnowoj...@apache.org>, user <user@flink.apache.org>
Subject: Re: Broadcasting control messages to a sink

Hi Jaffe,

I am also working on something similar type of a problem.

I am receiving a set of events in Avro format on different topics. I want to 
consume these and write them to S3 in Parquet format.
I have written a job that creates a different stream for each event and 
fetches its schema from the Confluent schema registry to create a Parquet sink 
for that event.
This is working fine, but the only problem I am facing is that whenever a new 
event starts coming or there is any change in a schema, I have to change the 
YAML config and restart the job. Is there any way I can avoid restarting the 
job and have it start consuming the new set of events?

As I see you are handling schema evolution, can you help me with this and also 
with how I can handle the new events? In the config, I am keeping a mapping of 
events and schema subjects. Please share how you are solving this.


So currently this is the way I am doing it, but I would like to know a better 
way to handle it.

FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer(topics,
        new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
        properties);

DataStream<GenericRecord> dataStream =
        streamExecutionEnvironment.addSource(flinkKafkaConsumer).name("source");

try {
    for (EventConfig eventConfig : eventTypesList) {

        LOG.info("creating a stream for {}", eventConfig.getEvent_name());

        // One StreamingFileSink per event type, using the schema fetched from
        // the schema registry for that event's subject.
        final StreamingFileSink<GenericRecord> sink = StreamingFileSink
                .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(
                        SchemaUtils.getSchema(eventConfig.getSchema_subject(), registryClient)))
                .withBucketAssigner(new EventTimeBucketAssigner())
                .build();

        // Route only the records belonging to this event type into the sink.
        DataStream<GenericRecord> outStream = dataStream.filter(
                (FilterFunction<GenericRecord>) genericRecord ->
                        genericRecord != null
                                && genericRecord.get(EVENT_NAME).toString()
                                        .equals(eventConfig.getEvent_name()));

        outStream.addSink(sink)
                .name(eventConfig.getSink_id())
                .setParallelism(parallelism);
    }
} catch (Exception e) {
    e.printStackTrace();
}

On Wed, Oct 14, 2020, 23:12 Jaffe, Julian <julianja...@activision.com> wrote:
Thanks for the suggestion Piotr!

The problem is that the sink needs to have access to the schema (so that it can 
write the schema only once per file instead of once per record) and thus needs 
to know when the schema has been updated. In this proposed architecture, I think 
the sink would still need to check each record to see if the current schema 
matches the new record or not? The main problem I encountered when playing 
around with broadcast state was that I couldn’t figure out how to access the 
broadcast state within the sink, but perhaps I just haven’t thought about it the 
right way. I’ll meditate on the docs further 🙂

Julian

From: Piotr Nowojski <pnowoj...@apache.org>
Date: Wednesday, October 14, 2020 at 6:35 AM
To: "Jaffe, Julian" <julianja...@activision.com>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Broadcasting control messages to a sink

Hi Julian,

Have you seen Broadcast State [1]? I have never used it personally, but it 
sounds like something you want. Maybe your job should look like:

1. read raw messages from Kafka, without using the schema
2. read schema changes and broadcast them to 3. and 5.
3. deserialize kafka records in BroadcastProcessFunction by using combined 1. 
and 2.
4. do your logic
5. serialize records using schema in another BroadcastProcessFunction by using 
combined 4. and 2.
6. write raw records using BucketingSink
?
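
A minimal sketch of step 3, assuming the raw Kafka stream is a 
DataStream<byte[]> called rawMessages, the schema-change stream is a 
DataStream<String> of serialized schemas called schemaUpdates, and 
deserialize() stands in for your Avro decoding:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Descriptor for the broadcast state holding the latest serialized schema.
MapStateDescriptor<String, String> schemaStateDescriptor =
        new MapStateDescriptor<>("latest-schema",
                BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

BroadcastStream<String> schemaBroadcast = schemaUpdates.broadcast(schemaStateDescriptor);

DataStream<GenericRecord> parsed = rawMessages
        .connect(schemaBroadcast)
        .process(new BroadcastProcessFunction<byte[], String, GenericRecord>() {

            @Override
            public void processElement(byte[] raw, ReadOnlyContext ctx,
                                       Collector<GenericRecord> out) throws Exception {
                String schemaJson = ctx.getBroadcastState(schemaStateDescriptor).get("current");
                if (schemaJson != null) {
                    // Caching the parsed Schema is omitted for brevity.
                    Schema schema = new Schema.Parser().parse(schemaJson);
                    out.collect(deserialize(raw, schema)); // your Avro decoding here
                }
            }

            @Override
            public void processBroadcastElement(String schemaJson, Context ctx,
                                                Collector<GenericRecord> out) throws Exception {
                // Every parallel instance receives the update and stores it.
                ctx.getBroadcastState(schemaStateDescriptor).put("current", schemaJson);
            }
        });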

Best,
Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html

On Wed, 14 Oct 2020 at 11:01, Jaffe, Julian <julianja...@activision.com> wrote:
Hey all,

I’m building a Flink app that pulls in messages from a Kafka topic and writes 
them out to disk using a custom bucketed sink. Each message needs to be parsed 
using a schema that is also needed when writing in the sink. This schema is 
read from a remote file on a distributed file system (it could also be fetched 
from a service). The schema will be updated very infrequently.

In order to support schema evolution, I have created a custom source that 
occasionally polls for updates and, if it finds one, parses the new schema and 
sends a message containing the serialized schema. I’ve connected these two 
streams and then used a RichCoFlatMapFunction to flatten them back into a single 
output stream (schema events are used to update the parser, messages are parsed 
using the parser and emitted).
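
A simplified sketch of that wiring, with RawMessage, SchemaUpdate, ParsedMessage 
and MessageParser standing in for my actual types, and the schema stream 
broadcast so every parallel instance sees updates:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

// messages: the Kafka events; schemaUpdates: the custom polling source.
DataStream<ParsedMessage> parsed = messages
        .connect(schemaUpdates.broadcast())
        .flatMap(new RichCoFlatMapFunction<RawMessage, SchemaUpdate, ParsedMessage>() {

            private transient MessageParser parser; // rebuilt whenever a schema event arrives

            @Override
            public void flatMap1(RawMessage message, Collector<ParsedMessage> out) {
                if (parser != null) {
                    out.collect(parser.parse(message));
                }
            }

            @Override
            public void flatMap2(SchemaUpdate update, Collector<ParsedMessage> out) {
                // Schema events only update the parser; nothing is emitted downstream.
                parser = MessageParser.fromSerializedSchema(update.getSerializedSchema());
            }
        });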

However, I need some way to communicate the updated schema to every task of the 
sink. Simply emitting a control message that is ignored when writing to disk 
means that only one sink partition will receive the message and thus update the 
schema. I thought about sending the control message as side output and then 
broadcasting the resulting stream to the sink alongside the processed event 
input but I couldn’t figure out a way to do so. For now, I’m bundling the 
schema used to parse each event with the event, storing the schema in the sink, 
and then checking every event’s schema against the stored schema but this is 
fairly inefficient. Also, I’d like to eventually increase the types of control 
messages I can send to the sink, some of which may not be idempotent. Is there 
a better way to handle this pattern?


(Bonus question: ideally, I’d like to be able to perform an action when all 
sink partitions have picked up the new schema. I’m not aware of any way to emit 
metadata of this sort from Flink tasks beyond abusing the metrics system. This 
approach still leaves open the possibility of tasks picking up the new schema 
and then crashing for unrelated reasons thus inflating the count of tasks using 
a specific schema and moreover requires tracking at least the current level of 
parallelism and probably also Flink task state outside of Flink. Are there any 
patterns for reporting metadata like this to the job manager?)

I’m using Flink 1.8.
