One approach could be to create a PTransform whose expand method wraps
AvroIO and reads the Avro writer schema from one of the files matching the
read pattern.

This works as long as the set of sources with different schemas is fixed at
pipeline construction time.

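```
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;

public abstract class GenericAvroIORead
    extends PTransform<PBegin, PCollection<GenericRecord>> {
  public abstract String from();

  // Reads the writer schema from the header of one file matching from().
  public static Schema getSchema(String from) throws IOException {
    ResourceId resourceId = FileSystems. ... (from);
    try (InputStream inputStream = ... (resourceId);
        DataFileStream<GenericRecord> stream =
            new DataFileStream<>(inputStream, new GenericDatumReader<>())) {
      return stream.getSchema();
    }
  }

  @Override
  public PCollection<GenericRecord> expand(PBegin input) {
    try {
      Schema schema = getSchema(from()); // resolved at construction time
      return input.apply(AvroIO.readGenericRecords(schema).from(from()));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```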
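The `getSchema()` step relies on the fact that, per the Avro specification, every object container file starts with a header holding the writer schema. Below is a minimal, stdlib-only sketch of what reading that header involves, assuming the header is available as an `InputStream`. The class and method names are hypothetical; in a real pipeline `DataFileStream.getSchema()` does this for you. It checks the four magic bytes (`O`, `b`, `j`, then the byte value 1), decodes the file metadata map (zigzag varint counts and lengths), and returns the `avro.schema` entry.

```java
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: reads the metadata of an Avro object container file header. */
final class AvroHeader {
  // Container files start with the bytes 'O', 'b', 'j', 1.
  private static final byte[] MAGIC = {'O', 'b', 'j', 1};

  /** Decodes an Avro long: a variable-length, zigzag-encoded integer. */
  static long readLong(InputStream in) throws IOException {
    long raw = 0;
    int shift = 0;
    int b;
    do {
      b = in.read();
      if (b < 0) throw new EOFException("truncated varint");
      raw |= (long) (b & 0x7F) << shift;
      shift += 7;
    } while ((b & 0x80) != 0);
    return (raw >>> 1) ^ -(raw & 1); // undo zigzag: 0->0, 1->-1, 2->1, 3->-2, ...
  }

  /** Decodes Avro bytes/string: a long length followed by that many bytes. */
  static byte[] readBytes(InputStream in) throws IOException {
    long len = readLong(in);
    if (len < 0 || len > Integer.MAX_VALUE) throw new IOException("bad length " + len);
    byte[] buf = new byte[(int) len];
    new DataInputStream(in).readFully(buf); // throws EOFException if truncated
    return buf;
  }

  /** Reads the magic bytes and the header's metadata map (a map of bytes in Avro terms). */
  static Map<String, String> readMetadata(InputStream in) throws IOException {
    for (byte m : MAGIC) {
      if (in.read() != (m & 0xFF)) throw new IOException("not an Avro container file");
    }
    Map<String, String> meta = new LinkedHashMap<>();
    long count;
    // A map is a sequence of blocks; a block with count 0 ends it.
    while ((count = readLong(in)) != 0) {
      if (count < 0) { // negative count: use its absolute value; a block size follows
        count = -count;
        readLong(in); // block size in bytes, not needed here
      }
      for (long i = 0; i < count; i++) {
        String key = new String(readBytes(in), StandardCharsets.UTF_8);
        meta.put(key, new String(readBytes(in), StandardCharsets.UTF_8));
      }
    }
    // The 16-byte sync marker and the data blocks follow; they are left unread.
    return meta;
  }

  /** Returns the writer schema as JSON text, or null if the header has none. */
  static String readWriterSchema(InputStream in) throws IOException {
    return readMetadata(in).get("avro.schema");
  }
}
```

Only the header is touched, so this stays cheap for large files; it still yields one schema per file, which is why the transform above needs the schema set to be fixed when the pipeline is built.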

On Sun, Jan 13, 2019 at 11:41 PM Reuven Lax <[email protected]> wrote:

> It would probably be horribly inefficient, but might work. Not only would
> you need to embed the schema in every single record, I think you would need
> to create a new GenericDatumReader on each record. You might have trouble
> getting this to scale.
>
> BTW you might need to create a new encoding format for this. The problem
> is that GenericDatumReader (which we use to read Avro records) requires the
> schema ahead of time, and in your model we won't have the schema until a
> GenericRecord is produced (also, I don't believe the encoded Avro record
> will include the schema). This produces a chicken-and-egg problem. You would
> probably need to create a new encoding format that allowed you to store the
> schema along with the record. At that point this isn't AvroCoder, it's a
> coder for the new type you've created.
>
> Reuven
>
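The cost argument above can be made concrete with a hypothetical framing (not a Beam or Avro API, and using fixed 4-byte lengths for simplicity). Option 1 prefixes every record with its full writer schema, as a self-describing coder would have to; option 2, the union alternative from the earlier reply, sends only an index into a list of schemas fixed at construction time. The payload is assumed to be already Avro-encoded; for a record with one `long` field set to 1 it is the single byte `0x02`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

/** Hypothetical framing for records whose schema is only known at runtime. */
final class SchemaFraming {
  /** Option 1: [schema length][schema JSON][payload length][payload] for every record. */
  static byte[] withSchema(String schemaJson, byte[] avroPayload) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    byte[] schema = schemaJson.getBytes(StandardCharsets.UTF_8);
    out.writeInt(schema.length);
    out.write(schema);
    out.writeInt(avroPayload.length);
    out.write(avroPayload);
    out.flush();
    return bytes.toByteArray();
  }

  /** Decoder side of option 1: the schema must be parsed (and a reader built) per record. */
  static String schemaOf(byte[] framed) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(framed));
    byte[] schema = new byte[in.readInt()];
    in.readFully(schema);
    return new String(schema, StandardCharsets.UTF_8);
  }

  /** Option 2: [tag][payload length][payload], where tag indexes schemas fixed up front. */
  static byte[] withTag(List<String> knownSchemas, String schemaJson, byte[] avroPayload)
      throws IOException {
    int tag = knownSchemas.indexOf(schemaJson);
    if (tag < 0) throw new IllegalArgumentException("schema not known at construction time");
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeInt(tag);
    out.writeInt(avroPayload.length);
    out.write(avroPayload);
    out.flush();
    return bytes.toByteArray();
  }
}
```

With the schema `{"type":"record","name":"User","fields":[{"name":"id","type":"long"}]}` (70 bytes) and the 1-byte payload, option 1 produces 79 bytes per record while option 2 produces 9, but option 2 only works if every schema is known when the pipeline is built.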
> On Sun, Jan 13, 2019 at 1:28 PM Niel Markwick <[email protected]> wrote:
>
>> This is my point, though: AvroIO.parseAllGenericRecords() is able to
>> decode the object from the Avro file into a GenericRecord _without_
>> knowing the schema in advance, as it uses the writer schema embedded in the
>> file.
>>
>> So can there be a GenericRecordAvroCoder which uses the schema embedded
>> in the GenericRecord to encode itself?
>>
>> On Sun, 13 Jan 2019, 16:59 Reuven Lax, <[email protected]> wrote:
>>
>>> AvroCoder needs to know the schema of the object in order to decode the
>>> object. Remember that in Beam the set of PCollections in a graph is static,
>>> so all the coders need to be known up front. To make things work with
>>> parseAllGenericRecords I think you would either need to embed the schema in
>>> every single record (which would be expensive), or you would need to create
>>> a new union type to represent the possible types (assuming that you know
>>> the possible schemas ahead of time).
>>>
>>> On Sat, Jan 12, 2019 at 12:09 PM Niel Markwick <[email protected]> wrote:
>>>
>>>> Considering the transform is reading Avro container files, which by
>>>> definition
>>>> <https://avro.apache.org/docs/1.8.1/spec.html#Object+Container+Files>
>>>> contain a schema, it should be possible for the reader to infer the schema
>>>> from the file...
>>>>
>>>> parseAllGenericRecords()
>>>> <https://beam.apache.org/releases/javadoc/2.3.0/org/apache/beam/sdk/io/AvroIO.html#parseAllGenericRecords-org.apache.beam.sdk.transforms.SerializableFunction->
>>>> seems to be able to do this: it decodes each record and passes a
>>>> GenericRecord to the given parseFn without needing to know the schema in
>>>> advance...
>>>>
>>>> In fact, parseAllGenericRecords() would be perfect for my requirements
>>>> if I could use a Contextful.Fn that accepts side inputs as the parseFn :/
>>>>
>>>>
>>>>
>>>> Niel Markwick
>>>> Cloud Solutions Architect
>>>> Google Belgium
>>>> [email protected]
>>>> +32 2 894 6771
>>>>
>>>> Google Belgium NV/SA, Steenweg op Etterbeek 180, 1040 Brussel, Belgie.
>>>> RPR: 0878.065.378
>>>>
>>>> If you received this communication by mistake, please don't forward it
>>>> to anyone else (it may contain confidential or privileged information),
>>>> please erase all copies of it, including all attachments, and please let
>>>> the sender know it went to the wrong person. Thanks
>>>>
>>>>
>>>> On Sat, 12 Jan 2019 at 20:08, Alex Van Boxel <[email protected]> wrote:
>>>>
>>>>> Hey Niels,
>>>>>
>>>>> The reason you need to specify the schema to GenericRecord is that
>>>>> without it, it's *impossible* for GenericRecord to make any sense of
>>>>> the binary data. Unlike Protobuf, Avro doesn't include any information
>>>>> about the structure in the message. This makes it smaller, but impossible
>>>>> to decode without the schema.
>>>>>
>>>>> So if you really want to do flexible messages, I would read the data as
>>>>> binary, message by message, and handle the schema switching in a DoFn.
>>>>>
>>>>>  _/
>>>>> _/ Alex Van Boxel
>>>>>
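The point that Avro binary data is meaningless without a schema can be seen by decoding the same three bytes under two different assumed schemas. This sketch hand-decodes Avro's zigzag varint and length-prefixed string encodings in plain Java (class and method names are hypothetical, not an Avro API). Neither decode fails, yet they yield different values, because nothing in the bytes records field names or types.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical demo: Avro binary bytes read under two different assumed schemas. */
final class SchemaLessBytes {
  /** Avro int and long share one encoding: a zigzag-encoded varint. */
  static long readLong(InputStream in) throws IOException {
    long raw = 0;
    int shift = 0;
    int b;
    do {
      b = in.read();
      if (b < 0) throw new IOException("truncated varint");
      raw |= (long) (b & 0x7F) << shift;
      shift += 7;
    } while ((b & 0x80) != 0);
    return (raw >>> 1) ^ -(raw & 1);
  }

  /** Avro string: a long length followed by that many UTF-8 bytes. */
  static String readString(InputStream in) throws IOException {
    byte[] buf = new byte[(int) readLong(in)];
    new DataInputStream(in).readFully(buf);
    return new String(buf, StandardCharsets.UTF_8);
  }

  // Record {id: int = 1, name: string = "a"} in Avro binary: 0x02 (id), 0x02 (length), 'a'.
  static final byte[] ENCODED = {0x02, 0x02, 0x61};

  /** Reader that assumes schema A: {id: int, name: string}. */
  static String decodeAsIdAndName(byte[] data) throws IOException {
    InputStream in = new ByteArrayInputStream(data);
    return "id=" + readLong(in) + ", name=" + readString(in);
  }

  /** Reader that assumes schema B: {x: long, y: long, z: long}. */
  static String decodeAsThreeLongs(byte[] data) throws IOException {
    InputStream in = new ByteArrayInputStream(data);
    return "x=" + readLong(in) + ", y=" + readLong(in) + ", z=" + readLong(in);
  }
}
```

Schema A recovers `id=1, name=a`; schema B silently reads `x=1, y=1, z=-49` from the same bytes.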
>>>>>
>>>>> On Sat, Jan 12, 2019 at 7:44 PM Niel Markwick <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Is there a reason why we don't have an AvroIO reader that reads and
>>>>>> outputs a GenericRecord without requiring any schema to be given?
>>>>>>
>>>>>> Does passing the schema into readGenericRecords() have any benefits
>>>>>> other than verifying that the Avro file has records of the same schema?
>>>>>>
>>>>>> This could be useful for parsing a collection of Avro files of
>>>>>> varying schemas, then post-processing the GenericRecords in further
>>>>>> transforms with side inputs.
>>>>>>
>>
>

-- 
Cheers,
Gleb
