Kobe, could you elaborate on your idea a little bit?
On Tue, Dec 8, 2020, 6:27 PM Kobe Feng <flllbls...@gmail.com> wrote:

Hi all,
Sorry for stepping in. This case reminds me of a similar requirement at my company long ago: plugging lambda functions (filtering, selecting, etc.) into a Beam pipeline dynamically, without restarting the job, along the lines of Flink stateful functions, Akka, etc.

Generally, SQL defines input, output, and transformation explicitly, which usually means a fixed schema and coder (using * is arbitrary; nowadays SQL is moving toward NewSQL due to NoSQL, decoupling from the storage layer and loosening those restrictions in favor of more flexible processing capability).

So if we want to support schema-free streaming pipelines natively, could we consider providing such capability in Beam core too (for higher transparency, and so it could possibly be leveraged by the SQL layer as well)? For example: pluggable coders with runtime compatibility checks against previous ones, stateful functions (not Beam's stateful processing), and in/out data carrying a schema ID for schema-based transforms.

I've been away from Apache Beam for a while; sorry if Beam already has such native support or I misunderstood.

Thanks!
Kobe Feng

On Tue, Dec 8, 2020 at 3:15 PM Reuven Lax <re...@google.com> wrote:

Talat, are you interested in writing a proposal and sending it to d...@beam.apache.org? We could help advise on the options.

Reuven

On Tue, Dec 8, 2020 at 10:28 AM Andrew Pilloud <apill...@google.com> wrote:

We could support EXCEPT statements in proposal 2 as long as we restricted it to known fields.

We are getting into implementation details now. Making unknown fields just a normal column introduces a number of problems. ZetaSQL doesn't support the Map type. All our IOs would need to explicitly deal with that special column. There would be a lack of consistency between the various types (Avro, Proto, JSON), which should all support this.
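Kobe's schema-ID idea earlier in the thread could be sketched as a registry keyed by schema ID, consulted per record at runtime instead of baking one schema in at submit time. All names here are hypothetical illustrations, not Beam APIs:

```java
import java.util.*;

// Hypothetical sketch of schema-ID-based decoding: each record carries a
// schema ID, and a registry maps IDs to schemas, so a transform can resolve
// the right schema at runtime. Field lists stand in for real schema objects.
public class SchemaIdSketch {
    // schemaId -> ordered field names (a stand-in for a real schema registry)
    static final Map<Integer, List<String>> REGISTRY = new HashMap<>();

    static {
        REGISTRY.put(1, Arrays.asList("id", "name"));
        REGISTRY.put(2, Arrays.asList("id", "name", "region")); // evolved schema
    }

    // Pair positional values with field names from the identified schema.
    static Map<String, Object> decode(int schemaId, List<Object> values) {
        List<String> fields = REGISTRY.get(schemaId);
        if (fields == null) {
            throw new IllegalArgumentException("unknown schema " + schemaId);
        }
        Map<String, Object> record = new LinkedHashMap<>();
        for (int i = 0; i < fields.size(); i++) {
            record.put(fields.get(i), i < values.size() ? values.get(i) : null);
        }
        return record;
    }

    public static void main(String[] args) {
        System.out.println(decode(2, Arrays.asList(7, "x", "us")));
    }
}
```

A runtime compatibility check, as Kobe suggests, could then compare a record's schema against the previously registered one before accepting it.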
We might also want something even more invasive: everything is an unknown field unless it is referenced in the SQL query. All of these options are possible. I guess we need someone who has time to work on it to write a proposal.

On Tue, Dec 8, 2020 at 10:03 AM Reuven Lax <re...@google.com> wrote:

I'm not sure that we could support EXCEPT statements, as that would require introspecting the unknown fields (what if the EXCEPT statement matches a field that is later added as an unknown field?). IMO this sort of behavior only makes sense on true pass-through queries. Anything that modifies the input record would be tricky to support.

Nested rows would work for proposal 2. You would need to make sure that the unknown-fields map is recursively added to all nested rows, and you would do this when you infer a schema from the Avro schema.

On Tue, Dec 8, 2020 at 9:58 AM Andrew Pilloud <apill...@google.com> wrote:

Proposal 1 would also interact poorly with SELECT * EXCEPT ... statements, which return all columns except specific ones. Adding an unknown field does seem like a reasonable way to handle this. It probably needs to be something that is native to the Row type, so that columns added to nested rows also work.

Andrew

On Tue, Dec 8, 2020 at 9:50 AM Reuven Lax <re...@google.com> wrote:

There's a difference between a fully dynamic schema and simply being able to forward "unknown" fields to the output.

A fully dynamic schema is not really necessary unless we also had dynamic SQL statements. Since the existing SQL statements do not reference the new fields by name, there's no reason to add them to the main schema.
However, if you have a SELECT * FROM ... WHERE ... statement that does no aggregation, there's fundamentally no reason we couldn't forward the messages exactly. In theory we could forward the exact bytes that are in the input PCollection, which would necessarily forward the new fields. In practice I believe we convert the input messages to Beam Row objects in order to evaluate the WHERE clause, and then convert back to Avro to output those messages. I believe this is where we "lose" the unknown fields, but this is an implementation artifact: in theory we could output the original bytes whenever we see a SELECT *. This is not truly a dynamic schema, since you can't really do anything with these extra fields except forward them to your output.

I see two possible ways to address this.

1. As I mentioned above, in the case of a SELECT * we could output the original bytes, and only use the Beam Row for evaluating the WHERE clause. This might be very expensive, though: we risk having to keep two copies of every message around, one in the original Avro format and one in Row format.

2. The other way would be to do what protocol buffers do. We could add one extra field to the inferred Beam schema to store new, unknown fields (probably a map-valued field). This extra field would simply store the raw bytes of these unknown fields, and when converting back to Avro they would be added to the output message. This might also add some overhead to the pipeline, so it might be best to make this behavior opt-in.

Reuven

On Tue, Dec 8, 2020 at 9:33 AM Brian Hulette <bhule...@google.com> wrote:

Reuven, could you clarify what you have in mind?
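A minimal sketch of Reuven's second option, modeling a Row as a plain map and stashing unrecognized fields in an extra map-valued column that is re-attached on output. The column name and class names are illustrative assumptions, not actual Beam APIs:

```java
import java.util.*;

// Sketch of "proposal 2": incoming fields not in the inferred schema are
// stashed in an extra map-valued "unknown fields" column, carried through
// the pipeline untouched, and re-attached when converting back to Avro.
public class UnknownFieldsSketch {
    static final String UNKNOWN = "__unknown_fields"; // hypothetical extra column

    // Split an incoming record into known columns plus an unknown-fields map.
    static Map<String, Object> toRow(Map<String, Object> input, Set<String> knownSchema) {
        Map<String, Object> row = new LinkedHashMap<>();
        Map<String, Object> unknown = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : input.entrySet()) {
            (knownSchema.contains(e.getKey()) ? row : unknown).put(e.getKey(), e.getValue());
        }
        row.put(UNKNOWN, unknown);
        return row;
    }

    // Re-attach the unknown fields when converting the row back to output format.
    @SuppressWarnings("unchecked")
    static Map<String, Object> toOutput(Map<String, Object> row) {
        Map<String, Object> out = new LinkedHashMap<>(row);
        Map<String, Object> unknown = (Map<String, Object>) out.remove(UNKNOWN);
        out.putAll(unknown);
        return out;
    }

    public static void main(String[] args) {
        Set<String> schema = new HashSet<>(Arrays.asList("id", "name"));
        Map<String, Object> input = new LinkedHashMap<>();
        input.put("id", 1);
        input.put("name", "a");
        input.put("newField", "b"); // not in the schema the job was submitted with
        System.out.println(toOutput(toRow(input, schema)));
    }
}
```

The round trip preserves `newField` even though the query's schema never mentions it, which is exactly the pass-through behavior discussed above.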
I know we've discussed multiple times the possibility of adding update-compatibility support to SchemaCoder, including support for certain schema changes (field additions/deletions) - I think the most recent discussion was here [1].

But it sounds like Talat is asking for something a little beyond that, effectively a dynamic schema. Is that something you think we can support?

[1] https://lists.apache.org/thread.html/ref73a8c40e24e0b038b4e5b065cd502f4c5df2e5e15af6f7ea1cdaa7%40%3Cdev.beam.apache.org%3E

On Tue, Dec 8, 2020 at 9:20 AM Reuven Lax <re...@google.com> wrote:

Thanks. It might be theoretically possible to do this (at least for the case where existing fields do not change). Whether anyone currently has available time to do it is a different question, but it's something that can be looked into.

On Mon, Dec 7, 2020 at 9:29 PM Talat Uyarer <tuya...@paloaltonetworks.com> wrote:

Adding new fields is more common than modifying existing fields. But type changes are also possible for existing fields, such as a regular mandatory field (string, integer) becoming a union (nullable field). No field deletions.

On Mon, Dec 7, 2020 at 9:22 PM Reuven Lax <re...@google.com> wrote:

And when you say schema changes, are these new fields being added to the schema? Or are you making changes to the existing fields?
On Mon, Dec 7, 2020 at 9:02 PM Talat Uyarer <tuya...@paloaltonetworks.com> wrote:

Hi,
Sure, let me explain a little bit about my pipeline. It is actually simple:

Read Kafka -> Convert Avro bytes to Beam Row (DoFn<KV<byte[], byte[]>, Row>) -> Apply filter (SqlTransform.query(sql)) -> Convert back from Row to Avro (DoFn<Row, byte[]>) -> Write to DB/GCS/gRPC etc.

In our jobs we have three types of SQL:
- SELECT * FROM PCOLLECTION
- SELECT * FROM PCOLLECTION <with WHERE condition>
- SQL projection with or without a WHERE clause: SELECT col1, col2 FROM PCOLLECTION

We know the writer schema for each message. While deserializing the Avro binary in the Convert Avro Bytes to Beam Row step, we use both the writer schema and the reader schema. It always produces a generic record with the reader schema, and we convert that generic record to a Row. While submitting the Dataflow job we use the latest schema to generate the Beam schema.

In the current scenario, when we have schema changes, we first restart all 15k jobs with the latest updated schema, and once that is done we turn on the latest schema for the writers. Because of Avro's grammar-based schema resolution [1], we can read different versions of the schema and always produce a record with the latest schema. So we are able to handle multiple versions of data in the same streaming pipeline without breaking it. If we could regenerate the SQL's Java code when we get notified of the latest schema, we could handle all schema changes. The only remaining obstacle is Beam's SQL Java code. That's why I am looking for a solution.
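The writer-schema/reader-schema resolution described above can be simulated in plain Java. This mimics Avro-style resolution by field name with defaults for fields the writer didn't know about; it is not the actual Avro API:

```java
import java.util.*;

// Simulates Avro-style schema resolution: a record written with an older
// writer schema is resolved into the latest reader schema, filling defaults
// for fields the writer didn't have and dropping fields the reader lacks.
public class ResolveSketch {
    static Map<String, Object> resolve(Map<String, Object> written,
                                       List<String> readerFields,
                                       Map<String, Object> defaults) {
        Map<String, Object> resolved = new LinkedHashMap<>();
        for (String f : readerFields) {
            resolved.put(f, written.containsKey(f) ? written.get(f) : defaults.get(f));
        }
        return resolved; // always shaped like the reader (latest) schema
    }

    public static void main(String[] args) {
        Map<String, Object> v1Record = new LinkedHashMap<>();
        v1Record.put("id", 42);
        v1Record.put("name", "a");
        List<String> v2Fields = Arrays.asList("id", "name", "region"); // "region" added in v2
        Map<String, Object> defaults = Collections.singletonMap("region", (Object) "unknown");
        System.out.println(resolve(v1Record, v2Fields, defaults));
    }
}
```

This is why mixed-version input is harmless in the pipeline above: every record comes out shaped like the latest schema. The obstacle is only that the SQL's schema is fixed at submit time.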
We don't need multiple versions of SQL. We only need to regenerate the SQL schema with the latest schema on the fly.

I hope I explained it :)

Thanks

[1] https://avro.apache.org/docs/1.7.2/api/java/org/apache/avro/io/parsing/doc-files/parsing.html

On Mon, Dec 7, 2020 at 7:49 PM Reuven Lax <re...@google.com> wrote:

Can you explain the use case some more? Are you wanting to change your SQL statement as well when the schema changes? If not, what are those new fields doing in the pipeline? What I mean is: your old SQL statement clearly didn't reference those fields in a SELECT statement, since they didn't exist, so what are you missing by not having them, unless you are also changing the SQL statement?

Is this a case where you have a SELECT * and just want to make sure those fields are included?

Reuven

On Mon, Dec 7, 2020 at 6:31 PM Talat Uyarer <tuya...@paloaltonetworks.com> wrote:

Hi Andrew,

I assume the SQL query is not going to change. What changes is the Row schema, by adding new columns or renaming columns. What if we keep version information somewhere, for example in a KV pair?
The key is the schema information, the value is the Row. Could we not generate the SQL code then? The reason I ask: we have 15k pipelines, and when we have a schema change we restart 15k Dataflow jobs, which is painful. I am looking for a possible way to avoid the job restart. Don't you think it is still doable?

Thanks

On Mon, Dec 7, 2020 at 6:10 PM Andrew Pilloud <apill...@google.com> wrote:

Unfortunately we don't have a way to generate the SQL Java code on the fly, and even if we did, that wouldn't solve your problem. I believe our recommended practice is to run both the old and new pipelines for some time, then pick a window boundary to transition the output from the old pipeline to the new one.

Beam doesn't handle changing the format of data sent between intermediate steps in a running pipeline. Beam uses "coders" to serialize data between steps of the pipeline. The built-in coders (including the schema Row coder used by SQL) have a fixed data format and don't handle schema evolution. They are optimized for performance at all costs.

Even if you worked around this, the Beam model doesn't support changing the structure of the pipeline graph. This would significantly limit the changes you could make. It would also require some changes to SQL to try to produce the same plan for an updated SQL query.
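Andrew's point about fixed-format coders can be illustrated with a toy positional coder: values are written in schema order with no field names or version tag, so bytes encoded under one schema are silently misread under another. This is a simplified stand-in, not Beam's actual RowCoder:

```java
import java.io.*;
import java.util.*;

// Illustrates why a fixed positional format can't survive schema evolution:
// values are written in schema order with no names, so a decoder built for a
// different schema assigns the payload to the wrong fields.
public class PositionalCoderSketch {
    static byte[] encode(List<String> values) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            for (String v : values) out.writeUTF(v); // position implies the field
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Map<String, String> decode(byte[] bytes, List<String> schema) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
            Map<String, String> row = new LinkedHashMap<>();
            for (String field : schema) row.put(field, in.readUTF());
            return row;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] bytes = encode(Arrays.asList("42", "alice")); // written as (id, name)
        // Decoding with an evolved schema that prepends a field shifts every value:
        System.out.println(decode(bytes, Arrays.asList("region", "id")));
    }
}
```

With no self-description in the bytes, the "id" value lands in "region". Formats like Avro avoid this only because the decoder is given the original writer schema, which is exactly what a fixed-format coder in a running pipeline does not have.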
Andrew

On Mon, Dec 7, 2020 at 5:44 PM Talat Uyarer <tuya...@paloaltonetworks.com> wrote:

Hi,

We are using Beam SQL in our pipeline. Our data is written in Avro format, and we generate our Rows based on our Avro schema. Over time the schema changes. I believe Beam SQL generates Java code based on the Beam schema we define when submitting the pipeline. Do you have any idea how we can handle schema changes without resubmitting our Beam job? Is it possible to generate the SQL Java code on the fly?

Thanks

--
Yours Sincerely
Kobe Feng