This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new 7495aa7  [BEAM-7029] Add KafkaIO.Read as an external transform
     new 3aaf39a  Merge pull request #8251: [BEAM-7029] Add KafkaIO.Read as an external transform
7495aa7 is described below

commit 7495aa719037445a17a5af1d503766f5c5781d74
Author: Maximilian Michels <m...@apache.org>
AuthorDate: Mon Apr 8 16:16:28 2019 +0200

    [BEAM-7029] Add KafkaIO.Read as an external transform

    This adds KafkaIO.Read as an external transform and includes a Python
    wrapper (ReadFromKafka) for convenience. The transform only returns the
    key/value data for a Kafka topic. It does not include metadata such as
    the partition id, offset, or timestamp.

    By default, the data is returned as `KV<byte[], byte[]>`. Users can
    supply a Kafka Deserializer in ReadFromKafka, such as LongDeserializer,
    from which a different coder will be inferred. Only a limited set of
    Deserializers is supported. Alternatively, users can implement their
    own decoding in the target SDK.
---
 runners/flink/job-server/flink_job_server.gradle   |   2 +
 sdks/java/io/kafka/build.gradle                    |   2 +
 .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 127 +++++++++++++-
 .../beam/sdk/io/kafka/KafkaIOExternalTest.java     | 185 +++++++++++++++++++++
 .../apache_beam/io/external/generate_sequence.py   |   2 +-
 sdks/python/apache_beam/io/external/kafka.py       | 152 +++++++++++++++++
 .../runners/portability/flink_runner_test.py       |  25 ++-
 7 files changed, 492 insertions(+), 3 deletions(-)

diff --git a/runners/flink/job-server/flink_job_server.gradle b/runners/flink/job-server/flink_job_server.gradle
index ebed8e4..84140fc 100644
--- a/runners/flink/job-server/flink_job_server.gradle
+++ b/runners/flink/job-server/flink_job_server.gradle
@@ -84,6 +84,8 @@ dependencies {
   compile project(path: ":beam-sdks-java-extensions-google-cloud-platform-core", configuration: "shadow")
   compile library.java.slf4j_simple
   // TODO: Enable AWS and HDFS file system.
+  // For resolving external transform requests
+  compile project(path: ":beam-sdks-java-io-kafka", configuration: "shadow")
 }

 // NOTE: runShadow must be used in order to run the job server. The standard run
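For orientation, here is a minimal sketch of how the Python wrapper introduced
below (ReadFromKafka in sdks/python/apache_beam/io/external/kafka.py) is meant
to be used. The broker address and topic name are placeholders, not values
from this commit:

    import apache_beam as beam
    from apache_beam.io.external.kafka import ReadFromKafka

    with beam.Pipeline() as p:
      # Without explicit deserializers, records arrive as raw (key, value) bytes.
      raw_records = (
          p | ReadFromKafka(
              consumer_config={'bootstrap.servers': 'mybroker:9092'},  # placeholder
              topics=['mytopic'],                                      # placeholder
              expansion_service='localhost:8097'))  # Java expansion service address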
diff --git a/sdks/java/io/kafka/build.gradle b/sdks/java/io/kafka/build.gradle
index 9a88160..cfb1a62 100644
--- a/sdks/java/io/kafka/build.gradle
+++ b/sdks/java/io/kafka/build.gradle
@@ -33,6 +33,8 @@ dependencies {
   shadow "org.springframework:spring-expression:4.3.18.RELEASE"
   testCompile project(path: ":beam-runners-direct-java", configuration: "shadow")
   testCompile project(path: ":beam-sdks-java-core", configuration: "shadowTest")
+  // For testing Cross-language transforms
+  testCompile project(path: ":beam-runners-core-construction-java", configuration: "shadow")
   testCompile library.java.hamcrest_core
   testCompile library.java.hamcrest_library
   testCompile library.java.junit
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 7feee9c..d542bcc 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -21,12 +21,15 @@ import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Precondi
 import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;

+import com.google.auto.service.AutoService;
 import com.google.auto.value.AutoValue;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.lang.reflect.Method;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -35,17 +38,22 @@ import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
 import org.apache.beam.sdk.io.Read.Unbounded;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -57,6 +65,7 @@ import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Charsets;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Joiner;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
@@ -351,7 +360,8 @@ public class KafkaIO {
     abstract Builder<K, V> toBuilder();

     @AutoValue.Builder
-    abstract static class Builder<K, V> {
+    abstract static class Builder<K, V>
+        implements ExternalTransformBuilder<External.Configuration, PBegin, PCollection<KV<K, V>>> {
       abstract Builder<K, V> setConsumerConfig(Map<String, Object> config);

       abstract Builder<K, V> setTopics(List<String> topics);
@@ -386,6 +396,121 @@ public class KafkaIO {
       abstract Builder<K, V> setOffsetConsumerConfig(Map<String, Object> offsetConsumerConfig);

       abstract Read<K, V> build();
+
+      @Override
+      public PTransform<PBegin, PCollection<KV<K, V>>> buildExternal(
+          External.Configuration config) {
+        ImmutableList.Builder<String> listBuilder = ImmutableList.builder();
+        for (byte[] topic : config.topics) {
+          listBuilder.add(utf8String(topic));
+        }
+        setTopics(listBuilder.build());
+
+        String keyDeserializerClassName = utf8String(config.keyDeserializer);
+        Class keyDeserializer = resolveClass(keyDeserializerClassName);
+        setKeyDeserializer(keyDeserializer);
+        setKeyCoder(resolveCoder(keyDeserializer));
+
+        String valueDeserializerClassName = utf8String(config.valueDeserializer);
+        Class valueDeserializer = resolveClass(valueDeserializerClassName);
+        setValueDeserializer(valueDeserializer);
+        setValueCoder(resolveCoder(valueDeserializer));
+
+        Map<String, Object> consumerConfig = new HashMap<>();
+        for (KV<byte[], byte[]> kv : config.consumerConfig) {
+          String key = utf8String(kv.getKey());
+          String value = utf8String(kv.getValue());
+          consumerConfig.put(key, value);
+        }
+        // Key and Value Deserializers always have to be in the config.
+        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getName());
+        consumerConfig.put(
+            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getName());
+        setConsumerConfig(consumerConfig);
+
+        // Set required defaults
+        setTopicPartitions(Collections.emptyList());
+        setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN);
+        setMaxNumRecords(Long.MAX_VALUE);
+        setCommitOffsetsInFinalizeEnabled(false);
+        setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime());
+        // We do not include Metadata until we can encode KafkaRecords cross-language
+        return build().withoutMetadata();
+      }
+
+      private static Coder resolveCoder(Class deserializer) {
+        for (Method method : deserializer.getDeclaredMethods()) {
+          if (method.getName().equals("deserialize")) {
+            Class<?> returnType = method.getReturnType();
+            if (returnType.equals(Object.class)) {
+              continue;
+            }
+            if (returnType.equals(byte[].class)) {
+              return ByteArrayCoder.of();
+            } else if (returnType.equals(Integer.class)) {
+              return VarIntCoder.of();
+            } else if (returnType.equals(Long.class)) {
+              return VarLongCoder.of();
+            } else {
+              throw new RuntimeException("Couldn't infer Coder from " + deserializer);
+            }
+          }
+        }
+        throw new RuntimeException("Couldn't resolve coder for Deserializer: " + deserializer);
+      }
+
+      private static Class resolveClass(String className) {
+        try {
+          return Class.forName(className);
+        } catch (ClassNotFoundException e) {
+          throw new RuntimeException("Could not find deserializer class: " + className);
+        }
+      }
+
+      private static String utf8String(byte[] bytes) {
+        return new String(bytes, Charsets.UTF_8);
+      }
+    }
+
+    /**
+     * Exposes {@link KafkaIO.TypedWithoutMetadata} as an external transform for cross-language
+     * usage.
+     */
+    @AutoService(ExternalTransformRegistrar.class)
+    public static class External implements ExternalTransformRegistrar {
+
+      public static final String URN = "beam:external:java:kafka:read:v1";
+
+      @Override
+      public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
+        return ImmutableMap.of(URN, AutoValue_KafkaIO_Read.Builder.class);
+      }
+
+      /** Parameters class to expose the transform to an external SDK. */
+      public static class Configuration {
+
+        // All byte arrays are UTF-8 encoded strings
+        private Iterable<KV<byte[], byte[]>> consumerConfig;
+        private Iterable<byte[]> topics;
+        private byte[] keyDeserializer;
+        private byte[] valueDeserializer;
+
+        public void setConsumerConfig(Iterable<KV<byte[], byte[]>> consumerConfig) {
+          this.consumerConfig = consumerConfig;
+        }
+
+        public void setTopics(Iterable<byte[]> topics) {
+          this.topics = topics;
+        }
+
+        public void setKeyDeserializer(byte[] keyDeserializer) {
+          this.keyDeserializer = keyDeserializer;
+        }
+
+        public void setValueDeserializer(byte[] valueDeserializer) {
+          this.valueDeserializer = valueDeserializer;
+        }
+      }
     }

     /** Sets the bootstrap servers for the Kafka consumer. */
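A note on the coder inference above: resolveCoder keys off the declared return
type of the Deserializer's deserialize method. A small Python mapping mirroring
its branches — the IntegerDeserializer entry is an inference from the
Integer.class branch and is not exercised by this commit:

    # Mirrors KafkaIO's resolveCoder(): Deserializer -> inferred Beam coder.
    SUPPORTED_DESERIALIZERS = {
        'org.apache.kafka.common.serialization.ByteArrayDeserializer': 'ByteArrayCoder',  # byte[]
        'org.apache.kafka.common.serialization.IntegerDeserializer': 'VarIntCoder',       # Integer (assumed)
        'org.apache.kafka.common.serialization.LongDeserializer': 'VarLongCoder',         # Long
    }
    # Any other Deserializer fails at expansion time with "Couldn't infer Coder from ...".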
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
new file mode 100644
index 0000000..ecf68ea
--- /dev/null
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.model.expansion.v1.ExpansionApi;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.ReadTranslation;
+import org.apache.beam.runners.core.construction.expansion.ExpansionService;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Charsets;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+
+/** Tests for building {@link KafkaIO} externally via the ExpansionService. */
+public class KafkaIOExternalTest {
+  @Test
+  public void testConstructKafkaIO() throws Exception {
+    List<String> topics = ImmutableList.of("topic1", "topic2");
+    String keyDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
+    String valueDeserializer = "org.apache.kafka.common.serialization.LongDeserializer";
+    ImmutableMap<String, String> consumerConfig =
+        ImmutableMap.<String, String>builder()
+            .put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "server1:port,server2:port")
+            .put("key2", "value2")
+            .put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer)
+            .put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer)
+            .build();
+
+    ExternalTransforms.ExternalConfigurationPayload payload =
+        ExternalTransforms.ExternalConfigurationPayload.newBuilder()
+            .putConfiguration(
+                "topics",
+                ExternalTransforms.ConfigValue.newBuilder()
+                    .addCoderUrn("beam:coder:iterable:v1")
+                    .addCoderUrn("beam:coder:bytes:v1")
+                    .setPayload(ByteString.copyFrom(listAsBytes(topics)))
+                    .build())
+            .putConfiguration(
+                "consumer_config",
+                ExternalTransforms.ConfigValue.newBuilder()
+                    .addCoderUrn("beam:coder:iterable:v1")
+                    .addCoderUrn("beam:coder:kv:v1")
+                    .addCoderUrn("beam:coder:bytes:v1")
+                    .addCoderUrn("beam:coder:bytes:v1")
+                    .setPayload(ByteString.copyFrom(mapAsBytes(consumerConfig)))
+                    .build())
+            .putConfiguration(
+                "key_deserializer",
+                ExternalTransforms.ConfigValue.newBuilder()
+                    .addCoderUrn("beam:coder:bytes:v1")
+                    .setPayload(ByteString.copyFrom(encodeString(keyDeserializer)))
+                    .build())
+            .putConfiguration(
+                "value_deserializer",
+                ExternalTransforms.ConfigValue.newBuilder()
+                    .addCoderUrn("beam:coder:bytes:v1")
+                    .setPayload(ByteString.copyFrom(encodeString(valueDeserializer)))
+                    .build())
+            .build();
+
+    RunnerApi.Components defaultInstance = RunnerApi.Components.getDefaultInstance();
+    ExpansionApi.ExpansionRequest request =
+        ExpansionApi.ExpansionRequest.newBuilder()
+            .setComponents(defaultInstance)
+            .setTransform(
+                RunnerApi.PTransform.newBuilder()
+                    .setUniqueName("test")
+                    .setSpec(
+                        RunnerApi.FunctionSpec.newBuilder()
+                            .setUrn("beam:external:java:kafka:read:v1")
+                            .setPayload(payload.toByteString())))
+            .setNamespace("test_namespace")
+            .build();
+
+    ExpansionService expansionService = new ExpansionService();
+    TestStreamObserver<ExpansionApi.ExpansionResponse> observer = new TestStreamObserver<>();
+    expansionService.expand(request, observer);
+
+    ExpansionApi.ExpansionResponse result = observer.result;
+    RunnerApi.PTransform transform = result.getTransform();
+    assertThat(
+        transform.getSubtransformsList(),
+        Matchers.contains(
+            "test_namespacetest/KafkaIO.Read", "test_namespacetest/Remove Kafka Metadata"));
+    assertThat(transform.getInputsCount(), Matchers.is(0));
+    assertThat(transform.getOutputsCount(), Matchers.is(1));
+
+    RunnerApi.PTransform kafkaComposite =
+        result.getComponents().getTransformsOrThrow(transform.getSubtransforms(0));
+    RunnerApi.PTransform kafkaRead =
+        result.getComponents().getTransformsOrThrow(kafkaComposite.getSubtransforms(0));
+    RunnerApi.ReadPayload readPayload =
+        RunnerApi.ReadPayload.parseFrom(kafkaRead.getSpec().getPayload());
+    KafkaUnboundedSource source =
+        (KafkaUnboundedSource) ReadTranslation.unboundedSourceFromProto(readPayload);
+    KafkaIO.Read spec = source.getSpec();
+
+    assertThat(spec.getConsumerConfig(), Matchers.is(consumerConfig));
+    assertThat(spec.getTopics(), Matchers.is(topics));
+    assertThat(spec.getKeyDeserializer().getName(), Matchers.is(keyDeserializer));
+    assertThat(spec.getValueDeserializer().getName(), Matchers.is(valueDeserializer));
+  }
+
+  private static byte[] listAsBytes(List<String> stringList) throws IOException {
+    IterableCoder<byte[]> coder = IterableCoder.of(ByteArrayCoder.of());
+    List<byte[]> bytesList =
+        stringList.stream().map(KafkaIOExternalTest::rawBytes).collect(Collectors.toList());
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    coder.encode(bytesList, baos);
+    return baos.toByteArray();
+  }
+
+  private static byte[] mapAsBytes(Map<String, String> stringMap) throws IOException {
+    IterableCoder<KV<byte[], byte[]>> coder =
+        IterableCoder.of(KvCoder.of(ByteArrayCoder.of(), ByteArrayCoder.of()));
+    List<KV<byte[], byte[]>> bytesList =
+        stringMap.entrySet().stream()
+            .map(kv -> KV.of(rawBytes(kv.getKey()), rawBytes(kv.getValue())))
+            .collect(Collectors.toList());
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    coder.encode(bytesList, baos);
+    return baos.toByteArray();
+  }
+
+  private static byte[] encodeString(String str) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    ByteArrayCoder.of().encode(rawBytes(str), baos);
+    return baos.toByteArray();
+  }
+
+  private static byte[] rawBytes(String str) {
+    Preconditions.checkNotNull(str, "String must not be null.");
+    return str.getBytes(Charsets.UTF_8);
+  }
+
+  private static class TestStreamObserver<T> implements StreamObserver<T> {
+
+    private T result;
+
+    @Override
+    public void onNext(T t) {
+      result = t;
+    }
+
+    @Override
+    public void onError(Throwable throwable) {
+      throw new RuntimeException("Should not happen", throwable);
+    }
+
+    @Override
+    public void onCompleted() {}
+  }
+}
diff --git a/sdks/python/apache_beam/io/external/generate_sequence.py b/sdks/python/apache_beam/io/external/generate_sequence.py
index 4ad6b46..4c9d0b8 100644
--- a/sdks/python/apache_beam/io/external/generate_sequence.py
+++ b/sdks/python/apache_beam/io/external/generate_sequence.py
@@ -32,7 +32,7 @@ class GenerateSequence(ptransform.PTransform):
   def __init__(self, start, stop=None,
                elements_per_period=None,
                max_read_time=None,
-               expansion_service=None):
+               expansion_service='localhost:8097'):
     super(GenerateSequence, self).__init__()
     self._urn = 'beam:external:java:generate_sequence:v1'
    self.start = start
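With the change above, GenerateSequence — like the new ReadFromKafka below —
now defaults to the expansion service that the Flink job server brings up on
localhost:8097. A small sketch, assuming an existing Pipeline p; the override
address is a placeholder:

    from apache_beam.io.external.generate_sequence import GenerateSequence

    # Default: talks to an expansion service on localhost:8097 (Flink job server).
    p | GenerateSequence(start=1, stop=10)
    # Explicit override for a service running elsewhere:
    p | GenerateSequence(start=1, stop=10, expansion_service='otherhost:12345')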
diff --git a/sdks/python/apache_beam/io/external/kafka.py b/sdks/python/apache_beam/io/external/kafka.py
new file mode 100644
index 0000000..b5b4943
--- /dev/null
+++ b/sdks/python/apache_beam/io/external/kafka.py
@@ -0,0 +1,152 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""
+  PTransforms for supporting Kafka in Python pipelines. These transforms do not
+  run a Kafka client in Python. Instead, they expand to ExternalTransform and
+  utilize the Java SDK's Kafka IO. The expansion service will insert the Kafka
+  Java transforms before the pipeline is executed. Users currently have to
+  provide the address of the Java expansion service. Flink users can use the
+  built-in expansion service of the Flink Runner's job server.
+"""
+
+from __future__ import absolute_import
+
+from apache_beam import ExternalTransform
+from apache_beam import pvalue
+from apache_beam.coders import BytesCoder
+from apache_beam.coders import IterableCoder
+from apache_beam.coders import TupleCoder
+from apache_beam.coders.coders import LengthPrefixCoder
+from apache_beam.portability.api.external_transforms_pb2 import ConfigValue
+from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload
+from apache_beam.transforms import ptransform
+
+
+class ReadFromKafka(ptransform.PTransform):
+  """
+  An external PTransform which reads from Kafka and returns a KV pair for
+  each item in the specified Kafka topics. If no Kafka Deserializer for
+  key/value is provided, then the data will be returned as a raw byte array.
+
+  Note: To use this transform, you need to start the Java expansion service.
+  Please refer to the portability documentation on how to do that. The
+  expansion service address has to be provided when instantiating this
+  transform. During pipeline translation this transform will be replaced by
+  the Java SDK's KafkaIO.
+
+  If you start Flink's job server, the expansion service will be started on
+  port 8097. This is also the configured default for this transform. For a
+  different address, please set the expansion_service parameter.
+
+  For more information see:
+  - https://beam.apache.org/documentation/runners/flink/
+  - https://beam.apache.org/roadmap/portability/
+
+  Note: Runners need to support translating Read operations in order to use
+  this source. At the moment only the Flink Runner supports this.
+  """
+
+  # Returns the key/value data as raw byte arrays
+  byte_array_deserializer = 'org.apache.kafka.common.serialization.' \
+                            'ByteArrayDeserializer'
+
+  def __init__(self, consumer_config,
+               topics,
+               key_deserializer=byte_array_deserializer,
+               value_deserializer=byte_array_deserializer,
+               expansion_service='localhost:8097'):
+    """
+    Initializes a read operation from Kafka.
+
+    :param consumer_config: A dictionary containing the consumer
+                            configuration.
+    :param topics: A list of topic strings.
+    :param key_deserializer: A fully-qualified Java class name of a Kafka
+                             Deserializer for the topic's key, e.g.
+                             'org.apache.kafka.common.serialization.LongDeserializer'.
+                             Default:
+                             'org.apache.kafka.common.serialization.ByteArrayDeserializer'.
+    :param value_deserializer: A fully-qualified Java class name of a Kafka
+                               Deserializer for the topic's value, e.g.
+                               'org.apache.kafka.common.serialization.LongDeserializer'.
+                               Default:
+                               'org.apache.kafka.common.serialization.ByteArrayDeserializer'.
+    :param expansion_service: The address (host:port) of the ExpansionService.
+ """ + super(ReadFromKafka, self).__init__() + self._urn = 'beam:external:java:kafka:read:v1' + self.consumer_config = consumer_config + self.topics = topics + self.key_deserializer = key_deserializer + self.value_deserializer = value_deserializer + self.expansion_service = expansion_service + + def expand(self, pbegin): + if not isinstance(pbegin, pvalue.PBegin): + raise Exception("ReadFromKafka must be a root transform") + + args = { + 'consumer_config': + ReadFromKafka._encode_map(self.consumer_config), + 'topics': + ReadFromKafka._encode_list(self.topics), + 'key_deserializer': + ReadFromKafka._encode_str(self.key_deserializer), + 'value_deserializer': + ReadFromKafka._encode_str(self.value_deserializer), + } + + payload = ExternalConfigurationPayload(configuration=args) + return pbegin.apply( + ExternalTransform( + self._urn, + payload.SerializeToString(), + self.expansion_service)) + + @staticmethod + def _encode_map(dict_obj): + kv_list = [(key.encode('utf-8'), val.encode('utf-8')) + for key, val in dict_obj.items()] + coder = IterableCoder(TupleCoder( + [LengthPrefixCoder(BytesCoder()), LengthPrefixCoder(BytesCoder())])) + coder_urns = ['beam:coder:iterable:v1', + 'beam:coder:kv:v1', + 'beam:coder:bytes:v1', + 'beam:coder:bytes:v1'] + return ConfigValue( + coder_urn=coder_urns, + payload=coder.encode(kv_list)) + + @staticmethod + def _encode_list(list_obj): + encoded_list = [val.encode('utf-8') for val in list_obj] + coder = IterableCoder(LengthPrefixCoder(BytesCoder())) + coder_urns = ['beam:coder:iterable:v1', + 'beam:coder:bytes:v1'] + return ConfigValue( + coder_urn=coder_urns, + payload=coder.encode(encoded_list)) + + @staticmethod + def _encode_str(str_obj): + encoded_str = str_obj.encode('utf-8') + coder = LengthPrefixCoder(BytesCoder()) + coder_urns = ['beam:coder:bytes:v1'] + return ConfigValue( + coder_urn=coder_urns, + payload=coder.encode(encoded_str)) diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py index d67b5fb..13dd7bd 100644 --- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py @@ -29,6 +29,7 @@ from tempfile import mkdtemp import apache_beam as beam from apache_beam.io.external.generate_sequence import GenerateSequence +from apache_beam.io.external.kafka import ReadFromKafka from apache_beam.metrics import Metrics from apache_beam.options.pipeline_options import DebugOptions from apache_beam.options.pipeline_options import PortableOptions @@ -157,7 +158,7 @@ if __name__ == '__main__': def test_no_subtransform_composite(self): raise unittest.SkipTest("BEAM-4781") - def test_external_transform(self): + def test_external_transforms(self): options = self.create_options() options._all_options['parallelism'] = 1 options._all_options['streaming'] = True @@ -172,6 +173,28 @@ if __name__ == '__main__': assert_that(res, equal_to([i for i in range(1, 10)])) + # We expect to fail here because we do not have a Kafka cluster handy. + # Nevertheless, we check that the transform is expanded by the + # ExpansionService and that the pipeline fails during execution. + with self.assertRaises(Exception) as ctx: + with self.create_pipeline() as p: + # pylint: disable=expression-not-assigned + (p + | ReadFromKafka(consumer_config={'bootstrap.servers': + 'notvalid1:7777, notvalid2:3531'}, + topics=['topic1', 'topic2'], + key_deserializer='org.apache.kafka.' + 'common.serialization.' 
diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
index d67b5fb..13dd7bd 100644
--- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
@@ -29,6 +29,7 @@ from tempfile import mkdtemp

 import apache_beam as beam
 from apache_beam.io.external.generate_sequence import GenerateSequence
+from apache_beam.io.external.kafka import ReadFromKafka
 from apache_beam.metrics import Metrics
 from apache_beam.options.pipeline_options import DebugOptions
 from apache_beam.options.pipeline_options import PortableOptions
@@ -157,7 +158,7 @@ if __name__ == '__main__':
     def test_no_subtransform_composite(self):
       raise unittest.SkipTest("BEAM-4781")

-    def test_external_transform(self):
+    def test_external_transforms(self):
       options = self.create_options()
       options._all_options['parallelism'] = 1
       options._all_options['streaming'] = True
@@ -172,6 +173,28 @@ if __name__ == '__main__':

       assert_that(res, equal_to([i for i in range(1, 10)]))

+      # We expect to fail here because we do not have a Kafka cluster handy.
+      # Nevertheless, we check that the transform is expanded by the
+      # ExpansionService and that the pipeline fails during execution.
+      with self.assertRaises(Exception) as ctx:
+        with self.create_pipeline() as p:
+          # pylint: disable=expression-not-assigned
+          (p
+           | ReadFromKafka(consumer_config={'bootstrap.servers':
+                                            'notvalid1:7777, notvalid2:3531'},
+                           topics=['topic1', 'topic2'],
+                           key_deserializer='org.apache.kafka.'
+                                            'common.serialization.'
+                                            'ByteArrayDeserializer',
+                           value_deserializer='org.apache.kafka.'
+                                              'common.serialization.'
+                                              'LongDeserializer',
+                           expansion_service=expansion_address))
+      self.assertTrue('No resolvable bootstrap urls given in bootstrap.servers'
+                      in str(ctx.exception),
+                      'Expected to fail due to invalid bootstrap.servers, but '
+                      'failed due to:\n%s' % str(ctx.exception))
+
     def test_flattened_side_input(self):
       # Blocked on support for transcoding
       # https://jira.apache.org/jira/browse/BEAM-6523
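Finally, a hypothetical end-to-end invocation against the Flink job server.
The job endpoint port 8099 and the flag values are assumptions based on the
job server's defaults at the time, and the broker/topic are placeholders —
none of this is part of the commit:

    import apache_beam as beam
    from apache_beam.io.external.kafka import ReadFromKafka
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions([
        '--runner=PortableRunner',
        '--job_endpoint=localhost:8099',  # Flink job server (assumed default port)
        '--streaming',
    ])
    with beam.Pipeline(options=options) as p:
      (p | ReadFromKafka(
           consumer_config={'bootstrap.servers': 'mybroker:9092'},  # placeholder
           topics=['mytopic']))  # expansion service defaults to localhost:8097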