And when you say schema changes, are these new fields being added to the
schema? Or are you making changes to the existing fields?

On Mon, Dec 7, 2020 at 9:02 PM Talat Uyarer <tuya...@paloaltonetworks.com>
wrote:

> Hi,
> Sure, let me explain a little bit about my pipeline.
> My pipeline is actually simple:
> Read Kafka -> Convert Avro bytes to Beam Row (DoFn<KV<byte[], byte[]>,
> Row>) -> Apply filter (SqlTransform.query(sql)) -> Convert back from Row
> to Avro (DoFn<Row, byte[]>) -> Write DB/GCS/GRPC etc.
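>
> A minimal sketch of that shape in Java (the broker, topic, and the two
> DoFn class names are placeholders I'm using for illustration, not our
> real code), built on KafkaIO and SqlTransform from the Beam SDK:
>
>     Pipeline p = Pipeline.create(options);
>     p.apply(KafkaIO.<byte[], byte[]>read()
>             .withBootstrapServers("broker:9092")              // placeholder
>             .withTopic("events")                              // placeholder
>             .withKeyDeserializer(ByteArrayDeserializer.class)
>             .withValueDeserializer(ByteArrayDeserializer.class)
>             .withoutMetadata())               // PCollection<KV<byte[], byte[]>>
>         .apply("AvroToRow", ParDo.of(new AvroBytesToRowFn(readerSchema)))
>         .setRowSchema(beamSchema)             // schema fixed at submit time
>         .apply(SqlTransform.query("SELECT * FROM PCOLLECTION"))
>         .apply("RowToAvro", ParDo.of(new RowToAvroBytesFn(readerSchema)));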
>
> In our jobs we have three types of SQL:
> - SELECT * FROM PCOLLECTION
> - SELECT * FROM PCOLLECTION <with WHERE condition>
> - SQL projection with or without a WHERE clause: SELECT col1, col2 FROM
> PCOLLECTION
>
> We know the writer schema for each message. While deserializing the Avro
> binary we use both the writer schema and the reader schema in the Convert
> Avro Bytes to Beam Row step. It always produces a GenericRecord in the
> reader schema, and we convert that GenericRecord to a Row. While
> submitting the DF job we use the latest schema to generate the Beam
> schema.
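>
> Concretely, that deserialization step looks roughly like this (a sketch;
> the schema variables are illustrative):
>
>     // Avro resolves writer schema -> reader schema while decoding.
>     GenericDatumReader<GenericRecord> datumReader =
>         new GenericDatumReader<>(writerSchema, readerSchema);
>     BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
>     GenericRecord record = datumReader.read(null, decoder);
>     // Beam's AvroUtils converts the reader-schema record to a Row.
>     Row row = AvroUtils.toBeamRowStrict(record, beamSchema);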
>
> In the current scenario, when we have schema changes we first restart all
> 15k jobs with the latest updated schema, and once that is done we turn on
> the latest schema for the writers. Because of Avro's grammar resolution
> [1] we can read different versions of the schema and always produce a
> record in the latest schema. Without breaking our pipeline we are able to
> handle multiple versions of data in the same streaming pipeline. If we
> could regenerate the SQL's Java code when we get notified of the latest
> schema, we would handle all schema changes. The only remaining obstacle
> is Beam SQL's generated Java code. That's why I am looking for a
> solution. We don't need multiple versions of SQL. We only need to
> regenerate the SQL schema with the latest schema on the fly.
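>
> For example (illustrative schemas), if the writer schema lacks a field
> that the reader schema declares with a default, resolution fills in the
> default, so old-format records still come out in the latest schema:
>
>     Schema writer = new Schema.Parser().parse(
>         "{\"type\":\"record\",\"name\":\"Event\",\"fields\":"
>             + "[{\"name\":\"col1\",\"type\":\"string\"}]}");
>     Schema reader = new Schema.Parser().parse(
>         "{\"type\":\"record\",\"name\":\"Event\",\"fields\":"
>             + "[{\"name\":\"col1\",\"type\":\"string\"},"
>             + "{\"name\":\"col2\",\"type\":\"long\",\"default\":0}]}");
>     // Decoding writer-schema bytes with this (writer, reader) pair
>     // yields records where col2 = 0.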
>
> I hope that explains it :)
>
> Thanks
>
> [1]
> https://avro.apache.org/docs/1.7.2/api/java/org/apache/avro/io/parsing/doc-files/parsing.html
>
> On Mon, Dec 7, 2020 at 7:49 PM Reuven Lax <re...@google.com> wrote:
>
>> Can you explain the use case some more? Are you wanting to change your
>> SQL statement as well when the schema changes? If not, what are those new
>> fields doing in the pipeline? What I mean is that your old SQL statement
>> clearly didn't reference those fields in a SELECT statement since they
>> didn't exist, so what are you missing by not having them unless you are
>> also changing the SQL statement?
>>
>> Is this a case where you have a SELECT *, and just want to make sure
>> those fields are included?
>>
>> Reuven
>>
>> On Mon, Dec 7, 2020 at 6:31 PM Talat Uyarer <tuya...@paloaltonetworks.com>
>> wrote:
>>
>>> Hi Andrew,
>>>
>>> I assume the SQL query is not going to change. What changes is the Row
>>> schema, by adding new columns or renaming columns. If we keep version
>>> information somewhere, for example in a KV pair where the key is the
>>> schema information and the value is the Row, can't we regenerate the SQL
>>> code? The reason I ask is that we have 15k pipelines. When we have a
>>> schema change we restart 15k DF jobs, which is painful. I am looking for
>>> a possible way to avoid the job restarts. Don't you think it is doable?
>>>
>>> Thanks
>>>
>>>
>>> On Mon, Dec 7, 2020 at 6:10 PM Andrew Pilloud <apill...@google.com>
>>> wrote:
>>>
>>>> Unfortunately we don't have a way to generate the SQL Java code on the
>>>> fly, and even if we did, that wouldn't solve your problem. I believe our
>>>> recommended practice is to run both the old and new pipelines for some
>>>> time, then pick a window boundary to transition the output from the old
>>>> pipeline to the new one.
>>>>
>>>> Beam doesn't handle changing the format of data sent between
>>>> intermediate steps in a running pipeline. Beam uses "coders" to serialize
>>>> data between steps of the pipeline. The builtin coders (including the
>>>> Schema Row Coder used by SQL) have a fixed data format and don't handle
>>>> schema evolution. They are optimized for performance at all costs.
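>>>>
>>>> To make that concrete, a minimal sketch (the field names are invented):
>>>> the Row coder is bound to a single schema when the pipeline is
>>>> constructed, so rows with a different shape won't round-trip through it.
>>>>
>>>>     Schema schema = Schema.builder()
>>>>         .addStringField("col1")
>>>>         .addInt64Field("col2")
>>>>         .build();
>>>>     RowCoder coder = RowCoder.of(schema); // schema baked in at build time
>>>>     // A running pipeline keeps this coder; rows carrying an extra
>>>>     // "col3" field would not decode with it.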
>>>>
>>>> Even if you worked around this, the Beam model doesn't support changing
>>>> the structure of the pipeline graph, which would significantly limit the
>>>> changes you could make. It would also require some changes to SQL to try
>>>> to produce the same plan for an updated SQL query.
>>>>
>>>> Andrew
>>>>
>>>> On Mon, Dec 7, 2020 at 5:44 PM Talat Uyarer <
>>>> tuya...@paloaltonetworks.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> We are using Beam SQL in our pipeline. Our data is written in Avro
>>>>> format. We generate our rows based on our Avro schema. Over time the
>>>>> schema changes. I believe Beam SQL generates Java code based on what we
>>>>> define as the Beam schema while submitting the pipeline. Do you have any
>>>>> idea how we can handle schema changes without resubmitting our Beam job?
>>>>> Is it possible to generate the SQL Java code on the fly?
>>>>>
>>>>> Thanks
>>>>>
>>>>
