[
https://issues.apache.org/jira/browse/BAHIR-168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16494688#comment-16494688
]
Luciano Resende commented on BAHIR-168:
---------------------------------------
Thanks. We are in the process of migrating to Data Source V2, and any help
there, or a new contribution, is welcome.
> Kinesis support in Structured Streaming
> ---------------------------------------
>
> Key: BAHIR-168
> URL: https://issues.apache.org/jira/browse/BAHIR-168
> Project: Bahir
> Issue Type: New Feature
> Components: Spark Structured Streaming Connectors
> Affects Versions: Spark-2.3
> Reporter: Takako Shimamoto
> Priority: Major
>
> Implement Kinesis based sources and sinks for Structured Streaming
> h2. Kinesis Sources
> I hope that [this|https://github.com/qubole/kinesis-sql] will be contributed
> to Apache Bahir, as commented in SPARK-18165.
> h2. Kinesis Sinks
> I've implemented a Sink here:
> [https://github.com/shimamoto/bahir/tree/kinesis-writer/sql-kinesis]
> This requires Spark 2.3 and the Data Source V2 API. I plan to open a PR,
> but Bahir doesn't support Spark 2.3 yet, so we need to handle BAHIR-167 first.
> A brief overview is listed below.
> h3. Description
> Add a new Kinesis Sink and Kinesis Relation for writing streaming and batch
> queries, respectively, to AWS Kinesis.
> The DataFrame being written to Kinesis should have the following columns in
> its schema:
> ||Column||Type||
> |partitionKey (optional)|string|
> |data (required)|string or binary|
> If the partition key column is missing, a SHA-256 digest of the data, as a
> hex string, will be added automatically.
> h4. Streaming Kinesis Sink
> {code}
> val df = inputStream.toDS().toDF("partitionKey", "data")
> val query = df.writeStream
>   .format("kinesis")
>   .option("streamName", "test-stream")
>   .option("region", "us-east-1")
>   .option("checkpointLocation", checkpointDir.getCanonicalPath)
>   .start()
> {code}
> h4. Batch Kinesis Sink
> {code}
> val df = Seq("partitionKey-1" -> "data1", "partitionKey-2" -> "data2")
>   .toDF("partitionKey", "data")
> df.write
>   .format("kinesis")
>   .option("streamName", "test-stream")
>   .option("region", "us-east-1")
>   .save()
> {code}
> h3. Configuration
> The following options must be set for both batch and streaming queries.
> ||Option||value||default||meaning||
> |streamName|string|-|The name of the Kinesis stream to write to.|
> |region|string|-|The AWS region of the Kinesis stream.|
> The following configurations are optional.
> ||Option||value||default||meaning||
> |chunksize|int|50|Maximum number of records sent per PutRecords request (a
> rate limit on batch size).|
> |endpoint|string|(none)|Kinesis service endpoint; set only when using a
> non-standard endpoint.|
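> The {{chunksize}} rate limit above amounts to simple batching (an
> illustrative sketch, not the actual implementation; {{chunked}} is a
> hypothetical helper):

```scala
// Group records into batches of at most `chunksize` elements; the sink
// would then issue one PutRecords request per batch.
def chunked[A](records: Seq[A], chunksize: Int): Seq[Seq[A]] =
  records.grouped(chunksize).toSeq
```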
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)