Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3511#discussion_r140663665
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/codegeneration/SorterFactory.java
 ---
    @@ -0,0 +1,174 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.codegeneration;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.MemorySegment;
    +import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
    +import org.apache.flink.runtime.operators.sort.InMemorySorter;
    +import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
    +
    +import freemarker.template.TemplateException;
    +
    +import org.codehaus.commons.compiler.CompileException;
    +import org.codehaus.janino.SimpleCompiler;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.lang.reflect.Constructor;
    +import java.lang.reflect.InvocationTargetException;
    +import java.util.HashMap;
    +import java.util.List;
    +
    +/**
    + * {@link SorterFactory} is a singleton that creates the most suitable sorter
    + * for the underlying data, based on its {@link TypeComparator}.
    + * Note: the generated code can be inspected by configuring Janino to 
write the code that is being compiled
    + * to a file, see http://janino-compiler.github.io/janino/#debugging
    + */
    +public class SorterFactory {
    +   // 
------------------------------------------------------------------------
    +   //                                   Constants
    +   // 
------------------------------------------------------------------------
    +   private static final Logger LOG = 
LoggerFactory.getLogger(SorterFactory.class);
    +
    +   /** Fixed length records with a length below this threshold will be 
in-place sorted, if possible. */
    +   private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
    +
    +   // 
------------------------------------------------------------------------
    +   //                                   Singleton Attribute
    +   // 
------------------------------------------------------------------------
    +   private static SorterFactory sorterFactory;
    +
    +   // 
------------------------------------------------------------------------
    +   //                                   Attributes
    +   // 
------------------------------------------------------------------------
    +   /** Janino compiler that cooks the generated sorter source and provides the class loader the sorter class is loaded from. */
    +   private SimpleCompiler classCompiler;
    +   /** Produces the source code of a sorter from a {@link SorterTemplateModel}. */
    +   private TemplateManager templateManager;
    +   /** Constructors of already compiled sorters, keyed by sorter name. */
    +   private HashMap<String, Constructor> constructorCache;
    +
    +   /**
    +    * This is only for testing. If an error occurs, we want to fail the 
test, instead of falling back
    +    * to a non-generated sorter.
    +    */
    +   public boolean forceCodeGeneration = false;
    +
    +   /**
    +    * Private constructor; instances are handed out by {@link #getInstance()}.
    +    * Sets up the template manager, a fresh Janino compiler and an empty constructor cache.
    +    */
    +   private SorterFactory() {
    +           templateManager = TemplateManager.getInstance();
    +           classCompiler = new SimpleCompiler();
    +           constructorCache = new HashMap<>();
    +   }
    +
    +   /**
    +    * Returns the singleton {@link SorterFactory}, creating it on the first call.
    +    * The method is synchronized, so concurrent first calls create only one instance.
    +    *
    +    * @return the shared factory instance
    +    */
    +   public static synchronized SorterFactory getInstance() {
    +           if (sorterFactory != null) {
    +                   return sorterFactory;
    +           }
    +
    +           // First call: create the instance and remember it for later callers.
    +           sorterFactory = new SorterFactory();
    +           return sorterFactory;
    +   }
    +
    +
    +   /**
    +    * Create a sorter for the given type comparator and
    +    * assign serializer, comparator and memory to the sorter.
    +    * @param serializer
    +    * @param comparator
    +    * @param memory
    +    * @return
    +    */
    +   public <T> InMemorySorter<T> createSorter(ExecutionConfig config, 
TypeSerializer<T> serializer, TypeComparator<T> comparator, List<MemorySegment> 
memory) {
    +           if (config.isCodeGenerationForSorterEnabled()){
    +                   try {
    +                           return createCodegenSorter(serializer, 
comparator, memory);
    +                   } catch (IOException | TemplateException | 
ClassNotFoundException | IllegalAccessException |
    +                                   InstantiationException | 
NoSuchMethodException | InvocationTargetException | CompileException e) {
    +                           String msg = "Serializer: " + serializer +
    +                                           "[" + serializer + "], 
comparator: [" + comparator + "], exception: " + e.toString();
    +                           if (!forceCodeGeneration) {
    +                                   LOG.warn("An error occurred while 
trying to create a code generated sorter. Using non-codegen sorter instead. " + 
msg);
    +                                   return 
createNonCodegenSorter(serializer, comparator, memory);
    +                           } else {
    +                                   throw new RuntimeException("An error 
occurred while trying to create a code generated sorter. Failing the job, 
because forceCodeGeneration. " + msg);
    +                           }
    +                   }
    +           } else {
    +                   return createNonCodegenSorter(serializer, comparator, 
memory);
    +           }
    +   }
    +
    +   private <T> InMemorySorter<T> createCodegenSorter(TypeSerializer<T> 
serializer, TypeComparator<T> comparator, List<MemorySegment> memory)
    +                   throws IOException, TemplateException, 
ClassNotFoundException, IllegalAccessException, InstantiationException, 
NoSuchMethodException, InvocationTargetException, CompileException {
    +           SorterTemplateModel sorterModel = new 
SorterTemplateModel(comparator);
    +
    +           Constructor sorterConstructor;
    +
    +           synchronized (this){
    +                   if 
(constructorCache.getOrDefault(sorterModel.getSorterName(), null) != null) {
    +                           sorterConstructor = 
constructorCache.get(sorterModel.getSorterName());
    +                   } else {
    +                           String sorterName = sorterModel.getSorterName();
    +                           String generatedCode = 
this.templateManager.getGeneratedCode(sorterModel);
    +                           this.classCompiler.cook(generatedCode);
    +
    +                           sorterConstructor = 
this.classCompiler.getClassLoader().loadClass(sorterName).getConstructor(
    +                                           TypeSerializer.class, 
TypeComparator.class, List.class
    +                           );
    +
    +                           constructorCache.put(sorterName, 
sorterConstructor);
    +                   }
    +           }
    +
    +           @SuppressWarnings("unchecked")
    +           InMemorySorter<T> sorter = (InMemorySorter<T>) 
sorterConstructor.newInstance(serializer, comparator, memory);
    +
    +           if (LOG.isInfoEnabled()){
    +                   LOG.info("Using a custom sorter : " + 
sorter.toString());
    --- End diff --
    
    1c9830fa22e4140bbc19951a77c15dd0fa75ce37


---

Reply via email to