[ https://issues.apache.org/jira/browse/BAHIR-168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16496911#comment-16496911 ]

Vikram Agrawal commented on BAHIR-168:
--------------------------------------

[~shimamoto], [~luciano resende] - How can we contribute 
https://github.com/qubole/kinesis-sql to this project? There is an open PR in 
that repo to add a Kinesis Sink (https://github.com/qubole/kinesis-sql/pull/5). 
As of now, both the source and sink implementations require the Data Source V1 
APIs.

> Kinesis support in Structured Streaming
> ---------------------------------------
>
>                 Key: BAHIR-168
>                 URL: https://issues.apache.org/jira/browse/BAHIR-168
>             Project: Bahir
>          Issue Type: New Feature
>          Components: Spark Structured Streaming Connectors
>    Affects Versions: Spark-2.3
>            Reporter: Takako Shimamoto
>            Priority: Major
>
> Implement Kinesis based sources and sinks for Structured Streaming
> h2. Kinesis Sources
> I hope that [this|https://github.com/qubole/kinesis-sql] will be contributed 
> to Apache Bahir, as suggested in SPARK-18165.
> h2. Kinesis Sinks
> I've implemented a Sink here: 
> [https://github.com/shimamoto/bahir/tree/kinesis-writer/sql-kinesis]
>  This requires Spark 2.3 and the Data Source V2 API. I plan on opening a PR, 
> but Bahir doesn't support Spark 2.3 yet, so we need to handle BAHIR-167 first.
> A brief overview is listed below.
> h3. Description
> Add a new Kinesis Sink and Kinesis Relation for writing streaming and batch 
> queries, respectively, to AWS Kinesis.
> The DataFrame being written to Kinesis should have the following columns in 
> its schema:
> ||Column||Type||
> |partitionKey (optional)|string|
> |data (required)|string or binary|
> If the partition key column is missing, a SHA-256 digest of the data, encoded 
> as a hex string, will be added automatically.
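> For illustration only (this is my sketch, not code from the linked 
> implementation), the fallback partition key described above could be computed 
> on the JVM with the standard {{MessageDigest}} API:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

public class DefaultPartitionKey {
    // Computes the SHA-256 digest of a record's data and encodes it as a
    // lowercase hex string, mirroring the proposed fallback that is used
    // when the partitionKey column is absent.
    static String sha256Hex(byte[] data) throws Exception {
        byte[] digest = MessageDigest.getInstance("SHA-256").digest(data);
        StringBuilder sb = new StringBuilder(digest.length * 2);
        for (byte b : digest) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) throws Exception {
        // 64 hex characters, suitable as a Kinesis partition key
        System.out.println(sha256Hex("data1".getBytes(StandardCharsets.UTF_8)));
    }
}
```

> Because the digest is deterministic, identical payloads land on the same 
> shard; callers wanting a different distribution should supply an explicit 
> partitionKey column.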
> h4. Streaming Kinesis Sink
> {code}
> // requires `import spark.implicits._` in scope for toDS/toDF
> val df = inputStream.toDS().toDF("partitionKey", "data")
> val query = df.writeStream
>   .format("kinesis")
>   .option("streamName", "test-stream")
>   .option("region", "us-east-1")
>   .option("checkpointLocation", checkpointDir.getCanonicalPath)
>   .start()
> {code}
> h4. Batch Kinesis Sink
> {code}
> // requires `import spark.implicits._` in scope for toDF on a local Seq
> val df = Seq("partitionKey-1" -> "data1", "partitionKey-2" -> "data2")
>   .toDF("partitionKey", "data")
> df.write
>   .format("kinesis")
>   .option("streamName", "test-stream")
>   .option("region", "us-east-1")
>   .save()
> {code}
> h3. Configuration
> The following options must be set for both batch and streaming queries.
> ||Option||value||default||meaning||
> |streamName|string|-|The stream name associated with the Sink.|
> |region|string|-|The AWS region of the Kinesis stream.|
> The following configurations are optional.
> ||Option||value||default||meaning||
> |chunksize|int|50|Maximum number of records to send in a single 
> PutRecords request.|
> |endpoint|string|(none)|Only use this if using a non-standard service 
> endpoint.|



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)