I guess the simplest way would be to use a constructor parameter: the supplier stores the server address and hands it to every Processor it creates.
> public static class CampaignProcessor
>         implements ProcessorSupplier<Windowed<String>, List<String[]>>
> {
>     private final String jedis_server;
>
>     public CampaignProcessor(String jedisServer) {
>         this.jedis_server = jedisServer;
>     }
>
>     @Override
>     public Processor<Windowed<String>, List<String[]>> get() {
>         return new Processor<Windowed<String>, List<String[]>>() {
>             CampaignProcessorCommon campaignProcessorCommon;
>
>             @Override
>             public void init(ProcessorContext context) {
>                 // // looks like Flink code over here ;)
>                 // ParameterTool parameterTool = (ParameterTool)
>                 //     getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
>                 // parameterTool.getRequired("jedis_server");
>                 // LOG.info("Opening connection with Jedis to {}",
>                 //     parameterTool.getRequired("jedis_server"));
>
>                 // this.campaignProcessorCommon = new
>                 //     CampaignProcessorCommon(parameterTool.getRequired("jedis_server"));
>                 // this.campaignProcessorCommon = new
>                 //     CampaignProcessorCommon("localhost");
>                 this.campaignProcessorCommon = new CampaignProcessorCommon(jedis_server);
>                 this.campaignProcessorCommon.prepare();
>             }
>
>             @Override
>             public void process(Windowed<String> key, List<String[]> tupleList) {
>                 tupleList.forEach(x -> {
>                     String[] tuple = x;
>                     String campaign_id = tuple[0];
>                     String event_time = tuple[2];
>                     this.campaignProcessorCommon.execute(campaign_id, event_time);
>                 });
>             }
>
>             @Override
>             public void punctuate(long timestamp) {}
>
>             @Override
>             public void close() {}
>         };
>     }
> }

On 04/16/2016 09:35 PM, rss rss wrote:
> Hello,
>
> I'm implementing the Yahoo streaming benchmark with Kafka Streams
> processing. How can I pass a string into a Processor or ValueTransformer
> instance? According to the Yahoo benchmark logic I need to pass the
> address of a Redis server.
> https://github.com/rssdev10/streaming-benchmarks/blob/master/kafka-benchmarks/src/main/java/kafka/benchmark/AdvertisingTopology.java#L212
>
> PS: the code above is runnable but not checked on real clusters.
>
> Best regards
>
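
In case it helps to see the constructor-parameter idea in isolation, here is a minimal sketch that runs without Kafka on the classpath. It is only an illustration under assumptions: java.util.function.Supplier stands in for ProcessorSupplier, and SupplierSketch, RedisBackedProcessor and CampaignProcessorSketch are hypothetical names that are not part of your code or of Kafka. No real Redis connection is opened.

```java
import java.util.function.Supplier;

public class SupplierSketch {

    /** Hypothetical stand-in for a Processor; it only remembers the server it was given. */
    public static class RedisBackedProcessor {
        private final String server;
        private boolean initialized = false;

        public RedisBackedProcessor(String server) {
            this.server = server;
        }

        /** Mirrors init(): the real code would open the Redis connection here. */
        public void init() {
            initialized = true;
        }

        public String describe() {
            return (initialized ? "ready: " : "not initialized: ") + server;
        }
    }

    /** Stand-in for the ProcessorSupplier: the address is captured once in the constructor. */
    public static class CampaignProcessorSketch implements Supplier<RedisBackedProcessor> {
        private final String jedisServer;

        public CampaignProcessorSketch(String jedisServer) {
            this.jedisServer = jedisServer;
        }

        @Override
        public RedisBackedProcessor get() {
            // Every call returns a fresh instance that carries the configured address.
            return new RedisBackedProcessor(jedisServer);
        }
    }

    public static void main(String[] args) {
        Supplier<RedisBackedProcessor> supplier = new CampaignProcessorSketch("localhost");
        RedisBackedProcessor processor = supplier.get();
        processor.init();
        System.out.println(processor.describe());
    }
}
```

The address is read once where the topology is built (for example from the command line or a properties file) and then travels with the supplier, so the processor itself needs no global configuration lookup.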
