Awesome, I'll definitely try that out, thanks!

On Wed, Nov 20, 2019 at 9:36 PM Yuval Itzchakov <yuva...@gmail.com> wrote:
> Hi Li,
>
> You're in the right direction. One additional step would be to use
> RichSinkFunction[Data] instead of SinkFunction[Data], which exposes open and
> close functions that allow you to initialize and dispose of resources
> properly.
>
> On Thu, 21 Nov 2019, 5:23 Li Peng, <li.p...@doordash.com> wrote:
>
>> Hey folks, I'm interested in streaming some data to Segment
>> <https://segment.com/docs/sources/server/java/>, using their existing
>> Java library. This is a pretty high-throughput stream, so I wanted each
>> parallel operator to have its own instance of the Segment client. From
>> what I could tell, defining a custom SinkFunction should satisfy this,
>> as each parallel operator gets its own SinkFunction object automatically.
>> So my code looks like this:
>>
>> class SegmentSink() extends SinkFunction[Data] {
>>
>>   @transient
>>   val segmentClient: Analytics = Analytics.builder("key").build()
>>
>>   override def invoke(value: Data, context: SinkFunction.Context[_]): Unit = {
>>     segmentClient.enqueue(...)
>>   }
>> }
>>
>> Can anyone verify that this is the right pattern for me to use? Is there
>> any risk of the SinkFunction getting repeatedly serialized/deserialized,
>> resulting in new Segment clients getting created each time?
>>
>> Thanks,
>> Li
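For anyone following along, the lifecycle pattern Yuval suggests can be sketched like this. Since Flink's `RichSinkFunction` and Segment's `Analytics` classes aren't on the classpath here, `FakeSegmentClient` and the bare `SegmentSink` below are hypothetical stand-ins that only mirror the shape of the real API: keep the client field `@transient`, build it in `open()` (which Flink calls once per parallel task, after deserialization), and release it in `close()`.

```scala
// Hypothetical stand-in for Segment's Analytics client; the real one comes
// from Segment's Java library (Analytics.builder("key").build()).
class FakeSegmentClient(writeKey: String) {
  private val queue = scala.collection.mutable.Buffer[String]()
  def enqueue(event: String): Unit = queue += event
  def enqueued: Int = queue.size
  def shutdown(): Unit = queue.clear()
}

// Sketch of the RichSinkFunction pattern: the client is created in open()
// on the task manager, not when the sink object is constructed and
// serialized on the client side, so each parallel instance gets exactly
// one fresh client. The real class would extend
// org.apache.flink.streaming.api.functions.sink.RichSinkFunction[Data].
class SegmentSink extends Serializable {
  @transient private var client: FakeSegmentClient = _

  def open(): Unit = {
    client = new FakeSegmentClient("key") // initialize resources here
  }

  def invoke(value: String): Unit =
    client.enqueue(value)

  def enqueued: Int = client.enqueued

  def close(): Unit =
    if (client != null) client.shutdown() // dispose of resources here
}
```

Because the field is `@transient` and only assigned in `open()`, repeated serialization of the sink never creates extra clients; at most one exists per parallel task between `open()` and `close()`.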