On Wed, Aug 17, 2022 at 12:59 PM Yi Hu via dev <[email protected]> wrote:

> Hi Brian,
>
> Currently Java's JDBCIO does not do special cross-language things. A
> DATETIME field type appears because the Row contains an Instant object. And
> the sdk will always encode the Instant object using InstantCoder. This is
> done for both Java pipelines and cross-language pipelines. To use
> millis_instant in JDBCIO and to avoid break Instant used elsewhere (like
> watermarks) I will need to change the type of timestamp returned by JDBC
> read from Instant to millis_instant.
>

I think this is fine (even though it would add a small perf hit to
JdbcIO.Read). We also probably should make this conversion a utility method
that can be used elsewhere when we need to encode datetime fields.
We should also document that "beam:logical_type:datetime:v1" is not
portable (till we fix the incompatibility).

Thanks,
Cham


>
> We could make the name of the logical type to be
> "beam:logical_type:millis_instant" which is backed by a big endian int 64.
>

> Best,
> Yi
>
>
> On Fri, Aug 12, 2022 at 5:28 PM Brian Hulette <[email protected]> wrote:
>
>> Ah sorry, I forgot that INT64 is encoded with VarIntCoder, so we can't
>> simulate TimestampCoder with a logical type.
>>
>> I think the ideal end state would be to have a well-defined
>> beam:logical_type:millis_instant that we use for cross-language (when
>> appropriate), and never use DATETIME at cross-language boundaries. Would it
>> be possible to add millis_instant, and use that for JDBC read/write instead
>> of DATETIME?
>>
>> Separately we could consider how to resolve the conflicting definitions
>> of beam:logical_type:datetime:v1. I'm not quite sure how/if we can do that
>> without breaking pipeline update.
>>
>> Brian
>>
>>
>> On Fri, Aug 12, 2022 at 7:50 AM Yi Hu via dev <[email protected]>
>> wrote:
>>
>>> Hi Cham,
>>>
>>> Thanks for the comments.
>>>
>>>
>>>>
>>>>>
>>>>> ii. "beam:logical_type:instant:v1" is still backed by INT64, but in
>>>>> implementation it will use BigEndianLongCoder to encode/decode the stream.
>>>>>
>>>>>
>>>> Is this to be compatible with the current Java implementation ? And we
>>>> have to update other SDKs to use big endian coders when encoding/decoding
>>>> the "beam:logical_type:instant:v1" logical type ?
>>>>
>>>>
>>> Yes, and the proposal is aimed to keep the Java SDK change minimal; we
>>> have to update other SDKs to make it work. Currently python and go sdk does
>>> not implement "beam:logical_type:datetime:v1" (will
>>> be "beam:logical_type:instant:v1") at all.
>>>
>>>
>>>>
>>>>
>>>>> For the second step ii, the problem is that there is a primitive type
>>>>> backed by a fixed length integer coder. Currently INT8, INT16, INT32,
>>>>> INT64... are all backed by VarInt (and there is ongoing work to use fixed
>>>>> size big endian to encode INT8, INT16 (
>>>>> https://github.com/apache/beam/issues/19815)). Ideally I would think
>>>>> (INT8, INT16, INT32, INT64) are all fixed and having a generic (INT)
>>>>> primitive type is backed by VarInt. But this may be a more substantial
>>>>> change for the current code base.
>>>>>
>>>>
>>>> I'm a bit confused by this. Did you mean that there's *no* primitive
>>>> type backed by a fixed length integer coder ? Also, by primitive, I'm
>>>> assuming you mean Beam Schema types here.
>>>>
>>>>
>>> Yes I mean Beam Schema types here. The proto for datetime(instant)
>>> logical type is constructed here:
>>> https://github.com/apache/beam/blob/cf9ea1f442636f781b9f449e953016bb39622781/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java#L202
>>> It is represented by an INT64 atomic type. In cross-language case,
>>> another SDK receives proto and decodes the stream according to the proto.
>>> Currently I do not see an atomic type that will be decoded using a
>>> fixed-length BigEndianLong coder. INT8, ..., INT64 will all be decoded with
>>> VarInt.
>>>
>>> As a workaround in the PR (#22561), in python's RowCoder I explicitly
>>> set the coder for "beam:logical_type:datetime:v1" (will
>>> be "beam:logical_type:instant:v1") to be TimestampCoder. I do not find a
>>> way to keep the logic contained in the logical type implementation, e.g. in
>>> to_language_type and to_representation_type method. To do this I will need
>>> an atomic type that is decoded using the BigEndianLong coder.
>>> Please point out if I was wrong.
>>>
>>> Best,
>>> Yi
>>>
>>

Reply via email to