Awesome, I'll definitely try that out, thanks!

On Wed, Nov 20, 2019 at 9:36 PM Yuval Itzchakov <yuva...@gmail.com> wrote:

> Hi Li,
>
> You're headed in the right direction. One additional step would be to use
> RichSinkFunction[Data] instead of SinkFunction[Data]; it exposes open and
> close methods that let you initialize and dispose of resources
> properly.
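>
> A minimal sketch of what that could look like (an illustration, not
> tested code; the "key" string and the enqueue call are placeholders
> carried over from your snippet, and it assumes Segment's Java
> Analytics client):
>
> class SegmentSink extends RichSinkFunction[Data] {
>
>   // @transient plus a var: the client is never serialized, and is
>   // created once per parallel task instance in open()
>   @transient private var segmentClient: Analytics = _
>
>   override def open(parameters: Configuration): Unit = {
>     segmentClient = Analytics.builder("key").build()
>   }
>
>   override def invoke(value: Data, context: SinkFunction.Context[_]): Unit = {
>     segmentClient.enqueue(...)
>   }
>
>   override def close(): Unit = {
>     // flush buffered events and stop the client's background threads
>     segmentClient.flush()
>     segmentClient.shutdown()
>   }
> }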
>
> On Thu, 21 Nov 2019, 5:23 Li Peng, <li.p...@doordash.com> wrote:
>
>> Hey folks, I'm interested in streaming some data to Segment
>> <https://segment.com/docs/sources/server/java/>, using their existing
>> java library. This is a pretty high-throughput stream, so I wanted each
>> parallel operator to have its own instance of the Segment client. From what
>> I could tell, defining a custom SinkFunction should satisfy this, since each
>> parallel operator gets its own SinkFunction object automatically. So my
>> code looks like this:
>>
>> class SegmentSink() extends SinkFunction[Data] {
>>
>>   @transient
>>   val segmentClient: Analytics = Analytics.builder("key").build()
>>
>>   override def invoke(value: Data, context: SinkFunction.Context[_]): Unit = {
>>     segmentClient.enqueue(...)
>>   }
>> }
>>
>> Can anyone verify whether this is the right pattern for me to use? Is there
>> any risk of the SinkFunction getting repeatedly serialized/deserialized,
>> resulting in a new Segment client being created each time?
>>
>> Thanks,
>> Li
>>
>
