On Wed, Aug 17, 2022 at 12:59 PM Yi Hu via dev <[email protected]> wrote:
> Hi Brian,
>
> Currently Java's JDBCIO does not do anything special for cross-language
> use. A DATETIME field type appears because the Row contains an Instant
> object, and the SDK always encodes the Instant object using InstantCoder.
> This is done for both Java pipelines and cross-language pipelines. To use
> millis_instant in JDBCIO without breaking the Instant used elsewhere (like
> watermarks), I will need to change the type of the timestamp returned by
> JDBC read from Instant to millis_instant.
>

I think this is fine (even though it would add a small perf hit to
JdbcIO.Read). We should probably also make this conversion a utility method
that can be used elsewhere when we need to encode datetime fields. We should
also document that "beam:logical_type:datetime:v1" is not portable (until we
fix the incompatibility).

Thanks,
Cham

>
> We could name the logical type "beam:logical_type:millis_instant", backed
> by a big-endian int64.
>
> Best,
> Yi
>
>
> On Fri, Aug 12, 2022 at 5:28 PM Brian Hulette <[email protected]> wrote:
>
>> Ah sorry, I forgot that INT64 is encoded with VarIntCoder, so we can't
>> simulate TimestampCoder with a logical type.
>>
>> I think the ideal end state would be to have a well-defined
>> beam:logical_type:millis_instant that we use for cross-language (when
>> appropriate), and never use DATETIME at cross-language boundaries. Would
>> it be possible to add millis_instant, and use that for JDBC read/write
>> instead of DATETIME?
>>
>> Separately we could consider how to resolve the conflicting definitions
>> of beam:logical_type:datetime:v1. I'm not quite sure how/if we can do
>> that without breaking pipeline update.
>>
>> Brian
>>
>>
>> On Fri, Aug 12, 2022 at 7:50 AM Yi Hu via dev <[email protected]>
>> wrote:
>>
>>> Hi Cham,
>>>
>>> Thanks for the comments.
>>>
>>>
>>>>
>>>>>
>>>>> ii.
>>>>> "beam:logical_type:instant:v1" is still backed by INT64, but in
>>>>> implementation it will use BigEndianLongCoder to encode/decode the
>>>>> stream.
>>>>>
>>>>>
>>>> Is this to be compatible with the current Java implementation? And we
>>>> have to update other SDKs to use big endian coders when
>>>> encoding/decoding the "beam:logical_type:instant:v1" logical type?
>>>>
>>>>
>>> Yes, and the proposal aims to keep the Java SDK change minimal; we
>>> have to update other SDKs to make it work. Currently the Python and Go
>>> SDKs do not implement "beam:logical_type:datetime:v1" (which will
>>> be "beam:logical_type:instant:v1") at all.
>>>
>>>
>>>>
>>>>
>>>>> For the second step ii, the problem is that there is a primitive type
>>>>> backed by a fixed length integer coder. Currently INT8, INT16, INT32,
>>>>> INT64... are all backed by VarInt (and there is ongoing work to use
>>>>> fixed size big endian to encode INT8, INT16
>>>>> (https://github.com/apache/beam/issues/19815)). Ideally, I would think
>>>>> INT8, INT16, INT32, and INT64 would all be fixed-length, with a
>>>>> generic INT primitive type backed by VarInt. But this may be a more
>>>>> substantial change for the current code base.
>>>>>
>>>>
>>>> I'm a bit confused by this. Did you mean that there's *no* primitive
>>>> type backed by a fixed length integer coder? Also, by primitive, I'm
>>>> assuming you mean Beam Schema types here.
>>>>
>>>>
>>> Yes, I mean Beam Schema types here. The proto for the datetime (instant)
>>> logical type is constructed here:
>>> https://github.com/apache/beam/blob/cf9ea1f442636f781b9f449e953016bb39622781/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java#L202
>>> It is represented by an INT64 atomic type. In the cross-language case,
>>> another SDK receives the proto and decodes the stream according to it.
>>> Currently I do not see an atomic type that will be decoded using a
>>> fixed-length BigEndianLong coder.
>>> INT8, ..., INT64 will all be decoded with VarInt.
>>>
>>> As a workaround in the PR (#22561), in Python's RowCoder I explicitly
>>> set the coder for "beam:logical_type:datetime:v1" (which will
>>> be "beam:logical_type:instant:v1") to be TimestampCoder. I did not find a
>>> way to keep the logic contained in the logical type implementation, e.g.
>>> in the to_language_type and to_representation_type methods. To do this I
>>> would need an atomic type that is decoded using the BigEndianLong coder.
>>> Please point out if I am wrong.
>>>
>>> Best,
>>> Yi
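
For anyone following the thread, here is a minimal sketch of why an INT64
field decoded as a varint cannot read a timestamp that was written as a
fixed-width big-endian long. It uses only the Python standard library and is
not Beam's actual coder code: the function names are hypothetical, the
timestamp is an arbitrary epoch-millisecond value, and the real InstantCoder
and TimestampCoder may apply further transformations that are not modelled
here. The only point is the variable-width versus fixed-width mismatch.

```python
import struct


def encode_varint(value: int) -> bytes:
    """Encode a signed 64-bit integer as a base-128 varint (7 bits per byte)."""
    value &= 0xFFFFFFFFFFFFFFFF  # view as unsigned so negative values terminate
    out = bytearray()
    while True:
        low7 = value & 0x7F
        value >>= 7
        if value:
            out.append(low7 | 0x80)  # continuation bit: more bytes follow
        else:
            out.append(low7)
            return bytes(out)


def decode_varint(data: bytes) -> tuple:
    """Decode one varint from the start of data; return (value, bytes_consumed)."""
    result = 0
    shift = 0
    for i, byte in enumerate(data):
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            if result >= 1 << 63:  # reinterpret as signed 64-bit
                result -= 1 << 64
            return result, i + 1
        shift += 7
    raise ValueError("truncated varint")


def encode_big_endian_int64(value: int) -> bytes:
    """Encode a signed 64-bit integer as exactly 8 big-endian bytes."""
    return struct.pack(">q", value)


def decode_big_endian_int64(data: bytes) -> int:
    """Decode the first 8 bytes of data as a big-endian signed 64-bit integer."""
    return struct.unpack(">q", data[:8])[0]


millis = 1_660_000_000_000  # arbitrary epoch-millisecond value (assumption)

as_varint = encode_varint(millis)
as_fixed = encode_big_endian_int64(millis)

print(len(as_varint))  # 6 bytes: length depends on the magnitude
print(len(as_fixed))   # 8 bytes: always fixed

# Reading the fixed-width bytes with the varint decoder stops at the leading
# 0x00 byte, so it yields 0 and leaves 7 unread bytes in the stream.
misread_value, consumed = decode_varint(as_fixed)
print(misread_value, consumed)  # 0 1
```

This is the situation described above: if the schema proto says INT64 and
the receiving SDK therefore picks a varint decoder, both the value and the
stream position end up wrong, which is why the workaround has to select the
coder explicitly rather than rely on the representation type.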
