This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit 7ff0a262f2ae4c57ab5e7f5e213ab17317f70a69 Author: Etienne Chauchot <echauc...@apache.org> AuthorDate: Tue Jan 15 13:24:09 2019 +0100 Serialize windowedValue to byte[] in source to be able to specify a binary dataset schema and deserialize windowedValue from Row to get a dataset<WindowedValue> --- .../translation/batch/DatasetSourceBatch.java | 29 ++++++++++++++++------ .../batch/ReadSourceTranslatorBatch.java | 9 ++++++- 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java index d9e1722..c4cfeaf 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java @@ -20,6 +20,7 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.batch; import static com.google.common.base.Preconditions.checkArgument; import static scala.collection.JavaConversions.asScalaBuffer; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; @@ -30,6 +31,7 @@ import org.apache.beam.runners.core.serialization.Base64Serializer; import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.BoundedSource.BoundedReader; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.sources.v2.ContinuousReadSupport; @@ -93,10 +95,11 @@ public class DatasetSourceBatch implements DataSourceV2, ReadSupport { @Override public StructType readSchema() { + // TODO: find a way to extend schema with a WindowedValue schema StructField[] array = new StructField[1]; - StructField dummyStructField = StructField - .apply("dummyStructField", DataTypes.NullType, true, Metadata.empty()); - array[0] = dummyStructField; + StructField binaryStructField = StructField + .apply("binaryStructField", DataTypes.BinaryType, true, Metadata.empty()); + array[0] = binaryStructField; return new StructType(array); } @@ -135,11 +138,13 @@ public class DatasetSourceBatch implements DataSourceV2, ReadSupport { private static class DatasetPartitionReader<T> implements InputPartitionReader<InternalRow> { private boolean started; private boolean closed; + private BoundedSource<T> source; private BoundedReader<T> reader; DatasetPartitionReader(BoundedSource<T> source, SerializablePipelineOptions serializablePipelineOptions) { this.started = false; this.closed = false; + this.source = source; // reader is not serializable so lazy initialize it try { reader = source @@ -162,10 +167,20 @@ public class DatasetSourceBatch implements DataSourceV2, ReadSupport { @Override public InternalRow get() { List<Object> list = new ArrayList<>(); - list.add( - WindowedValue.timestampedValueInGlobalWindow( - reader.getCurrent(), reader.getCurrentTimestamp())); - return InternalRow.apply(asScalaBuffer(list).toList()); + WindowedValue<T> windowedValue = WindowedValue + .timestampedValueInGlobalWindow(reader.getCurrent(), reader.getCurrentTimestamp()); + //serialize the windowedValue to bytes array to comply with dataset binary schema + WindowedValue.FullWindowedValueCoder<T> windowedValueCoder = WindowedValue.FullWindowedValueCoder + .of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE); + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + try { + windowedValueCoder.encode(windowedValue, byteArrayOutputStream); + byte[] bytes = byteArrayOutputStream.toByteArray(); + list.add(bytes); + } catch (IOException e) { + throw new RuntimeException(e); + } +return InternalRow.apply(asScalaBuffer(list).toList()); } @Override diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java index 8810e21..fec0fd3 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.spark.structuredstreaming.translation.batch; +import java.io.ByteArrayInputStream; import java.io.IOException; import org.apache.beam.runners.core.construction.PipelineOptionsSerializationUtils; import org.apache.beam.runners.core.construction.ReadTranslation; @@ -26,6 +27,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.Translation import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -64,10 +66,15 @@ class ReadSourceTranslatorBatch<T> .option(DatasetSourceBatch.PIPELINE_OPTIONS, PipelineOptionsSerializationUtils.serializeToJson(context.getOptions())).load(); + // extract windowedValue from Row MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() { @Override public WindowedValue call(Row value) throws Exception { //there is only one value put in each Row by the InputPartitionReader - return value.<WindowedValue>getAs(0); + byte[] bytes = (byte[]) value.get(0); + WindowedValue.FullWindowedValueCoder<T> windowedValueCoder = WindowedValue.FullWindowedValueCoder + .of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE); + WindowedValue<T> windowedValue = windowedValueCoder.decode(new ByteArrayInputStream(bytes)); + return windowedValue; } }; //TODO: is there a better way than using the raw WindowedValue? Can an Encoder<WindowedValue<T>>