This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 04ae01c Expose commit_offset_in_finalize and timestamp_policy to ReadFromKafka. new 8b759d1 Merge pull request #13779 from [BEAM-11677] Expose commit_offset_in_finalize and timestamp_policy to ReadFromKafka 04ae01c is described below commit 04ae01ca97e0258d831d80bc5216ec160605a981 Author: Boyuan Zhang <boyu...@google.com> AuthorDate: Tue Jan 19 15:49:34 2021 -0800 Expose commit_offset_in_finalize and timestamp_policy to ReadFromKafka. --- .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 29 ++++++++++++++++-- .../beam/sdk/io/kafka/KafkaIOExternalTest.java | 6 +++- sdks/python/apache_beam/io/kafka.py | 34 +++++++++++++++------- .../runners/portability/flink_runner_test.py | 5 +++- 4 files changed, 60 insertions(+), 14 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 60608e0..d77d4fa 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -586,8 +586,23 @@ public class KafkaIO { setMaxReadTime(Duration.standardSeconds(config.maxReadTime)); } setMaxNumRecords(config.maxNumRecords == null ? Long.MAX_VALUE : config.maxNumRecords); - setCommitOffsetsInFinalizeEnabled(false); - setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime()); + + // Set committing offset configuration. + setCommitOffsetsInFinalizeEnabled(config.commitOffsetInFinalize); + + // Set timestamp policy with built-in types. + String timestampPolicy = config.timestampPolicy; + if (timestampPolicy.equals("ProcessingTime")) { + setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime()); + } else if (timestampPolicy.equals("CreateTime")) { + setTimestampPolicyFactory(TimestampPolicyFactory.withCreateTime(Duration.ZERO)); + } else if (timestampPolicy.equals("LogAppendTime")) { + setTimestampPolicyFactory(TimestampPolicyFactory.withLogAppendTime()); + } else { + throw new IllegalArgumentException( + "timestampPolicy should be one of (ProcessingTime, CreateTime, LogAppendTime)"); + } + if (config.startReadTime != null) { setStartReadTime(Instant.ofEpochMilli(config.startReadTime)); } @@ -645,6 +660,8 @@ public class KafkaIO { private Long startReadTime; private Long maxNumRecords; private Long maxReadTime; + private Boolean commitOffsetInFinalize; + private String timestampPolicy; public void setConsumerConfig(Map<String, String> consumerConfig) { this.consumerConfig = consumerConfig; @@ -673,6 +690,14 @@ public class KafkaIO { public void setMaxReadTime(Long maxReadTime) { this.maxReadTime = maxReadTime; } + + public void setCommitOffsetInFinalize(Boolean commitOffsetInFinalize) { + this.commitOffsetInFinalize = commitOffsetInFinalize; + } + + public void setTimestampPolicy(String timestampPolicy) { + this.timestampPolicy = timestampPolicy; + } } } diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java index 82c67eb..39b2981 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java @@ -89,12 +89,16 @@ public class KafkaIOExternalTest { "consumer_config", FieldType.map(FieldType.STRING, FieldType.STRING)), Field.of("key_deserializer", FieldType.STRING), Field.of("value_deserializer", FieldType.STRING), - Field.of("start_read_time", FieldType.INT64))) + Field.of("start_read_time", FieldType.INT64), + Field.of("commit_offset_in_finalize", FieldType.BOOLEAN), + Field.of("timestamp_policy", FieldType.STRING))) .withFieldValue("topics", topics) .withFieldValue("consumer_config", consumerConfig) .withFieldValue("key_deserializer", keyDeserializer) .withFieldValue("value_deserializer", valueDeserializer) .withFieldValue("start_read_time", startReadTime) + .withFieldValue("commit_offset_in_finalize", false) + .withFieldValue("timestamp_policy", "ProcessingTime") .build()); RunnerApi.Components defaultInstance = RunnerApi.Components.getDefaultInstance(); diff --git a/sdks/python/apache_beam/io/kafka.py b/sdks/python/apache_beam/io/kafka.py index b36c791..9a97737 100644 --- a/sdks/python/apache_beam/io/kafka.py +++ b/sdks/python/apache_beam/io/kafka.py @@ -92,15 +92,12 @@ from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder ReadFromKafkaSchema = typing.NamedTuple( 'ReadFromKafkaSchema', - [ - ('consumer_config', typing.Mapping[unicode, unicode]), - ('topics', typing.List[unicode]), - ('key_deserializer', unicode), - ('value_deserializer', unicode), - ('start_read_time', typing.Optional[int]), - ('max_num_records', typing.Optional[int]), - ('max_read_time', typing.Optional[int]), - ]) + [('consumer_config', typing.Mapping[unicode, unicode]), + ('topics', typing.List[unicode]), ('key_deserializer', unicode), + ('value_deserializer', unicode), ('start_read_time', typing.Optional[int]), + ('max_num_records', typing.Optional[int]), + ('max_read_time', typing.Optional[int]), + ('commit_offset_in_finalize', bool), ('timestamp_policy', str)]) def default_io_expansion_service(): @@ -120,6 +117,10 @@ class ReadFromKafka(ExternalTransform): byte_array_deserializer = ( 'org.apache.kafka.common.serialization.ByteArrayDeserializer') + processing_time_policy = 'ProcessingTime' + create_time_policy = 'CreateTime' + log_append_time = 'LogAppendTime' + URN = 'beam:external:java:kafka:read:v1' def __init__( @@ -131,6 +132,8 @@ class ReadFromKafka(ExternalTransform): start_read_time=None, max_num_records=None, max_read_time=None, + commit_offset_in_finalize=False, + timestamp_policy=processing_time_policy, expansion_service=None, ): """ @@ -152,8 +155,18 @@ class ReadFromKafka(ExternalTransform): for tests and demo applications. :param max_read_time: Maximum amount of time in seconds the transform executes. Mainly used for tests and demo applications. + :param commit_offset_in_finalize: Whether to commit offsets when finalizing. + :param timestamp_policy: The built-in timestamp policy which is used for + extracting timestamp from KafkaRecord. :param expansion_service: The address (host:port) of the ExpansionService. """ + if timestamp_policy not in [ReadFromKafka.processing_time_policy, + ReadFromKafka.create_time_policy, + ReadFromKafka.log_append_time]: + raise ValueError( + 'timestamp_policy should be one of ' + '[ProcessingTime, CreateTime, LogAppendTime]') + super(ReadFromKafka, self).__init__( self.URN, NamedTupleBasedPayloadBuilder( @@ -165,7 +178,8 @@ class ReadFromKafka(ExternalTransform): max_num_records=max_num_records, max_read_time=max_read_time, start_read_time=start_read_time, - )), + commit_offset_in_finalize=commit_offset_in_finalize, + timestamp_policy=timestamp_policy)), expansion_service or default_io_expansion_service()) diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py index a5f6ac3..2b4c8a6 100644 --- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py @@ -238,7 +238,8 @@ class FlinkRunnerTest(portable_runner_test.PortableRunnerTest): p | ReadFromKafka( consumer_config={ - 'bootstrap.servers': 'notvalid1:7777, notvalid2:3531' + 'bootstrap.servers': 'notvalid1:7777, notvalid2:3531', + 'group.id': 'any_group' }, topics=['topic1', 'topic2'], key_deserializer='org.apache.kafka.' @@ -247,6 +248,8 @@ class FlinkRunnerTest(portable_runner_test.PortableRunnerTest): value_deserializer='org.apache.kafka.' 'common.serialization.' 'LongDeserializer', + commit_offset_in_finalize=True, + timestamp_policy=ReadFromKafka.create_time_policy, expansion_service=self.get_expansion_service())) self.assertTrue( 'No resolvable bootstrap urls given in bootstrap.servers' in str(