Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3511#discussion_r139926078 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/codegeneration/SorterFactory.java --- @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.codegeneration; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter; +import org.apache.flink.runtime.operators.sort.InMemorySorter; +import org.apache.flink.runtime.operators.sort.NormalizedKeySorter; + +import freemarker.template.TemplateException; + +import org.codehaus.commons.compiler.CompileException; +import org.codehaus.janino.SimpleCompiler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.HashMap; +import java.util.List; + +/** + * {@link SorterFactory} is a singleton class that provides functionalities to create the most suitable sorter + * for underlying data based on {@link TypeComparator}. + * Note: the generated code can be inspected by configuring Janino to write the code that is being compiled + * to a file, see http://janino-compiler.github.io/janino/#debugging + */ +public class SorterFactory { + // ------------------------------------------------------------------------ + // Constants + // ------------------------------------------------------------------------ + private static final Logger LOG = LoggerFactory.getLogger(SorterFactory.class); + + /** Fixed length records with a length below this threshold will be in-place sorted, if possible. */ + private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32; + + // ------------------------------------------------------------------------ + // Singleton Attribute + // ------------------------------------------------------------------------ + private static SorterFactory sorterFactory; + + // ------------------------------------------------------------------------ + // Attributes + // ------------------------------------------------------------------------ + private SimpleCompiler classCompiler; + private TemplateManager templateManager; + private HashMap<String, Constructor> constructorCache; + + /** + * This is only for testing. If an error occurs, we want to fail the test, instead of falling back + * to a non-generated sorter. + */ + public boolean forceCodeGeneration = false; + + /** + * Constructor. + */ + private SorterFactory() { + this.templateManager = TemplateManager.getInstance(); + this.classCompiler = new SimpleCompiler(); + this.constructorCache = new HashMap<>(); + } + + /** + * A method to get a singleton instance + * or create one if it hasn't been created yet. + * @return + */ + public static synchronized SorterFactory getInstance() { + if (sorterFactory == null){ + sorterFactory = new SorterFactory(); + } + + return sorterFactory; + } + + + /** + * Create a sorter for the given type comparator and + * assign serializer, comparator and memory to the sorter. + * @param serializer + * @param comparator + * @param memory + * @return + */ + public <T> InMemorySorter<T> createSorter(ExecutionConfig config, TypeSerializer<T> serializer, TypeComparator<T> comparator, List<MemorySegment> memory) { + if (config.isCodeGenerationForSorterEnabled()){ + try { + return createCodegenSorter(serializer, comparator, memory); + } catch (IOException | TemplateException | ClassNotFoundException | IllegalAccessException | + InstantiationException | NoSuchMethodException | InvocationTargetException | CompileException e) { + String msg = "Serializer: " + serializer + + "[" + serializer + "], comparator: [" + comparator + "], exception: " + e.toString(); + if (!forceCodeGeneration) { + LOG.warn("An error occurred while trying to create a code generated sorter. Using non-codegen sorter instead. " + msg); + return createNonCodegenSorter(serializer, comparator, memory); + } else { + throw new RuntimeException("An error occurred while trying to create a code generated sorter. Failing the job, because forceCodeGeneration. " + msg); + } + } + } else { + return createNonCodegenSorter(serializer, comparator, memory); + } + } + + private <T> InMemorySorter<T> createCodegenSorter(TypeSerializer<T> serializer, TypeComparator<T> comparator, List<MemorySegment> memory) + throws IOException, TemplateException, ClassNotFoundException, IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException, CompileException { + SorterTemplateModel sorterModel = new SorterTemplateModel(comparator); + + Constructor sorterConstructor; + + synchronized (this){ + if (constructorCache.getOrDefault(sorterModel.getSorterName(), null) != null) { --- End diff -- `constructorCache.containsKey(sorterModel.getSorterName())`?
---