And when you say schema changes, are these new fields being added to the schema? Or are you making changes to the existing fields?
On Mon, Dec 7, 2020 at 9:02 PM Talat Uyarer <tuya...@paloaltonetworks.com> wrote:

> Hi,
>
> For sure, let me explain a little bit about my pipeline. My pipeline is actually simple:
>
> Read Kafka -> Convert Avro Bytes to Beam Row (DoFn<KV<byte[], byte[]>, Row>) -> Apply Filter (SqlTransform.query(sql)) -> Convert back from Row to Avro (DoFn<Row, byte[]>) -> Write DB/GCS/GRPC etc.
>
> Our jobs use three types of SQL:
> - SELECT * FROM PCOLLECTION
> - SELECT * FROM PCOLLECTION <with WHERE condition>
> - SQL projection with or without a WHERE clause: SELECT col1, col2 FROM PCOLLECTION
>
> We know the writer schema for each message. While deserializing the Avro binary in the Convert Avro Bytes to Beam Row step, we use the writer schema and the reader schema. This always produces a generic record with the reader schema, and we convert that generic record to a Row. While submitting the DF job, we use the latest schema to generate the Beam schema.
>
> In the current scenario, when we have schema changes, we first restart all 15k jobs with the latest schema, and once that is done we turn on the latest schema for the writers. Because of Avro's grammar-based schema resolution [1], we can read different versions of the schema and always produce a record with the latest schema. Without breaking our pipeline, we are able to handle multiple versions of data in the same streaming pipeline. If we could regenerate the SQL's Java code when we are notified of the latest schema, we would be able to handle all schema changes. The only remaining obstacle is Beam SQL's Java code; that's why I am looking for a solution. We don't need multiple versions of SQL. We only need to regenerate the SQL schema with the latest schema on the fly.
>
> I hope that explains it :)
>
> Thanks
>
> [1] https://avro.apache.org/docs/1.7.2/api/java/org/apache/avro/io/parsing/doc-files/parsing.html
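For concreteness, the "Convert Avro Bytes to Beam Row" step described above might look roughly like the following in Beam Java. This is a minimal sketch, not code from the thread: it assumes the writer schema for each message can be looked up somewhere (e.g. a schema registry), and AvroBytesToRowFn and resolveWriterSchema are illustrative names.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.Row;

class AvroBytesToRowFn extends DoFn<KV<byte[], byte[]>, Row> {
  private final String readerSchemaJson;  // latest schema, fixed at submit time
  private transient Schema readerSchema;

  AvroBytesToRowFn(String readerSchemaJson) {
    this.readerSchemaJson = readerSchemaJson;
  }

  @Setup
  public void setup() {
    readerSchema = new Schema.Parser().parse(readerSchemaJson);
  }

  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    byte[] payload = c.element().getValue();
    // Hypothetical lookup of the schema the producer wrote this message with.
    Schema writerSchema = resolveWriterSchema(c.element().getKey());
    // Avro's resolving decoder reconciles writer and reader schemas, so the
    // output record always has the reader (latest) schema.
    GenericDatumReader<GenericRecord> reader =
        new GenericDatumReader<>(writerSchema, readerSchema);
    GenericRecord record =
        reader.read(null, DecoderFactory.get().binaryDecoder(payload, null));
    c.output(AvroUtils.toBeamRowStrict(record, AvroUtils.toBeamSchema(readerSchema)));
  }

  private Schema resolveWriterSchema(byte[] messageKey) {
    // Placeholder: e.g. a schema-registry lookup keyed by the message key.
    throw new UnsupportedOperationException("schema lookup goes here");
  }
}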
> On Mon, Dec 7, 2020 at 7:49 PM Reuven Lax <re...@google.com> wrote:
>
>> Can you explain the use case some more? Are you wanting to change your SQL statement as well when the schema changes? If not, what are those new fields doing in the pipeline? What I mean is that your old SQL statement clearly didn't reference those fields in a SELECT statement, since they didn't exist, so what are you missing by not having them unless you are also changing the SQL statement?
>>
>> Is this a case where you have a SELECT * and just want to make sure those fields are included?
>>
>> Reuven
>>
>> On Mon, Dec 7, 2020 at 6:31 PM Talat Uyarer <tuya...@paloaltonetworks.com> wrote:
>>
>>> Hi Andrew,
>>>
>>> I assume the SQL query is not going to change. What changes is the Row schema, by adding new columns or renaming columns. If we keep version information somewhere, for example as a KV pair where the key is the schema information and the value is the Row, can't we regenerate the SQL code? The reason I am asking is that we have 15k pipelines. When we have a schema change, we restart 15k DF jobs, which is painful. I am looking for a possible way to avoid the job restarts. Don't you think it is still doable?
>>>
>>> Thanks
>>>
>>> On Mon, Dec 7, 2020 at 6:10 PM Andrew Pilloud <apill...@google.com> wrote:
>>>
>>>> Unfortunately we don't have a way to generate the SQL Java code on the fly, and even if we did, that wouldn't solve your problem. I believe our recommended practice is to run both the old and new pipeline for some time, then pick a window boundary to transition the output from the old pipeline to the new one.
>>>>
>>>> Beam doesn't handle changing the format of data sent between intermediate steps in a running pipeline. Beam uses "coders" to serialize data between steps of the pipeline. The built-in coders (including the schema Row coder used by SQL) have a fixed data format and don't handle schema evolution; they are optimized for performance at all costs.
>>>>
>>>> Even if you worked around this, the Beam model doesn't support changing the structure of the pipeline graph, which would significantly limit the changes you could make. It would also require some changes to SQL to try to produce the same plan for an updated SQL query.
>>>>
>>>> Andrew
>>>>
>>>> On Mon, Dec 7, 2020 at 5:44 PM Talat Uyarer <tuya...@paloaltonetworks.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> We are using Beam SQL in our pipeline. Our data is written in Avro format, and we generate our rows based on our Avro schema. Over time the schema changes. I believe Beam SQL generates Java code based on the Beam schema we define while submitting the pipeline. Do you have any idea how we can handle schema changes without resubmitting our Beam job? Is it possible to generate the SQL Java code on the fly?
>>>>>
>>>>> Thanks
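Putting the pieces together, the overall pipeline shape described in this thread might be wired up roughly as below. Again, a sketch under stated assumptions rather than code from the thread: the broker, topic, and WHERE clause are placeholders, AvroBytesToRowFn is the sketch above, and RowToAvroBytesFn stands in for the Row-to-Avro converter. The setRowSchema/SqlTransform pair is where the latest schema gets baked into the SQL plan at submit time, which is why a schema change currently forces the restarts discussed above.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class AvroFilterPipeline {
  // Latest Avro schema, chosen when the job is submitted.
  private static final String READER_SCHEMA_JSON = "...";

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    org.apache.avro.Schema readerSchema =
        new org.apache.avro.Schema.Parser().parse(READER_SCHEMA_JSON);

    p.apply(KafkaIO.<byte[], byte[]>read()
            .withBootstrapServers("broker:9092")   // placeholder
            .withTopic("events")                   // placeholder
            .withKeyDeserializer(ByteArrayDeserializer.class)
            .withValueDeserializer(ByteArrayDeserializer.class)
            .withoutMetadata())
        .apply(ParDo.of(new AvroBytesToRowFn(readerSchema.toString())))
        // The Beam schema (and the SQL Java code generated against it) is fixed here.
        .setRowSchema(AvroUtils.toBeamSchema(readerSchema))
        .apply(SqlTransform.query("SELECT * FROM PCOLLECTION WHERE col1 = 'x'"))
        // Hypothetical converter back to Avro bytes, then write to DB/GCS/GRPC.
        .apply(ParDo.of(new RowToAvroBytesFn(readerSchema.toString())));

    p.run();
  }
}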