Kobe cloud you little bit eleborate your idea ?

On Tue, Dec 8, 2020, 6:27 PM Kobe Feng <flllbls...@gmail.com> wrote:

> Hi all,
> Sorry for the step-in. This case reminds me the similar req. in my company
> for plugin lambda func in beam's pipeline dynamically like filtering,
> selecting, etc. without restarting the job long time ago, like flink
> stateful functions, AKKA, etc.
>
> Generally, SQL defines input, output, and transformation explicit which
> means fix schema and coder usually (using * is arbitrary, nowadays SQL more
> change to newSQL due to NoSQL and decouple with storage layer, loosing the
> restrictions but for more flexible processing capability)
>
> So if we want to support schema-free in streaming pipeline natively, could
> we consider providing such capability from beam core part too (for higher
> transparency and possibly be leveraged by SQL layer too), like the
> capability for plugin coder with runtime compatible check with prev ones,
> stateful functions (not beam's stateful processing), in-out data with
> schema Id for schema-based transform, etc.
>
> I'm kinder of being away from apache beam for a while, sorry if beam
> already had such native support or I misunderstood.
>
> Thanks!
> Kobe Feng
>
> On Tue, Dec 8, 2020 at 3:15 PM Reuven Lax <re...@google.com> wrote:
>
>> Talat, are you interested in writing a proposal and sending it to
>> d...@beam.apache.org? We could help advise on the options.
>>
>> Reuven
>>
>> On Tue, Dec 8, 2020 at 10:28 AM Andrew Pilloud <apill...@google.com>
>> wrote:
>>
>>> We could support EXPECT statements in proposal 2 as long as we
>>> restricted it to known fields.
>>>
>>> We are getting into implementation details now. Making unknown fields
>>> just a normal column introduces a number of problems. ZetaSQL doesn't
>>> support Map type. All our IOs would need to explicitly deal with that
>>> special column. There would be a lack of consistency between the various
>>> types (Avro, Proto, Json) which should all support this.
>>>
>>> We might also want something even more invasive: everything is an
>>> unknown field unless it is referenced in the SQL query. All of these
>>> options are possible. I guess we need someone who has time to work on it to
>>> write a proposal.
>>>
>>> On Tue, Dec 8, 2020 at 10:03 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> I'm not sure that we could support EXCEPT statements, as that would
>>>> require introspecting the unknown fields (what if the EXCEPT statement
>>>> matches a field that later is added as an unknown field?). IMO this sort of
>>>> behavior only makes sense on true pass-through queries. Anything that
>>>> modifies the input record would be tricky to support.
>>>>
>>>> Nested rows would work for proposal 2. You would need to make sure that
>>>> the unknown-fields map is recursively added to all nested rows, and you
>>>> would do this when you infer a schema from the avro schema.
>>>>
>>>> On Tue, Dec 8, 2020 at 9:58 AM Andrew Pilloud <apill...@google.com>
>>>> wrote:
>>>>
>>>>> Proposal 1 would also interact poorly with SELECT * EXCEPT ...
>>>>> statements, which returns all columns except specific ones. Adding an
>>>>> unknown field does seem like a reasonable way to handle this. It probably
>>>>> needs to be something that is native to the Row type, so columns added to
>>>>> nested rows also work.
>>>>>
>>>>> Andrew
>>>>>
>>>>> On Tue, Dec 8, 2020 at 9:50 AM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> There's a difference between a fully dynamic schema and simply being
>>>>>> able to forward "unknown" fields to the output.
>>>>>>
>>>>>> A fully-dynamic schema is not really necessary unless we also had
>>>>>> dynamic SQL statements. Since the existing SQL statements do not 
>>>>>> reference
>>>>>> the new fields by name, there's no reason to add them to the main schema.
>>>>>>
>>>>>> However, if you have a SELECT * FROM WHERE XXXX statement that does
>>>>>> no aggregation, there's fundamentally no reason we couldn't forward the
>>>>>> messages exactly. In theory we could forward the exact bytes that are in
>>>>>> the input PCollection, which would necessarily forward the new fields. In
>>>>>> practice I believe that we convert the input messages to Beam Row objects
>>>>>> in order to evaluate the WHERE clause, and then convert back to Avro to
>>>>>> output those messages. I believe this is where we "lose" the unknown
>>>>>> messages,but this is an implementation artifact - in theory we could 
>>>>>> output
>>>>>> the original bytes whenever we see a SELECT *. This is not truly a 
>>>>>> dynamic
>>>>>> schema, since you can't really do anything with these extra fields except
>>>>>> forward them to your output.
>>>>>>
>>>>>> I see two possible ways to address this.
>>>>>>
>>>>>> 1. As I mentioned above, in the case of a SELECT * we could output
>>>>>> the original bytes, and only use the Beam Row for evaluating the WHERE
>>>>>> clause. This might be very expensive though - we risk having to keep two
>>>>>> copies of every message around, one in the original Avro format and one 
>>>>>> in
>>>>>> Row format.
>>>>>>
>>>>>> 2. The other way would be to do what protocol buffers do. We could
>>>>>> add one extra field to the inferred Beam schema to store new, unknown
>>>>>> fields (probably this would be a map-valued field). This extra field 
>>>>>> would
>>>>>> simply store the raw bytes of these unknown fields, and then when
>>>>>> converting back to Avro they would be added to the output message. This
>>>>>> might also add some overhead to the pipeline, so might be best to make 
>>>>>> this
>>>>>> behavior opt in.
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Tue, Dec 8, 2020 at 9:33 AM Brian Hulette <bhule...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Reuven, could you clarify what you have in mind? I know multiple
>>>>>>> times we've discussed the possibility of adding update compatibility
>>>>>>> support to SchemaCoder, including support for certain schema changes 
>>>>>>> (field
>>>>>>> additions/deletions) - I think the most recent discussion was here [1].
>>>>>>>
>>>>>>> But it sounds like Talat is asking for something a little beyond
>>>>>>> that, effectively a dynamic schema. Is that something you think we can
>>>>>>> support?
>>>>>>>
>>>>>>> [1]
>>>>>>> https://lists.apache.org/thread.html/ref73a8c40e24e0b038b4e5b065cd502f4c5df2e5e15af6f7ea1cdaa7%40%3Cdev.beam.apache.org%3E
>>>>>>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__lists.apache.org_thread.html_ref73a8c40e24e0b038b4e5b065cd502f4c5df2e5e15af6f7ea1cdaa7-2540-253Cdev.beam.apache.org-253E&d=DwMFaQ&c=V9IgWpI5PvzTw83UyHGVSoW3Uc1MFWe5J8PTfkrzVSo&r=BkW1L6EF7ergAVYDXCo-3Vwkpy6qjsWAz7_GD7pAR8g&m=Xl4R9N-8xXkH0eYS8Y49EQoBUaQSTRtv7sBjo9XRAOk&s=9wy_ZugJkaLoCzvqO7OVL4LjVLi0WcdWDCEjXEhcn6M&e=>
>>>>>>>
>>>>>>> On Tue, Dec 8, 2020 at 9:20 AM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> Thanks. It might be theoretically possible to do this (at least for
>>>>>>>> the case where existing fields do not change). Whether anyone 
>>>>>>>> currently has
>>>>>>>> available time to do this is a different question, but it's something 
>>>>>>>> that
>>>>>>>> can be looked into.
>>>>>>>>
>>>>>>>> On Mon, Dec 7, 2020 at 9:29 PM Talat Uyarer <
>>>>>>>> tuya...@paloaltonetworks.com> wrote:
>>>>>>>>
>>>>>>>>> Adding new fields is more common than modifying existing fields.
>>>>>>>>> But type change is also possible for existing fields, such as regular
>>>>>>>>> mandatory field(string,integer) to union(nullable field). No field
>>>>>>>>> deletion.
>>>>>>>>>
>>>>>>>>> On Mon, Dec 7, 2020 at 9:22 PM Reuven Lax <re...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> And when you say schema changes, are these new fields being added
>>>>>>>>>> to the schema? Or are you making changes to the existing fields?
>>>>>>>>>>
>>>>>>>>>> On Mon, Dec 7, 2020 at 9:02 PM Talat Uyarer <
>>>>>>>>>> tuya...@paloaltonetworks.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>> For sure let me explain a little bit about my pipeline.
>>>>>>>>>>> My Pipeline is actually simple
>>>>>>>>>>> Read Kafka -> Convert Avro Bytes to Beam Row(DoFn<KV<byte[],
>>>>>>>>>>> byte[]>, Row>) -> Apply Filter(SqlTransform.query(sql)) ->
>>>>>>>>>>> Convert back from Row to Avro (DoFn<Row, byte[]>)-> Write
>>>>>>>>>>> DB/GCS/GRPC etc
>>>>>>>>>>>
>>>>>>>>>>> On our jobs We have three type sqls
>>>>>>>>>>> - SELECT * FROM PCOLLECTION
>>>>>>>>>>> - SELECT * FROM PCOLLECTION <with Where Condition>
>>>>>>>>>>> - SQL Projection with or without Where clause  SELECT col1, col2
>>>>>>>>>>> FROM PCOLLECTION
>>>>>>>>>>>
>>>>>>>>>>> We know writerSchema for each message. While deserializing avro
>>>>>>>>>>> binary we use writer schema and reader schema on Convert Avro Bytes 
>>>>>>>>>>> to Beam
>>>>>>>>>>> Row step. It always produces a reader schema's generic record and we
>>>>>>>>>>> convert that generic record to Row.
>>>>>>>>>>> While submitting DF job we use latest schema to generate
>>>>>>>>>>> beamSchema.
>>>>>>>>>>>
>>>>>>>>>>> In the current scenario When we have schema changes first we
>>>>>>>>>>> restart all 15k jobs with the latest updated schema then whenever 
>>>>>>>>>>> we are
>>>>>>>>>>> done we turn on the latest schema for writers. Because of Avro's
>>>>>>>>>>> GrammerResolver[1] we read different versions of the schema and we 
>>>>>>>>>>> always
>>>>>>>>>>> produce the latest schema's record. Without breaking our pipeline 
>>>>>>>>>>> we are
>>>>>>>>>>> able to handle multiple versions of data in the same streaming 
>>>>>>>>>>> pipeline. If
>>>>>>>>>>> we can generate SQL's java code when we get notified wirth latest 
>>>>>>>>>>> schema we
>>>>>>>>>>> will handle all schema changes. The only remaining obstacle is 
>>>>>>>>>>> Beam's SQL
>>>>>>>>>>> Java code. That's why I am looking for some solution. We dont need 
>>>>>>>>>>> multiple
>>>>>>>>>>> versions of SQL. We only need to regenerate SQL schema with the 
>>>>>>>>>>> latest
>>>>>>>>>>> schema on the fly.
>>>>>>>>>>>
>>>>>>>>>>> I hope I can explain it :)
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>>
>>>>>>>>>>> [1]
>>>>>>>>>>> https://avro.apache.org/docs/1.7.2/api/java/org/apache/avro/io/parsing/doc-files/parsing.html
>>>>>>>>>>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__avro.apache.org_docs_1.7.2_api_java_org_apache_avro_io_parsing_doc-2Dfiles_parsing.html&d=DwMFaQ&c=V9IgWpI5PvzTw83UyHGVSoW3Uc1MFWe5J8PTfkrzVSo&r=BkW1L6EF7ergAVYDXCo-3Vwkpy6qjsWAz7_GD7pAR8g&m=0qahAe7vDisJq_hMYGY8F-Bp7-_5lOwOKzNoQ3r3-IQ&s=lwwIMsJO9nmj6_xZcSG_7qkBIaxOwyUXry4st1q70Rc&e=>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Dec 7, 2020 at 7:49 PM Reuven Lax <re...@google.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Can you explain the use case some more? Are you wanting to
>>>>>>>>>>>> change your SQL statement as well when the schema changes? If not, 
>>>>>>>>>>>> what are
>>>>>>>>>>>> those new fields doing in the pipeline? What I mean is that your 
>>>>>>>>>>>> old SQL
>>>>>>>>>>>> statement clearly didn't reference those fields in a SELECT 
>>>>>>>>>>>> statement since
>>>>>>>>>>>> they didn't exist, so what are you missing by not having them 
>>>>>>>>>>>> unless you
>>>>>>>>>>>> are also changing the SQL statement?
>>>>>>>>>>>>
>>>>>>>>>>>> Is this a case where you have a SELECT *, and just want to make
>>>>>>>>>>>> sure those fields are included?
>>>>>>>>>>>>
>>>>>>>>>>>> Reuven
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Dec 7, 2020 at 6:31 PM Talat Uyarer <
>>>>>>>>>>>> tuya...@paloaltonetworks.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Andrew,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I assume SQL query is not going to change. Changing things is
>>>>>>>>>>>>> the Row schema by adding new columns or rename columns. if we 
>>>>>>>>>>>>> keep a
>>>>>>>>>>>>> version information on somewhere for example a KV pair. Key is 
>>>>>>>>>>>>> schema
>>>>>>>>>>>>> information, value is Row. Can not we generate SQL code ? Why I 
>>>>>>>>>>>>> am asking
>>>>>>>>>>>>> We have 15k pipelines. When we have a schema change we restart a 
>>>>>>>>>>>>> 15k DF job
>>>>>>>>>>>>> which is pain. I am looking for a possible way to avoid job 
>>>>>>>>>>>>> restart. Dont
>>>>>>>>>>>>> you think it is not still doable ?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Dec 7, 2020 at 6:10 PM Andrew Pilloud <
>>>>>>>>>>>>> apill...@google.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Unfortunately we don't have a way to generate the SQL Java
>>>>>>>>>>>>>> code on the fly, even if we did, that wouldn't solve your 
>>>>>>>>>>>>>> problem. I
>>>>>>>>>>>>>> believe our recommended practice is to run both the old and new 
>>>>>>>>>>>>>> pipeline
>>>>>>>>>>>>>> for some time, then pick a window boundary to transition the 
>>>>>>>>>>>>>> output from
>>>>>>>>>>>>>> the old pipeline to the new one.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Beam doesn't handle changing the format of data sent between
>>>>>>>>>>>>>> intermediate steps in a running pipeline. Beam uses "coders" to 
>>>>>>>>>>>>>> serialize
>>>>>>>>>>>>>> data between steps of the pipeline. The builtin coders 
>>>>>>>>>>>>>> (including the
>>>>>>>>>>>>>> Schema Row Coder used by SQL) have a fixed data format and don't 
>>>>>>>>>>>>>> handle
>>>>>>>>>>>>>> schema evolution. They are optimized for performance at all 
>>>>>>>>>>>>>> costs.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> If you worked around this, the Beam model doesn't support
>>>>>>>>>>>>>> changing the structure of the pipeline graph. This would 
>>>>>>>>>>>>>> significantly
>>>>>>>>>>>>>> limit the changes you can make. It would also require some 
>>>>>>>>>>>>>> changes to SQL
>>>>>>>>>>>>>> to try to produce the same plan for an updated SQL query.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Andrew
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Dec 7, 2020 at 5:44 PM Talat Uyarer <
>>>>>>>>>>>>>> tuya...@paloaltonetworks.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> We are using Beamsql on our pipeline. Our Data is written in
>>>>>>>>>>>>>>> Avro format. We generate our rows based on our Avro schema. 
>>>>>>>>>>>>>>> Over time the
>>>>>>>>>>>>>>> schema is changing. I believe Beam SQL generates Java code 
>>>>>>>>>>>>>>> based on what we
>>>>>>>>>>>>>>> define as BeamSchema while submitting the pipeline. Do you have 
>>>>>>>>>>>>>>> any idea
>>>>>>>>>>>>>>> How can we handle schema changes with resubmitting our beam 
>>>>>>>>>>>>>>> job. Is it
>>>>>>>>>>>>>>> possible to generate SQL java code on the fly ?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>
> --
> Yours Sincerely
> Kobe Feng
>

Reply via email to