youngoli commented on a change in pull request #14996:
URL: https://github.com/apache/beam/pull/14996#discussion_r651468815
##########
File path: sdks/go/pkg/beam/io/kafkaio/kafka.go
##########
@@ -0,0 +1,330 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package kafkaio contains cross-language functionality for using Apache Kafka
+// (http://kafka.apache.org/). These transforms only work on runners that
+// support cross-language transforms.
+//
+// Setup
+//
+// Transforms specified here are cross-language transforms implemented in a
+// different SDK (listed below). During pipeline construction, the Go SDK will
+// need to connect to an expansion service containing information on these
+// transforms in their native SDK.
+//
+// To use an expansion service, it must be run as a separate process accessible
+// during pipeline construction. The address of that process must be passed to
+// the transforms in this package.
+//
+// The version of the expansion service should match the version of the Beam SDK
+// being used. For numbered releases of Beam, these expansion services are
+// released to the Maven repository as modules. For development versions of
+// Beam, it is recommended to build and run it from source using Gradle.
+//
+// Currently supported SDKs, including expansion service modules and reference
+// documentation:
+//  * Java
+//    - Vendored Module: beam-sdks-java-io-expansion-service
+//    - Run via Gradle: ./gradlew :sdks:java:io:expansion-service:runExpansionService
+//    - Reference Class: org.apache.beam.sdk.io.kafka.KafkaIO
+package kafkaio
+
+import (
+	"reflect"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+)
+
+func init() {
+	beam.RegisterType(reflect.TypeOf((*readPayload)(nil)).Elem())
+	beam.RegisterType(reflect.TypeOf((*writePayload)(nil)).Elem())
+}
+
+type policy string
+
+const (
+	ByteArrayDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer"
+	ByteArraySerializer   = "org.apache.kafka.common.serialization.ByteArraySerializer"
+
+	// ProcessingTime is a timestamp policy that assigns processing time to
+	// each record. Specifically, this is the timestamp when the record becomes
+	// "current" in the reader. Further documentation can be found in Java's
+	// KafkaIO documentation.
+	ProcessingTime policy = "ProcessingTime"
+
+	// CreateTime is a timestamp policy based on the CREATE_TIME timestamps of
+	// Kafka records. Requires the records to have a timestamp type set to
+	// org.apache.kafka.common.record.TimestampType.CREATE_TIME. Further
+	// documentation can be found in Java's KafkaIO documentation.
+	CreateTime policy = "CreateTime"
+
+	// LogAppendTime is a timestamp policy that assigns Kafka's log append time
+	// (server-side ingestion time) to each record. Further documentation can
+	// be found in Java's KafkaIO documentation.
+	LogAppendTime policy = "LogAppendTime"
+
+	readURN  = "beam:external:java:kafka:read:v1"
+	writeURN = "beam:external:java:kafka:write:v1"
+)
+
+// Read is a cross-language PTransform which reads from Kafka and returns a
+// KV pair for each item in the specified Kafka topics. By default, this runs
+// as an unbounded transform and outputs keys and values as raw byte arrays.
+// These properties can be changed through optional parameters.
+//
+// Read requires the address of an expansion service for Kafka Read transforms,
+// a comma-separated list of bootstrap server addresses (see the Kafka property
+// "bootstrap.servers" for details), and at least one topic to read from.
+//
+// Read also accepts optional parameters as readOptions. All optional parameters
+// are predefined in this package as functions that return readOption. To set
+// an optional parameter, call the function within Read's function signature.
+//
+// Example of Read with required and optional parameters:
+//
+//    expansionAddr := "localhost:1234"
+//    bootstrapServer := "bootstrap-server:1234"
+//    topic := "topic_name"
+//    pcol := kafkaio.Read(s, expansionAddr, bootstrapServer, []string{topic},
+//        kafkaio.MaxNumRecords(100), kafkaio.CommitOffsetInFinalize(true))
+func Read(s beam.Scope, addr string, servers string, topics []string, opts ...readOption) beam.PCollection {
+	s = s.Scope("kafkaio.Read")
+
+	if len(topics) == 0 {
+		panic("kafkaio.Read requires at least one topic to read from.")
+	}
+
+	rpl := readPayload{
+		ConsumerConfig:    map[string]string{"bootstrap.servers": servers},
+		Topics:            topics,
+		KeyDeserializer:   ByteArrayDeserializer,
+		ValueDeserializer: ByteArrayDeserializer,
+		TimestampPolicy:   string(ProcessingTime),
+	}
+	rcfg := readConfig{
+		pl:  &rpl,
+		key: reflectx.ByteSlice,
+		val: reflectx.ByteSlice,
+	}
+	for _, opt := range opts {
+		opt(&rcfg)
+	}
+
+	pl := beam.CrossLanguagePayload(rpl)
+	outT := beam.UnnamedOutput(typex.NewKV(typex.New(rcfg.key), typex.New(rcfg.val)))
+	out := beam.CrossLanguage(s, readURN, pl, addr, nil, outT)
+	return out[graph.UnnamedOutputTag]
+}
+
+type readOption func(*readConfig)
+type readConfig struct {
+	pl  *readPayload
+	key reflect.Type
+	val reflect.Type
+}
+
+// ConsumerConfigs is a Read option that adds consumer properties to the
+// Consumer configuration of the transform. Each usage of this adds the given
+// elements to the existing map without removing existing elements.
+//
+// Note that the "bootstrap.servers" property is automatically set by
+// kafkaio.Read and does not need to be specified via this option.
+func ConsumerConfigs(cfgs map[string]string) readOption {
+	return func(cfg *readConfig) {
+		for k, v := range cfgs {
+			cfg.pl.ConsumerConfig[k] = v
+		}
+	}
+}
+
+// KeyDeserializer is a Read option that specifies a fully-qualified Java class
+// name of a Kafka Deserializer for the topic's key, along with the
+// corresponding Go type to deserialize keys to.
+//
+// Defaults to []byte, with classname
+// "org.apache.kafka.common.serialization.ByteArrayDeserializer".
+func KeyDeserializer(classname string, keyType reflect.Type) readOption {
+	return func(cfg *readConfig) {
+		cfg.pl.KeyDeserializer = classname
+		cfg.key = keyType
+	}
+}
+
+// ValueDeserializer is a Read option that specifies a fully-qualified Java
+// class name of a Kafka Deserializer for the topic's value, along with the
+// corresponding Go type to deserialize values to.
+//
+// Defaults to []byte, with classname
+// "org.apache.kafka.common.serialization.ByteArrayDeserializer".
+func ValueDeserializer(classname string, valType reflect.Type) readOption {
+	return func(cfg *readConfig) {
+		cfg.pl.ValueDeserializer = classname
+		cfg.val = valType
+	}
+}

Review comment:
       Yeah, I was brainstorming some of this too. I went with punting this to later and just leaving it as only []byte.
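       To make the punt concrete for anyone skimming the thread, here's a rough sketch of how Read and these options compose from a user pipeline. The addresses, topic name, and consumer property are placeholder assumptions, and the StringDeserializer call is exactly the custom-type case being deferred, so only the []byte default path should be expected to work for now:

```go
package main

import (
	"context"
	"reflect"

	"github.com/apache/beam/sdks/go/pkg/beam"
	"github.com/apache/beam/sdks/go/pkg/beam/io/kafkaio"
	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
)

func main() {
	beam.Init()
	p, s := beam.NewPipelineWithRoot()

	// Placeholder addresses: a real pipeline needs a running expansion
	// service and Kafka broker at these locations.
	expansionAddr := "localhost:8097"
	bootstrapServers := "localhost:9092"

	// Default usage: keys and values are emitted as KV<[]byte, []byte>.
	raw := kafkaio.Read(s, expansionAddr, bootstrapServers, []string{"events"})
	_ = raw // Would feed ordinary Go SDK transforms downstream.

	// Hypothetical custom-type usage once non-[]byte types are supported:
	// Kafka's StringDeserializer paired with Go's string type, plus an
	// extra consumer property merged in via ConsumerConfigs.
	keyed := kafkaio.Read(s, expansionAddr, bootstrapServers, []string{"events"},
		kafkaio.KeyDeserializer(
			"org.apache.kafka.common.serialization.StringDeserializer",
			reflect.TypeOf("")),
		kafkaio.ConsumerConfigs(map[string]string{"auto.offset.reset": "earliest"}))
	_ = keyed

	if err := beamx.Run(context.Background(), p); err != nil {
		panic(err)
	}
}
```

       The second Read call compiles against the signatures in this file, but it depends on cross-language coder support that this PR intentionally leaves for later.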
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]