Re: Streaming data to Segment
Awesome, I'll definitely try that out, thanks!

On Wed, Nov 20, 2019 at 9:36 PM Yuval Itzchakov wrote:
> Hi Li,
>
> You're in the right direction. One additional step would be to use
> RichSinkFunction[Data] instead of SinkFunction[Data], which exposes open and
> close functions that allow you to initialize and dispose of resources
> properly.
Re: Streaming data to Segment
Hi Li,

You're in the right direction. One additional step would be to use RichSinkFunction[Data] instead of SinkFunction[Data], which exposes open and close functions that allow you to initialize and dispose of resources properly.

On Thu, 21 Nov 2019, 5:23 Li Peng, wrote:
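For reference, here is a rough sketch of what the RichSinkFunction version could look like. This assumes the Flink 1.9-era sink API and Segment's analytics-java client (Analytics.builder, enqueue, flush, shutdown); the Data type and its userId field are placeholders for whatever your actual event type provides, and the event name passed to TrackMessage.builder is made up for illustration:

```scala
import com.segment.analytics.Analytics
import com.segment.analytics.messages.TrackMessage
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

// Placeholder event type; substitute your actual Data class.
case class Data(userId: String)

class SegmentSink(writeKey: String) extends RichSinkFunction[Data] {

  // @transient + var: the client is never serialized with the function;
  // it is created on the task manager in open(), once per parallel subtask.
  @transient private var segmentClient: Analytics = _

  override def open(parameters: Configuration): Unit = {
    segmentClient = Analytics.builder(writeKey).build()
  }

  override def invoke(value: Data, context: SinkFunction.Context[_]): Unit = {
    // enqueue accepts a MessageBuilder; "event-name" is illustrative only.
    segmentClient.enqueue(
      TrackMessage.builder("event-name").userId(value.userId)
    )
  }

  override def close(): Unit = {
    if (segmentClient != null) {
      segmentClient.flush()     // drain any queued messages
      segmentClient.shutdown()  // stop the client's background worker
    }
  }
}
```

Since open() runs after deserialization on the worker, this also sidesteps the concern about the client being recreated by repeated serialization: the serialized function carries no client at all.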
Streaming data to Segment
Hey folks, I'm interested in streaming some data to Segment <https://segment.com/docs/sources/server/java/>, using their existing Java library. This is a pretty high-throughput stream, so I wanted each parallel operator to have its own instance of the Segment client. From what I could tell, defining a custom SinkFunction should satisfy this, as each parallel operator gets its own SinkFunction object automatically. So my code looks like this:

    class SegmentSink() extends SinkFunction[Data] {

      @transient
      val segmentClient: Analytics = Analytics.builder("key").build()

      override def invoke(value: Data, context: SinkFunction.Context[_]): Unit = {
        segmentClient.enqueue(...)
      }
    }

Can anyone verify if this is the right pattern for me to use? Is there any risk of the SinkFunction getting repeatedly serialized/deserialized, which would result in new Segment clients getting created each time?

Thanks,
Li