Thanks Nico,

I appreciate the feedback, and nice catch on the missing volatile.
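
For anyone finding this thread later, here is a minimal sketch of what that fix might look like, with the Flink and HTTP parts stripped out. The class name PartitionMapHolder and its replace() method are made up for illustration and are not part of any Flink API. The idea is that the map reference is volatile, the updater thread publishes a fresh unmodifiable copy, and the partitioning thread reads the reference once per call. It also uses Math.floorMod instead of %, which is a change from my original code, so a negative key cannot produce a negative partition index.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper, not a Flink class: holds the special-case map and
// makes updates from a background thread visible to the partitioning thread.
class PartitionMapHolder {
    // volatile: once the updater thread assigns a new map, later reads from
    // other threads see that new reference (and the contents written before it).
    private volatile Map<Long, List<Integer>> partitionMap = Collections.emptyMap();

    // Intended to be called from the background updater thread.
    void replace(Map<Long, List<Integer>> fresh) {
        // Publish an unmodifiable copy so readers never observe a map
        // that is still being filled in.
        this.partitionMap = Collections.unmodifiableMap(new HashMap<>(fresh));
    }

    // Intended to be called from the thread that partitions records.
    int partition(long key, int numPartitions) {
        // Read the volatile field once so the lookup below uses one snapshot.
        Map<Long, List<Integer>> snapshot = partitionMap;
        List<Integer> special = snapshot.get(key);
        long target = key;
        if (special != null && !special.isEmpty()) {
            // Special case: pick one of the listed partitions at random.
            target = special.get(ThreadLocalRandom.current().nextInt(special.size()));
        }
        // floorMod keeps the result in [0, numPartitions) even for negative keys.
        return (int) Math.floorMod(target, (long) numPartitions);
    }
}
```

In the real partitioner, replace() would be what the scheduled task calls after parsing the HTTP response, and partition() would be the body of the overridden method.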

Ron
—
Ron Crocker
Principal Engineer & Architect
( ( •)) New Relic
rcroc...@newrelic.com
M: +1 630 363 8835

> On Nov 3, 2017, at 7:48 AM, Nico Kruber <n...@data-artisans.com> wrote:
> 
> Hi Ron,
> imho your code should be fine (except for a potential visibility problem on
> the changes of the non-volatile partitionMap member, depending on your needs).
> 
> The #open() method should be called (once) for each sink initialization 
> (according to the javadoc) and then you should be fine with the asynchronous 
> updater thread.
> I'm including Gordon (cc'd) just to be sure as he may know more.
> 
> 
> Nico
> 
> On Friday, 3 November 2017 04:06:02 CET Ron Crocker wrote:
>> We have a system where the Kafka partition a message should go into is a
>> function of a value in the message. Often, it’s value % # partitions, but
>> for some values it’s not - it’s a specified list of partitions that changes
>> over time. Our “simple Java library” that produces messages for this system
>> also has a background thread that periodically polls an HTTP endpoint (at a
>> rate of 1/minute as its default) to refresh that list of special cases.
>> 
>> It’s easy to create a FlinkKafkaPartitioner that does the mod operation;
>> what I’m not so sure about is how to get this polling operation into the
>> partitioner. I’m about to try it the obvious way (create a background
>> thread that polls the URL and updates the partition map), but I wonder if
>> that’s actually going to cause a bunch of problems for the Flink runtime.
>> 
>> Here’s the code that I have right now:
>> public class EventInsertPartitioner extends KafkaPartitioner<Tuple2<Long, String>> {
>>    private final String partitionerURL;
>>    private final long updateIntervalInMillis;
>>    private Map<Long, List<Integer>> partitionMap;
>>    private ScheduledExecutorService executor;
>> 
>>    public EventInsertPartitioner(String partitionerURL, long updateIntervalInMillis) {
>>        this.partitionerURL = partitionerURL;
>>        this.updateIntervalInMillis = updateIntervalInMillis;
>>        this.partitionMap = new HashMap<>();
>>    }
>> 
>>    @Override
>>    public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
>>        executor = Executors.newScheduledThreadPool(1);
>>        executor.scheduleAtFixedRate(
>>                () -> updatePartitionMapRunnable(),
>>                updateIntervalInMillis,
>>                updateIntervalInMillis,
>>                TimeUnit.MILLISECONDS);
>> 
>>    }
>> 
>>    private void updatePartitionMapRunnable() {
>>        // Make synchronous request to partitionerURL
>>        // This is a simple JSON that matches our data
>>        String response = "{1:[1,2,3],2:[2]}";
>>        // Replace current partitionMap with new HashMap from the response
>>        this.partitionMap = convertResponseToMap(response);
>>        // Replacing the current value of partitionMap with the updated version
>>        // doesn't require synchronization
>>    }
>> 
>>    private Map<Long, List<Integer>> convertResponseToMap(String response) {
>>        Map<Long, List<Integer>> hashMap = new HashMap<>();
>>        // Convert response to JSON structure and just use that?
>>        // or Iterate and add to local hashMap
>>        return hashMap;
>>    }
>> 
>>    @Override
>>    public int partition(Tuple2<Long, String> next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
>>        long myKey = next.f0;
>> 
>>        if (partitionMap.containsKey(myKey)) {
>>            List<Integer> partitions = partitionMap.get(myKey);
>>            myKey = partitions.get(ThreadLocalRandom.current().nextInt(partitions.size()));
>>        }
>> 
>>        return (int)(myKey % numPartitions);
>>    }
>> }
>> Ron
