[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-10-01 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r142032957
  
--- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java ---
@@ -146,6 +151,7 @@ public static void teardown() throws Exception {
public static Collection executionModes() {
return Arrays.asList(
new Object[] { TestExecutionMode.CLUSTER },
+   new Object[] { TestExecutionMode.CLUSTER_WITH_CODEGENERATION_ENABLED },
--- End diff --

Adding `TestExecutionMode.CLUSTER_WITH_CODEGENERATION_ENABLED` increases the total testing time by about 1 minute on my laptop. About half of this comes from the tests based on `DriverBaseITCase` and `CopyableValueDriverBaseITCase`.

(Here is a patch for removing `TestExecutionMode.CLUSTER_SORTER_CODEGEN`: 
https://github.com/ggevay/flink/commit/be1f1c3041fdf6c4dd487b2643f2d399ee24abe9)
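The extra minute comes from the way parameterized tests consume `executionModes()`. Each returned `Object[]` is one parameter set, so each added mode runs every test in the subclass one more time. The sketch below assumes a hypothetical stand-in for `TestExecutionMode` that models only the two constants visible in the diff. It is not the actual `MultipleProgramsTestBase`.

```java
import java.util.Arrays;
import java.util.Collection;

public class ExecutionModesSketch {

    // Hypothetical stand-in for Flink's TestExecutionMode; only the two
    // constants that appear in the diff are modeled.
    enum TestExecutionMode { CLUSTER, CLUSTER_WITH_CODEGENERATION_ENABLED }

    // Each Object[] is one parameter set: a parameterized runner instantiates
    // the test class once per entry and runs all of its tests for each entry.
    static Collection<Object[]> executionModes() {
        return Arrays.asList(
                new Object[] { TestExecutionMode.CLUSTER },
                new Object[] { TestExecutionMode.CLUSTER_WITH_CODEGENERATION_ENABLED });
    }

    public static void main(String[] args) {
        int testsInClass = 10; // hypothetical number of test methods
        System.out.println(executionModes().size() * testsInClass + " test runs");
    }
}
```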


---


[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-10-01 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r142032016
  
--- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java ---
@@ -146,6 +151,7 @@ public static void teardown() throws Exception {
public static Collection executionModes() {
return Arrays.asList(
new Object[] { TestExecutionMode.CLUSTER },
+   new Object[] { TestExecutionMode.CLUSTER_WITH_CODEGENERATION_ENABLED },
--- End diff --

157cd4c19f6725fb89b0fcd30facd27878d64e20


---


[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-24 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r140664772
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/codegeneration/SorterFactory.java ---
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.codegeneration;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
+import org.apache.flink.runtime.operators.sort.InMemorySorter;
+import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
+
+import freemarker.template.TemplateException;
+
+import org.codehaus.commons.compiler.CompileException;
+import org.codehaus.janino.SimpleCompiler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * {@link SorterFactory} is a singleton class that provides functionalities to create the most suitable sorter
+ * for underlying data based on {@link TypeComparator}.
+ * Note: the generated code can be inspected by configuring Janino to write the code that is being compiled
+ * to a file, see http://janino-compiler.github.io/janino/#debugging
+ */
+public class SorterFactory {
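+   // ------------------------------------------------------------------------
+   //   Constants
+   // ------------------------------------------------------------------------
+   private static final Logger LOG = LoggerFactory.getLogger(SorterFactory.class);
+
+   /** Fixed length records with a length below this threshold will be in-place sorted, if possible. */
+   private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
+
+   // ------------------------------------------------------------------------
+   //   Singleton Attribute
+   // ------------------------------------------------------------------------
+   private static SorterFactory sorterFactory;
+
+   // ------------------------------------------------------------------------
+   //   Attributes
+   // ------------------------------------------------------------------------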
+   private SimpleCompiler classCompiler;
+   private TemplateManager templateManager;
+   private HashMap constructorCache;
+
+   /**
+* This is only for testing. If an error occurs, we want to fail the test, instead of falling back
+* to a non-generated sorter.
+*/
+   public boolean forceCodeGeneration = false;
+
+   /**
+* Constructor.
+*/
+   private SorterFactory() {
+   this.templateManager = TemplateManager.getInstance();
+   this.classCompiler = new SimpleCompiler();
+   this.constructorCache = new HashMap<>();
+   }
+
+   /**
+* A method to get a singleton instance
+* or create one if it hasn't been created yet.
+* @return
+*/
+   public static synchronized SorterFactory getInstance() {
+   if (sorterFactory == null){
+   sorterFactory = new SorterFactory();
+   }
+
+   return sorterFactory;
+   }
+
+
+   /**
+* Create a sorter for the given type comparator and
+* assign serializer, comparator and memory to the sorter.
+* @param serializer
+* @param comparator
+* @param memory
+* @return
+*/
+   public  InMemorySorter createSorter(ExecutionConfig config, TypeSerializer serializer, 

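The body of `createSorter` is cut off in this excerpt, so the selection rule below is hypothetical. It follows the Javadoc of `THRESHOLD_FOR_IN_PLACE_SORTING`: short fixed-length records are sorted in place, and everything else goes through a normalized-key sorter, which is the part this PR code-generates. The `SorterKind` enum and the `choose` parameters are assumptions. The second parameter only borrows its name from `TypeComparator.supportsSerializationWithKeyNormalization()`.

```java
public class SorterSelectionSketch {

    // Same value as THRESHOLD_FOR_IN_PLACE_SORTING in the diff.
    static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;

    // Hypothetical labels for the three possible outcomes.
    enum SorterKind { FIXED_LENGTH_IN_PLACE, NORMALIZED_KEY, GENERATED_NORMALIZED_KEY }

    /**
     * @param recordLength fixed serialized length in bytes, or -1 for variable-length records
     * @param supportsSerializationWithKeyNormalization whether records can be serialized as normalized keys
     * @param codeGenerationEnabled whether the sorter code-generation flag is set
     */
    static SorterKind choose(int recordLength,
                             boolean supportsSerializationWithKeyNormalization,
                             boolean codeGenerationEnabled) {
        boolean smallFixedLength = recordLength > 0 && recordLength < THRESHOLD_FOR_IN_PLACE_SORTING;
        if (supportsSerializationWithKeyNormalization && smallFixedLength) {
            // Short fixed-length records: sort the records themselves in place.
            return SorterKind.FIXED_LENGTH_IN_PLACE;
        }
        // Otherwise sort via normalized keys; only this path is code-generated here.
        return codeGenerationEnabled ? SorterKind.GENERATED_NORMALIZED_KEY : SorterKind.NORMALIZED_KEY;
    }
}
```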
[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-24 Thread heytitle
Github user heytitle commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r140664033
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/codegeneration/SorterFactory.java ---
@@ -0,0 +1,198 @@
+
+package org.apache.flink.runtime.codegeneration;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
+import org.apache.flink.runtime.operators.sort.InMemorySorter;
+import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
+
+import freemarker.template.Configuration;
+import freemarker.template.Template;
+import freemarker.template.TemplateException;
+import freemarker.template.TemplateExceptionHandler;
+import freemarker.template.Version;
+import org.codehaus.commons.compiler.CompileException;
+import org.codehaus.janino.SimpleCompiler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * {@link SorterFactory} is a singleton class that provides functionalities to create the most suitable sorter
+ * for underlying data based on {@link TypeComparator}.
+ * Note: the generated code can be inspected by configuring Janino to write the code that is being compiled
+ * to a file, see http://janino-compiler.github.io/janino/#debugging
+ */
+public class SorterFactory {
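+   // ------------------------------------------------------------------------
+   //   Constants
+   // ------------------------------------------------------------------------
+   private static final Logger LOG = LoggerFactory.getLogger(SorterFactory.class);
+
+   /** Fixed length records with a length below this threshold will be in-place sorted, if possible. */
+   private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
+
+   // ------------------------------------------------------------------------
+   //   Singleton Attribute
+   // ------------------------------------------------------------------------
+   private static SorterFactory sorterFactory;
+
+   // ------------------------------------------------------------------------
+   //   Attributes
+   // ------------------------------------------------------------------------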
+   private SimpleCompiler classCompiler;
+   private HashMap constructorCache;
+   private final Template template;
+
+   /**
+* This is only for testing. If an error occurs, we want to fail the test, instead of falling back
+* to a non-generated sorter.
+*/
+   public boolean forceCodeGeneration = false;
+
+   /**
+* Constructor.
+*/
+   private SorterFactory() {
+   this.classCompiler = new SimpleCompiler();
+   this.classCompiler.setParentClassLoader(this.getClass().getClassLoader());
--- End diff --

OK, I get it now. I thought there was something behind this parent class 
loader field that affected the existence of `classCompiler`.
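The parent class loader matters because the generated sorter source refers to Flink classes such as `NormalizedKeySorter`. The compiled class can resolve those only through its loader's parent chain. The sketch below shows that delegation using a plain `URLClassLoader` as a stand-in for the loader Janino's `SimpleCompiler` sets up. The class and method names are hypothetical.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;

public class ParentLoaderSketch {

    /**
     * Returns true if a child loader with the given parent can load the named class.
     * The child has no URLs of its own, so it can only succeed by delegating.
     */
    static boolean visibleThrough(ClassLoader parent, String className) {
        try (URLClassLoader child = new URLClassLoader(new URL[0], parent)) {
            child.loadClass(className);
            return true;
        } catch (ClassNotFoundException | IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String self = ParentLoaderSketch.class.getName();
        // Parent = the loader that loaded this class: delegation finds it.
        System.out.println(visibleThrough(ParentLoaderSketch.class.getClassLoader(), self));
        // Parent = null (bootstrap loader only): application classes are invisible.
        System.out.println(visibleThrough(null, self));
    }
}
```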


---


[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-24 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r140663665
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/codegeneration/SorterFactory.java ---
@@ -0,0 +1,174 @@

[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-24 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r140663639
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/codegeneration/SorterTemplateModel.java ---
@@ -0,0 +1,424 @@
+
+package org.apache.flink.runtime.codegeneration;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
+
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * {@link SorterTemplateModel} is a class that implements code generation logic for a given
+ * {@link TypeComparator}.
+ *
+ * The swap and compare methods in {@link NormalizedKeySorter} work on a sequence of bytes.
+ * We speed up these operations by splitting this sequence of bytes into chunks that can
+ * be handled by primitive operations such as Integer and Long operations.
+ */
+class SorterTemplateModel {
+
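+   // ------------------------------------------------------------------------
+   //   Constants
+   // ------------------------------------------------------------------------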
+
+   static final String TEMPLATE_NAME = "sorter.ftlh";
+
+   /** We don't split to chunks above this size. */
+   private static final int SPLITTING_THRESHOLD = 32;
+
+   /**
+* POSSIBLE_CHUNK_SIZES must be in descending order,
+* because methods that using it are using greedy approach.
+*/
+   private static final Integer[] POSSIBLE_CHUNK_SIZES = {8, 4, 2, 1};
+
+   /** Mapping from chunk sizes to primitive operators. */
+   private static final HashMap byteOperatorMapping = new HashMap(){
+   {
+   put(8, "Long");
+   put(4, "Int");
+   put(2, "Short");
+   put(1, "Byte");
+   }
+   };
+
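+   // ------------------------------------------------------------------------
+   //   Attributes
+   // ------------------------------------------------------------------------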
+
+   private final TypeComparator typeComparator;
+
+   /**
+* Sizes of the chunks. Empty, if we are not splitting to chunks. (See calculateChunks())
+*/
+   private final ArrayList primitiveChunks;
+
+   private final String sorterName;
+
+   /**
+* Shows whether the order of records can be completely determined by the normalized
+* sorting key, or the sorter has to also deserialize records if their keys are equal to
+* really confirm the order.
+*/
+   private final boolean normalizedKeyFullyDetermines;
+
+   /**
+* Constructor.
+* @param typeComparator
+*The type information of underlying data
+*/
+   SorterTemplateModel(TypeComparator typeComparator){
+   this.typeComparator = typeComparator;
+
+   // number of bytes of the sorting key
+   int numKeyBytes;
+
+   // compute no. bytes for sorting records and check whether these bytes are just a prefix or not.
+   if (this.typeComparator.supportsNormalizedKey()) {
+   // compute the max normalized key length
+   int numPartialKeys;
+   try {
+   numPartialKeys = this.typeComparator.getFlatComparators().length;
+   } catch (Throwable t) {
+   numPartialKeys = 1;
+   }
+
+   int maxLen = Math.min(NormalizedKeySorter.DEFAULT_MAX_NORMALIZED_KEY_LEN, NormalizedKeySorter.MAX_NORMALIZED_KEY_LEN_PER_ELEMENT * numPartialKeys);
+
+   numKeyBytes = 

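The chunking and the `maxLen` formula can be made concrete. Under the greedy approach the Javadoc describes, a key is cut into the largest chunks from `POSSIBLE_CHUNK_SIZES` that still fit. Each chunk is then handled with the matching primitive from `byteOperatorMapping`. This is a sketch, not the PR's `calculateChunks()`, whose body is cut off here. The values 16 and 8 for `DEFAULT_MAX_NORMALIZED_KEY_LEN` and `MAX_NORMALIZED_KEY_LEN_PER_ELEMENT` are assumptions to check against `NormalizedKeySorter`.

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkSplitSketch {

    // Assumed values; verify against NormalizedKeySorter.
    static final int DEFAULT_MAX_NORMALIZED_KEY_LEN = 16;
    static final int MAX_NORMALIZED_KEY_LEN_PER_ELEMENT = 8;

    // Same values as in the diff; chunk sizes must stay in descending order.
    static final int SPLITTING_THRESHOLD = 32;
    static final int[] POSSIBLE_CHUNK_SIZES = {8, 4, 2, 1};

    /** Upper bound on normalized key bytes, as computed in the constructor above. */
    static int maxNormalizedKeyLen(int numPartialKeys) {
        return Math.min(DEFAULT_MAX_NORMALIZED_KEY_LEN,
                MAX_NORMALIZED_KEY_LEN_PER_ELEMENT * numPartialKeys);
    }

    /** Greedy split into primitive-sized chunks; an empty list means "do not split". */
    static List<Integer> calculateChunks(int numKeyBytes) {
        List<Integer> chunks = new ArrayList<>();
        if (numKeyBytes > SPLITTING_THRESHOLD) {
            return chunks;
        }
        int remaining = numKeyBytes;
        for (int size : POSSIBLE_CHUNK_SIZES) {
            while (remaining >= size) { // take the largest chunk that still fits
                chunks.add(size);
                remaining -= size;
            }
        }
        return chunks;
    }

    /** Same mapping as byteOperatorMapping in the diff. */
    static String operatorFor(int chunkSize) {
        switch (chunkSize) {
            case 8: return "Long";
            case 4: return "Int";
            case 2: return "Short";
            case 1: return "Byte";
            default: throw new IllegalArgumentException("unsupported chunk size: " + chunkSize);
        }
    }

    public static void main(String[] args) {
        // One key field: min(16, 8) = 8 bytes; two: min(16, 16) = 16; three: min(16, 24) = 16.
        for (int fields = 1; fields <= 3; fields++) {
            System.out.println(fields + " field(s) -> " + maxNormalizedKeyLen(fields) + " bytes");
        }
        // A 13-byte key becomes an 8-byte, a 4-byte and a 1-byte chunk.
        for (int chunk : calculateChunks(13)) {
            System.out.println(chunk + " bytes via " + operatorFor(chunk));
        }
    }
}
```

The greedy choice is exact here because every chunk size is a power of two and 1 is included. Any key length up to the threshold is therefore covered with no remainder.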
[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-24 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r140663635
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/codegeneration/SorterTemplateModel.java ---
@@ -0,0 +1,424 @@

[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-24 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r140663591
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/codegeneration/SorterFactory.java ---
@@ -0,0 +1,174 @@
+
+package org.apache.flink.runtime.codegeneration;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
+import org.apache.flink.runtime.operators.sort.InMemorySorter;
+import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
+
+import freemarker.template.TemplateException;
+
+import org.codehaus.commons.compiler.CompileException;
+import org.codehaus.janino.SimpleCompiler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * {@link SorterFactory} is a singleton class that provides functionalities to create the most suitable sorter
+ * for underlying data based on {@link TypeComparator}.
+ * Note: the generated code can be inspected by configuring Janino to write the code that is being compiled
+ * to a file, see http://janino-compiler.github.io/janino/#debugging
+ */
+public class SorterFactory {
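+   // ------------------------------------------------------------------------
+   //   Constants
+   // ------------------------------------------------------------------------
+   private static final Logger LOG = LoggerFactory.getLogger(SorterFactory.class);
+
+   /** Fixed length records with a length below this threshold will be in-place sorted, if possible. */
+   private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
+
+   // ------------------------------------------------------------------------
+   //   Singleton Attribute
+   // ------------------------------------------------------------------------
+   private static SorterFactory sorterFactory;
+
+   // ------------------------------------------------------------------------
+   //   Attributes
+   // ------------------------------------------------------------------------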
+   private SimpleCompiler classCompiler;
+   private TemplateManager templateManager;
+   private HashMap constructorCache;
+
+   /**
+* This is only for testing. If an error occurs, we want to fail the test, instead of falling back
+* to a non-generated sorter.
+*/
+   public boolean forceCodeGeneration = false;
+
+   /**
+* Constructor.
+*/
+   private SorterFactory() {
+   this.templateManager = TemplateManager.getInstance();
+   this.classCompiler = new SimpleCompiler();
--- End diff --

909b59e3174a5348c2ee550806d4711475f5c268
cce40c5cd4aaff446bb4bec8918d2fda37649e0a

This was a bit tricky; I hope I haven't messed it up. I will think about it a 
bit more over the next few days, and maybe do some more testing, to make sure 
that we are not keeping anything from past jobs alive. Unfortunately, I don't 
know how this could be tested in an automated way, so I will test it manually: 
submit hundreds of jobs and watch in a profiler that the object counts are not 
growing.
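One way to avoid keeping past jobs alive is to key the cache of generated classes by the job's class loader through a `WeakHashMap`. An entry can then be dropped once that loader is otherwise unreachable. This is a sketch of the idea only, not necessarily what the commits above do, and `PerClassLoaderCache` is hypothetical. One caveat applies: a `WeakHashMap` releases an entry only if its value does not strongly reference the key. A cached generated class whose loader has the job's loader as parent does exactly that, so real values might need extra wrapping, for example a `WeakReference`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.function.Function;

public class PerClassLoaderCache<V> {

    // Outer keys are held weakly, so an unreachable job class loader is not kept
    // alive just because it is a key here. Caveat: values that strongly reference
    // their own key (e.g. a generated class whose loader has the key as parent)
    // still pin the entry.
    private final Map<ClassLoader, Map<String, V>> cache = new WeakHashMap<>();

    /** Returns the cached value for (loader, key), creating it on first use. */
    public synchronized V get(ClassLoader loader, String key, Function<String, V> create) {
        return cache.computeIfAbsent(loader, l -> new HashMap<>())
                .computeIfAbsent(key, create);
    }

    /** Number of class loaders that currently have entries. */
    public synchronized int loaderCount() {
        return cache.size();
    }
}
```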


---


[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-24 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r140663593
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java ---
@@ -309,7 +309,7 @@ public boolean write(T record) throws IOException {
//   Access Utilities
// ------------------------------------------------------------------------

-   private long readPointer(int logicalPosition) {
+   protected long readPointer(int logicalPosition) {
--- End diff --

Thanks, I've made them `protected final`:
db14ac58d137ed39f63bc4f8d724622f4cae518f
5e31cf011d5b2d3a9e986414411e312310867c44
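Making the helpers `protected` lets a generated subclass of `NormalizedKeySorter` call them, and `final` keeps that subclass from overriding them. The sketch below uses hypothetical class names and a toy pointer array in place of the real memory segments.

```java
public class ProtectedFinalSketch {

    /** Stand-in for NormalizedKeySorter; the real one reads pointers from memory segments. */
    static class BaseSorter {
        private final long[] pointers;

        BaseSorter(long... pointers) {
            this.pointers = pointers;
        }

        // protected: reachable from subclasses such as a generated sorter.
        // final: a subclass can call it but cannot replace it.
        protected final long readPointer(int logicalPosition) {
            return pointers[logicalPosition];
        }
    }

    /** Stand-in for a generated subclass that reuses the base-class helper. */
    static class GeneratedSorter extends BaseSorter {
        GeneratedSorter(long... pointers) {
            super(pointers);
        }

        long pointerDistance(int i, int j) {
            return readPointer(j) - readPointer(i);
        }
    }

    public static void main(String[] args) {
        GeneratedSorter sorter = new GeneratedSorter(0L, 24L, 40L);
        System.out.println(sorter.pointerDistance(0, 2));
    }
}
```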


---


[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-24 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r140663604
  
--- Diff: flink-runtime/pom.xml ---
@@ -204,6 +204,18 @@ under the License.
reflections

 
+   <dependency>
+       <groupId>org.freemarker</groupId>
--- End diff --

84cb9a83efdb384d91d4c967b237fc12dca68774


---


[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-24 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r140663599
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/codegeneration/TemplateManager.java ---
@@ -0,0 +1,108 @@
+
+package org.apache.flink.runtime.codegeneration;
+
+import freemarker.template.Configuration;
+import freemarker.template.Template;
+import freemarker.template.TemplateException;
+import freemarker.template.TemplateExceptionHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.util.Map;
+
+/**
+ * {@link TemplateManager} is a singleton class that provides template rendering functionalities for code generation.
+ * Such functionalities are caching, writing generated code to a file.
--- End diff --

Sorry, you are right: we don't write to a file anymore. Also, caching is no 
longer handled by this class (it is handled by `SorterFactory`). So I've 
realized that this class is not really needed, and moved its functionality 
into `SorterFactory`.
b8f1e53fa46b0ec7f9b7baefbb388c29be72f7ee
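With `TemplateManager` gone, `SorterFactory` itself turns the template model into Java source before handing it to Janino. The sketch below shows that render step. It uses a tiny `${name}` substitution over a `Map` as a stand-in for FreeMarker; this is not FreeMarker's API, and the placeholder names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TemplateRenderSketch {

    // Matches placeholders of the form ${name}.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{(\\w+)\\}");

    /** Replaces every ${name} in the template with model.get(name). */
    static String render(String template, Map<String, String> model) {
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = model.get(m.group(1));
            if (value == null) {
                // Fail loudly instead of emitting source that will not compile.
                throw new IllegalArgumentException("no value for placeholder: " + m.group(1));
            }
            // quoteReplacement keeps '$' and '\' in values from being interpreted.
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> model = new HashMap<>();
        model.put("name", "GeneratedSorter_8_4");
        model.put("base", "NormalizedKeySorter");
        System.out.println(render("public final class ${name} extends ${base} { }", model));
    }
}
```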


---


[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-24 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r140663596
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---
@@ -127,6 +127,12 @@
private long autoWatermarkInterval = 0;
 
/**
+* The flag determines whether a custom NormalizedKeySorter will be dynamically created
+* for underlying data
+*/
+   private boolean codeGenerationForSorterEnabled = false;
--- End diff --

36419beee7938768453d2335daa9d7c29af720c1

I created a Jira for performance tweaking:
https://issues.apache.org/jira/browse/FLINK-7680
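The new field is `false` by default, which keeps code generation opt-in: unless it is switched on, jobs keep using the existing, non-generated sorters. This diff excerpt shows only the private field, so the accessor names in the sketch below are hypothetical. The sketch shows one way such a flag could be exposed and consulted.

```java
public class CodegenFlagSketch {

    /** Stand-in for ExecutionConfig holding only the new flag. */
    static class Config {
        // Off by default, as in the diff.
        private boolean codeGenerationForSorterEnabled = false;

        Config enableCodeGenerationForSorter() {
            codeGenerationForSorterEnabled = true;
            return this; // fluent style, so calls can be chained
        }

        Config disableCodeGenerationForSorter() {
            codeGenerationForSorterEnabled = false;
            return this;
        }

        boolean isCodeGenerationForSorterEnabled() {
            return codeGenerationForSorterEnabled;
        }
    }

    /** The factory consults the flag before attempting code generation. */
    static String sorterFor(Config config) {
        return config.isCodeGenerationForSorterEnabled() ? "generated sorter" : "non-generated sorter";
    }

    public static void main(String[] args) {
        System.out.println(sorterFor(new Config()));
        System.out.println(sorterFor(new Config().enableCodeGenerationForSorter()));
    }
}
```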


---


[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-24 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r140663603
  
--- Diff: flink-runtime/pom.xml ---
@@ -204,6 +204,18 @@ under the License.
reflections

 
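+   <dependency>
+       <groupId>org.freemarker</groupId>
+       <artifactId>freemarker</artifactId>
+       <version>2.3.20</version>
+   </dependency>
+
+   <dependency>
+       <groupId>org.codehaus.janino</groupId>
+       <artifactId>janino</artifactId>
+       <version>2.7.5</version>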
--- End diff --

84cb9a83efdb384d91d4c967b237fc12dca68774


---


[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-24 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r140663585
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/codegeneration/SorterFactory.java ---
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.codegeneration;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
+import org.apache.flink.runtime.operators.sort.InMemorySorter;
+import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
+
+import freemarker.template.TemplateException;
+
+import org.codehaus.commons.compiler.CompileException;
+import org.codehaus.janino.SimpleCompiler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * {@link SorterFactory} is a singleton class that provides 
functionalities to create the most suitable sorter
+ * for underlying data based on {@link TypeComparator}.
+ * Note: the generated code can be inspected by configuring Janino to 
write the code that is being compiled
+ * to a file, see http://janino-compiler.github.io/janino/#debugging
+ */
+public class SorterFactory {
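+   // ------------------------------------------------------------------------
+   //   Constants
+   // ------------------------------------------------------------------------
+   private static final Logger LOG = LoggerFactory.getLogger(SorterFactory.class);
+
+   /** Fixed length records with a length below this threshold will be in-place sorted, if possible. */
+   private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
+
+   // ------------------------------------------------------------------------
+   //   Singleton Attribute
+   // ------------------------------------------------------------------------
+   private static SorterFactory sorterFactory;
+
+   // ------------------------------------------------------------------------
+   //   Attributes
+   // ------------------------------------------------------------------------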
+   private SimpleCompiler classCompiler;
+   private TemplateManager templateManager;
+   private HashMap constructorCache;
+
+   /**
+* This is only for testing. If an error occurs, we want to fail the test, instead of falling back
+* to a non-generated sorter.
+*/
+   public boolean forceCodeGeneration = false;
+
+   /**
+* Constructor.
+*/
+   private SorterFactory() {
+   this.templateManager = TemplateManager.getInstance();
+   this.classCompiler = new SimpleCompiler();
+   this.constructorCache = new HashMap<>();
+   }
+
+   /**
+* A method to get a singleton instance
+* or create one if it hasn't been created yet.
+* @return
+*/
+   public static synchronized SorterFactory getInstance() {
+   if (sorterFactory == null){
+   sorterFactory = new SorterFactory();
+   }
+
+   return sorterFactory;
+   }
+
+
+   /**
+* Create a sorter for the given type comparator and
+* assign serializer, comparator and memory to the sorter.
+* @param serializer
+* @param comparator
+* @param memory
+* @return
+*/
+   public InMemorySorter createSorter(ExecutionConfig config, TypeSerializer serializer,

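To make the `forceCodeGeneration` contract and the `THRESHOLD_FOR_IN_PLACE_SORTING` constant quoted above more concrete, here is a minimal sketch of how such a factory could choose a sorter. It is not the code in this pull request. `SorterSelectionSketch`, the `Kind` enum and the `Supplier`-based generator are hypothetical stand-ins. The selection rule, where fixed-length records shorter than the threshold go to the in-place sorter, is an assumption based on the Javadoc. Only `RuntimeException` is caught, because the sketch leaves out the checked Janino/FreeMarker exceptions that the real factory has to handle.

```java
import java.util.function.Supplier;

// Hypothetical sketch; not Flink's SorterFactory API.
final class SorterSelectionSketch {

    /** Same value as the constant quoted from SorterFactory. */
    static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;

    /** Stand-ins for a generated sorter, FixedLengthRecordSorter and NormalizedKeySorter. */
    enum Kind { GENERATED, FIXED_LENGTH, NORMALIZED_KEY }

    /** Only for tests: rethrow code generation errors instead of falling back. */
    boolean forceCodeGeneration = false;

    /**
     * @param codeGenerationEnabled value of the (assumed) ExecutionConfig flag
     * @param fixedLengthNormalizable assumed: records can be serialized together with their normalized key
     * @param recordLength serialized record length, or a non-positive value for variable-length records
     * @param codeGenerator stands in for template rendering plus Janino compilation
     */
    Kind select(boolean codeGenerationEnabled,
                boolean fixedLengthNormalizable,
                int recordLength,
                Supplier<Kind> codeGenerator) {
        if (codeGenerationEnabled) {
            try {
                return codeGenerator.get();
            } catch (RuntimeException e) {
                if (forceCodeGeneration) {
                    throw e; // make the test fail loudly
                }
                // otherwise fall back to a non-generated sorter below
            }
        }
        if (fixedLengthNormalizable
                && recordLength > 0
                && recordLength < THRESHOLD_FOR_IN_PLACE_SORTING) {
            return Kind.FIXED_LENGTH; // small fixed-length records: sort in place
        }
        return Kind.NORMALIZED_KEY;
    }
}
```

With `forceCodeGeneration` left at `false`, a failing generator silently degrades to a regular sorter. Setting it to `true` surfaces the failure, which is what the test-only field is meant for.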
[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-24 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r140662898
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/codegeneration/SorterFactory.java
 ---
@@ -0,0 +1,198 @@
+package org.apache.flink.runtime.codegeneration;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
+import org.apache.flink.runtime.operators.sort.InMemorySorter;
+import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
+
+import freemarker.template.Configuration;
+import freemarker.template.Template;
+import freemarker.template.TemplateException;
+import freemarker.template.TemplateExceptionHandler;
+import freemarker.template.Version;
+import org.codehaus.commons.compiler.CompileException;
+import org.codehaus.janino.SimpleCompiler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * {@link SorterFactory} is a singleton class that provides functionalities to create the most suitable sorter
+ * for underlying data based on {@link TypeComparator}.
+ * Note: the generated code can be inspected by configuring Janino to write the code that is being compiled
+ * to a file, see http://janino-compiler.github.io/janino/#debugging
+ */
+public class SorterFactory {
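+   // ------------------------------------------------------------------------
+   //   Constants
+   // ------------------------------------------------------------------------
+   private static final Logger LOG = LoggerFactory.getLogger(SorterFactory.class);
+
+   /** Fixed length records with a length below this threshold will be in-place sorted, if possible. */
+   private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
+
+   // ------------------------------------------------------------------------
+   //   Singleton Attribute
+   // ------------------------------------------------------------------------
+   private static SorterFactory sorterFactory;
+
+   // ------------------------------------------------------------------------
+   //   Attributes
+   // ------------------------------------------------------------------------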
+   private SimpleCompiler classCompiler;
+   private HashMap constructorCache;
+   private final Template template;
+
+   /**
+* This is only for testing. If an error occurs, we want to fail the test, instead of falling back
+* to a non-generated sorter.
+*/
+   public boolean forceCodeGeneration = false;
+
+   /**
+* Constructor.
+*/
+   private SorterFactory() {
+   this.classCompiler = new SimpleCompiler();
+   this.classCompiler.setParentClassLoader(this.getClass().getClassLoader());
--- End diff --

Why would `classCompiler` be removed?

The "parent classloader" of a `SimpleCompiler` is just a field in it, and 
`classCompiler.setParentClassLoader` just modifies this field. So I think it 
should be OK to call `classCompiler.setParentClassLoader` again and again with 
different user code classloaders for every job.


---


[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-24 Thread heytitle
Github user heytitle commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r140662538
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/codegeneration/SorterFactory.java
 ---
@@ -0,0 +1,198 @@
+package org.apache.flink.runtime.codegeneration;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
+import org.apache.flink.runtime.operators.sort.InMemorySorter;
+import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
+
+import freemarker.template.Configuration;
+import freemarker.template.Template;
+import freemarker.template.TemplateException;
+import freemarker.template.TemplateExceptionHandler;
+import freemarker.template.Version;
+import org.codehaus.commons.compiler.CompileException;
+import org.codehaus.janino.SimpleCompiler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * {@link SorterFactory} is a singleton class that provides functionalities to create the most suitable sorter
+ * for underlying data based on {@link TypeComparator}.
+ * Note: the generated code can be inspected by configuring Janino to write the code that is being compiled
+ * to a file, see http://janino-compiler.github.io/janino/#debugging
+ */
+public class SorterFactory {
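+   // ------------------------------------------------------------------------
+   //   Constants
+   // ------------------------------------------------------------------------
+   private static final Logger LOG = LoggerFactory.getLogger(SorterFactory.class);
+
+   /** Fixed length records with a length below this threshold will be in-place sorted, if possible. */
+   private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
+
+   // ------------------------------------------------------------------------
+   //   Singleton Attribute
+   // ------------------------------------------------------------------------
+   private static SorterFactory sorterFactory;
+
+   // ------------------------------------------------------------------------
+   //   Attributes
+   // ------------------------------------------------------------------------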
+   private SimpleCompiler classCompiler;
+   private HashMap constructorCache;
+   private final Template template;
+
+   /**
+* This is only for testing. If an error occurs, we want to fail the test, instead of falling back
+* to a non-generated sorter.
+*/
+   public boolean forceCodeGeneration = false;
+
+   /**
+* Constructor.
+*/
+   private SorterFactory() {
+   this.classCompiler = new SimpleCompiler();
+   this.classCompiler.setParentClassLoader(this.getClass().getClassLoader());
--- End diff --

Thanks for the comment. I have a further comment on this.

As I understand it, if `SorterFactory` is a singleton and its parent 
classloader isn't the user code classloader, the next job might fail to create 
new code-generated sorters. This is because `classCompiler` is removed after 
the first job finishes, and it is instantiated only once, when `SorterFactory` 
is created.

Please correct me if I'm wrong.


---


[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-24 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r140660299
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/codegeneration/SorterFactory.java
 ---
@@ -0,0 +1,198 @@
+package org.apache.flink.runtime.codegeneration;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
+import org.apache.flink.runtime.operators.sort.InMemorySorter;
+import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
+
+import freemarker.template.Configuration;
+import freemarker.template.Template;
+import freemarker.template.TemplateException;
+import freemarker.template.TemplateExceptionHandler;
+import freemarker.template.Version;
+import org.codehaus.commons.compiler.CompileException;
+import org.codehaus.janino.SimpleCompiler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * {@link SorterFactory} is a singleton class that provides functionalities to create the most suitable sorter
+ * for underlying data based on {@link TypeComparator}.
+ * Note: the generated code can be inspected by configuring Janino to write the code that is being compiled
+ * to a file, see http://janino-compiler.github.io/janino/#debugging
+ */
+public class SorterFactory {
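+   // ------------------------------------------------------------------------
+   //   Constants
+   // ------------------------------------------------------------------------
+   private static final Logger LOG = LoggerFactory.getLogger(SorterFactory.class);
+
+   /** Fixed length records with a length below this threshold will be in-place sorted, if possible. */
+   private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
+
+   // ------------------------------------------------------------------------
+   //   Singleton Attribute
+   // ------------------------------------------------------------------------
+   private static SorterFactory sorterFactory;
+
+   // ------------------------------------------------------------------------
+   //   Attributes
+   // ------------------------------------------------------------------------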
+   private SimpleCompiler classCompiler;
+   private HashMap constructorCache;
+   private final Template template;
+
+   /**
+* This is only for testing. If an error occurs, we want to fail the test, instead of falling back
+* to a non-generated sorter.
+*/
+   public boolean forceCodeGeneration = false;
+
+   /**
+* Constructor.
+*/
+   private SorterFactory() {
+   this.classCompiler = new SimpleCompiler();
+   this.classCompiler.setParentClassLoader(this.getClass().getClassLoader());
--- End diff --

I think that the `SorterFactory` class is not loaded by the user code 
classloader, so `this.getClass().getClassLoader()` returns the wrong classloader 
here.

Instead, we should get the user code classloader from the caller of 
`createSorter`.

I'll make this change, and also do the modifications to the cache, as 
discussed above.


---


[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-24 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r140660095
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/codegeneration/SorterFactory.java
 ---
@@ -81,6 +81,7 @@
 */
private SorterFactory() {
this.classCompiler = new SimpleCompiler();
+   this.classCompiler.setParentClassLoader(this.getClass().getClassLoader());
--- End diff --

I think that the `SorterFactory` class is not loaded by the user code 
classloader, so `this.getClass().getClassLoader()` returns the wrong classloader 
here.

Instead, we should get the user code classloader from the caller of 
`createSorter`.

I'll make this change, and also do the modifications to the cache, as 
discussed above.


---


[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-24 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r140659933
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/codegeneration/SorterFactory.java
 ---
@@ -0,0 +1,174 @@
+package org.apache.flink.runtime.codegeneration;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
+import org.apache.flink.runtime.operators.sort.InMemorySorter;
+import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
+
+import freemarker.template.TemplateException;
+
+import org.codehaus.commons.compiler.CompileException;
+import org.codehaus.janino.SimpleCompiler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * {@link SorterFactory} is a singleton class that provides functionalities to create the most suitable sorter
+ * for underlying data based on {@link TypeComparator}.
+ * Note: the generated code can be inspected by configuring Janino to write the code that is being compiled
+ * to a file, see http://janino-compiler.github.io/janino/#debugging
+ */
+public class SorterFactory {
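+   // ------------------------------------------------------------------------
+   //   Constants
+   // ------------------------------------------------------------------------
+   private static final Logger LOG = LoggerFactory.getLogger(SorterFactory.class);
+
+   /** Fixed length records with a length below this threshold will be in-place sorted, if possible. */
+   private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
+
+   // ------------------------------------------------------------------------
+   //   Singleton Attribute
+   // ------------------------------------------------------------------------
+   private static SorterFactory sorterFactory;
+
+   // ------------------------------------------------------------------------
+   //   Attributes
+   // ------------------------------------------------------------------------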
+   private SimpleCompiler classCompiler;
+   private TemplateManager templateManager;
+   private HashMap constructorCache;
+
+   /**
+* This is only for testing. If an error occurs, we want to fail the test, instead of falling back
+* to a non-generated sorter.
+*/
+   public boolean forceCodeGeneration = false;
+
+   /**
+* Constructor.
+*/
+   private SorterFactory() {
+   this.templateManager = TemplateManager.getInstance();
+   this.classCompiler = new SimpleCompiler();
--- End diff --

Thanks for raising this issue. Actually it's a bit more complicated than 
just setting the parent classloader, because of the caching of the generated 
classes. The problem is that the `constructorCache` would keep the old user 
code classloader alive. I'm currently thinking of the following solution:
1. also include the user code classloader in the cache key, and
2. use `WeakReference` for both the values and keys of the cache.

This will ensure that we don't try to reuse generated classes from a 
previous job, since the classloader will be different across jobs, so we will 
have different keys. And the cache won't keep anything alive, since it will 
only have `WeakReferences`.
(For 2., I will use the Guava `CacheBuilder`.)
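A minimal sketch of the two-step proposal above, using only the JDK. Here `java.util.WeakHashMap` stands in for the Guava `CacheBuilder` mentioned in the comment. The outer key is the user code classloader passed in by the caller of `createSorter`, not `this.getClass().getClassLoader()`. `GeneratedClassCache`, `getOrCreate` and the `String` key are hypothetical names. The values are `WeakReference`s to the generated `Class`, so the cache by itself keeps neither a generated class nor a job's classloader alive. A collected entry is simply regenerated.

```java
import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.function.Function;

// Hypothetical sketch; the PR itself plans to use Guava's CacheBuilder instead.
final class GeneratedClassCache {

    // Outer key: user code classloader, held weakly by WeakHashMap.
    // Inner key: e.g. the name of the generated sorter class for a given comparator.
    // Inner value: weak reference, because a strong reference to a class whose loader
    // is a child of the user code classloader would keep the outer key reachable too.
    private final Map<ClassLoader, Map<String, WeakReference<Class<?>>>> cache = new WeakHashMap<>();

    synchronized Class<?> getOrCreate(
            ClassLoader userCodeClassLoader,
            String key,
            Function<ClassLoader, Class<?>> generator) {

        Map<String, WeakReference<Class<?>>> perLoader =
                cache.computeIfAbsent(userCodeClassLoader, loader -> new HashMap<>());

        WeakReference<Class<?>> ref = perLoader.get(key);
        Class<?> cached = (ref == null) ? null : ref.get();
        if (cached != null) {
            return cached;
        }

        // Cache miss (new job, new key, or collected entry): generate against this job's loader.
        Class<?> generated = generator.apply(userCodeClassLoader);
        perLoader.put(key, new WeakReference<>(generated));
        return generated;
    }
}
```

Because each job gets its own classloader, two jobs never share an entry, even for the same comparator. This matches point 1 of the proposal.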


---


[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-24 Thread heytitle
Github user heytitle commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r140659544
  
--- Diff: 
flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/DriverBaseITCase.java
 ---
@@ -190,6 +190,7 @@ private String getSystemOutput(String[] args) throws Exception {
switch (mode) {
case CLUSTER:
case COLLECTION:
+   case CLUSTER_WITH_CODEGENERATION_ENABLED:
--- End diff --

First of all, I'm not sure whether this is a good way to get this 
estimate.

I estimated the test time by running all tests from `flink-gelly-examples` 
inside IntelliJ IDEA.

With `CLUSTER_WITH_CODEGENERATION_ENABLED`: `2m 20s`
Without `CLUSTER_WITH_CODEGENERATION_ENABLED`: `1m 27s`

Patch for disabling the `CLUSTER_WITH_CODEGENERATION_ENABLED` case: 
https://gist.github.com/heytitle/89961fcaabcf326eadee190b9d6085a6



---


[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-21 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r140258895
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
 ---
@@ -309,7 +309,7 @@ public boolean write(T record) throws IOException {
//   Access Utilities
// ------------------------------------------------------------------------

-   private long readPointer(int logicalPosition) {
+   protected long readPointer(int logicalPosition) {
--- End diff --

`protected final`, with a comment explaining why, should be OK.

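A small illustration of the suggestion. The class names `BaseSorterSketch` and `GeneratedSorterSketch` are hypothetical, and a plain `long[]` stands in for the sort index kept in `MemorySegment`s. Making the helper `protected` lets a generated subclass call it, `final` keeps that subclass from overriding it, and the comment records why the visibility was widened.

```java
// Hypothetical sketch of the "protected final + comment" suggestion.
abstract class BaseSorterSketch {

    /** Stand-in for the sort index that NormalizedKeySorter keeps in memory segments. */
    private final long[] sortIndex;

    BaseSorterSketch(long[] sortIndex) {
        this.sortIndex = sortIndex;
    }

    // Protected (instead of private) only so that code-generated subclasses can call it;
    // final so that they cannot change how pointers are read.
    protected final long readPointer(int logicalPosition) {
        return sortIndex[logicalPosition];
    }
}

/** Plays the role of a class emitted by the code generator. */
final class GeneratedSorterSketch extends BaseSorterSketch {

    GeneratedSorterSketch(long[] sortIndex) {
        super(sortIndex);
    }

    /** Example caller: compares the stored pointers of two logical positions. */
    boolean samePointer(int i, int j) {
        return readPointer(i) == readPointer(j);
    }
}
```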

---


[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-21 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r140258675
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
 ---
@@ -47,61 +47,61 @@

public static final int MAX_NORMALIZED_KEY_LEN_PER_ELEMENT = 8;

-   private static final int MIN_REQUIRED_BUFFERS = 3;
+   public static final int MIN_REQUIRED_BUFFERS = 3;

-   private static final int LARGE_RECORD_THRESHOLD = 10 * 1024 * 1024;
+   public static final int LARGE_RECORD_THRESHOLD = 10 * 1024 * 1024;

-   private static final long LARGE_RECORD_TAG = 1L << 63;
+   public static final long LARGE_RECORD_TAG = 1L << 63;

-   private static final long POINTER_MASK = LARGE_RECORD_TAG - 1;
+   public static final long POINTER_MASK = LARGE_RECORD_TAG - 1;
 
--- End diff --

Maybe put that into the comment inside the code?
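For reference, here is a worked example of what the now-public tag and mask constants encode. The values are copied from the diff. The helper methods in `PointerTagSketch` are hypothetical and assume, as the names suggest, two things: a stored pointer is tagged in its highest bit when it refers to a large record, and `POINTER_MASK` recovers the plain pointer.

```java
// Hypothetical helpers around the constants quoted from NormalizedKeySorter.
final class PointerTagSketch {

    /** Highest bit of a 64-bit pointer; 1L << 63 == Long.MIN_VALUE. */
    static final long LARGE_RECORD_TAG = 1L << 63;

    /** The lower 63 bits; LARGE_RECORD_TAG - 1 == Long.MAX_VALUE. */
    static final long POINTER_MASK = LARGE_RECORD_TAG - 1;

    static long tagAsLargeRecord(long pointer) {
        return pointer | LARGE_RECORD_TAG;
    }

    static boolean isLargeRecord(long storedPointer) {
        return (storedPointer & LARGE_RECORD_TAG) != 0;
    }

    static long untag(long storedPointer) {
        return storedPointer & POINTER_MASK;
    }
}
```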


---


[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r139923767
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/codegeneration/SorterFactory.java
 ---
@@ -0,0 +1,174 @@
+package org.apache.flink.runtime.codegeneration;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
+import org.apache.flink.runtime.operators.sort.InMemorySorter;
+import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
+
+import freemarker.template.TemplateException;
+
+import org.codehaus.commons.compiler.CompileException;
+import org.codehaus.janino.SimpleCompiler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * {@link SorterFactory} is a singleton class that provides functionalities to create the most suitable sorter
+ * for underlying data based on {@link TypeComparator}.
+ * Note: the generated code can be inspected by configuring Janino to write the code that is being compiled
+ * to a file, see http://janino-compiler.github.io/janino/#debugging
+ */
+public class SorterFactory {
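+   // ------------------------------------------------------------------------
+   //   Constants
+   // ------------------------------------------------------------------------
+   private static final Logger LOG = LoggerFactory.getLogger(SorterFactory.class);
+
+   /** Fixed length records with a length below this threshold will be in-place sorted, if possible. */
+   private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
+
+   // ------------------------------------------------------------------------
+   //   Singleton Attribute
+   // ------------------------------------------------------------------------
+   private static SorterFactory sorterFactory;
+
+   // ------------------------------------------------------------------------
+   //   Attributes
+   // ------------------------------------------------------------------------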
+   private SimpleCompiler classCompiler;
+   private TemplateManager templateManager;
+   private HashMap constructorCache;
+
+   /**
+* This is only for testing. If an error occurs, we want to fail the test, instead of falling back
+* to a non-generated sorter.
+*/
+   public boolean forceCodeGeneration = false;
+
+   /**
+* Constructor.
+*/
+   private SorterFactory() {
+   this.templateManager = TemplateManager.getInstance();
+   this.classCompiler = new SimpleCompiler();
+   this.constructorCache = new HashMap<>();
+   }
+
+   /**
+* A method to get a singleton instance
+* or create one if it hasn't been created yet.
+* @return
+*/
+   public static synchronized SorterFactory getInstance() {
+   if (sorterFactory == null){
+   sorterFactory = new SorterFactory();
+   }
+
+   return sorterFactory;
+   }
+
+
+   /**
+* Create a sorter for the given type comparator and
+* assign serializer, comparator and memory to the sorter.
+* @param serializer
+* @param comparator
+* @param memory
+* @return
+*/
+   public InMemorySorter createSorter(ExecutionConfig config, TypeSerializer serializer,

[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r139929372
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/codegeneration/SorterTemplateModel.java
 ---
@@ -0,0 +1,424 @@
+package org.apache.flink.runtime.codegeneration;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
+
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * {@link SorterTemplateModel} is a class that implements code generation logic for a given
+ * {@link TypeComparator}.
+ *
+ * The swap and compare methods in {@link NormalizedKeySorter} work on a sequence of bytes.
+ * We speed up these operations by splitting this sequence of bytes into chunks that can
+ * be handled by primitive operations such as Integer and Long operations.
+ */
+class SorterTemplateModel {
+
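+   // ------------------------------------------------------------------------
+   //   Constants
+   // ------------------------------------------------------------------------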
+
+   static final String TEMPLATE_NAME = "sorter.ftlh";
+
+   /** We don't split to chunks above this size. */
+   private static final int SPLITTING_THRESHOLD = 32;
+
+   /**
+* POSSIBLE_CHUNK_SIZES must be in descending order,
+* because methods that using it are using greedy approach.
+*/
+   private static final Integer[] POSSIBLE_CHUNK_SIZES = {8, 4, 2, 1};
+
+   /** Mapping from chunk sizes to primitive operators. */
+   private static final HashMap byteOperatorMapping = new HashMap(){
+   {
+   put(8, "Long");
+   put(4, "Int");
+   put(2, "Short");
+   put(1, "Byte");
+   }
+   };
+
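+   // ------------------------------------------------------------------------
+   //   Attributes
+   // ------------------------------------------------------------------------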
+
+   private final TypeComparator typeComparator;
+
+   /**
+* Sizes of the chunks. Empty, if we are not splitting to chunks. (See calculateChunks())
+*/
+   private final ArrayList primitiveChunks;
+
+   private final String sorterName;
+
+   /**
+* Shows whether the order of records can be completely determined by the normalized
+* sorting key, or the sorter has to also deserialize records if their keys are equal to
+* really confirm the order.
+*/
+   private final boolean normalizedKeyFullyDetermines;
+
+   /**
+* Constructor.
+* @param typeComparator
+*The type information of underlying data
+*/
+   SorterTemplateModel(TypeComparator typeComparator){
+   this.typeComparator = typeComparator;
+
+   // number of bytes of the sorting key
+   int numKeyBytes;
+
+   // compute no. bytes for sorting records and check whether these bytes are just a prefix or not.
+   if (this.typeComparator.supportsNormalizedKey()) {
+   // compute the max normalized key length
+   int numPartialKeys;
+   try {
+   numPartialKeys = this.typeComparator.getFlatComparators().length;
+   } catch (Throwable t) {
+   numPartialKeys = 1;
+   }
+
+   int maxLen = Math.min(NormalizedKeySorter.DEFAULT_MAX_NORMALIZED_KEY_LEN, NormalizedKeySorter.MAX_NORMALIZED_KEY_LEN_PER_ELEMENT * numPartialKeys);
+
+   numKeyBytes = 

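The class comment quoted above says the normalized key's byte sequence is split into chunks handled by `Long`/`Int`/`Short`/`Byte` operations. `POSSIBLE_CHUNK_SIZES` is consumed greedily, and nothing is split above `SPLITTING_THRESHOLD`. Here is a minimal sketch of that idea under stated assumptions; it is not the template the PR renders, and `compareKeys` is a hypothetical name. The sketch reads chunks big-endian through `java.nio.ByteBuffer` (its default order) and compares them as unsigned values. This gives the same order as comparing the normalized key byte by byte. The quoted model imports `java.nio.ByteOrder`, presumably to handle native-order reads from `MemorySegment`; the sketch sidesteps that.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of chunked normalized-key comparison; not the generated sorter.
final class ChunkedKeySketch {

    /** Same values as in the quoted SorterTemplateModel. */
    static final int SPLITTING_THRESHOLD = 32;
    static final int[] POSSIBLE_CHUNK_SIZES = {8, 4, 2, 1}; // must stay descending for the greedy loop

    /** Greedy split of the key length into chunk sizes; empty if the key is too long to split. */
    static List<Integer> calculateChunks(int numKeyBytes) {
        List<Integer> chunks = new ArrayList<>();
        if (numKeyBytes > SPLITTING_THRESHOLD) {
            return chunks;
        }
        int remaining = numKeyBytes;
        for (int size : POSSIBLE_CHUNK_SIZES) {
            while (remaining >= size) {
                chunks.add(size);
                remaining -= size;
            }
        }
        return chunks;
    }

    /** Compares two normalized keys chunk by chunk, treating every chunk as unsigned. */
    static int compareKeys(byte[] a, byte[] b, List<Integer> chunks) {
        ByteBuffer left = ByteBuffer.wrap(a);   // big-endian by default
        ByteBuffer right = ByteBuffer.wrap(b);
        int offset = 0;
        for (int size : chunks) {
            int cmp;
            switch (size) {
                case 8:
                    cmp = Long.compareUnsigned(left.getLong(offset), right.getLong(offset));
                    break;
                case 4:
                    cmp = Integer.compareUnsigned(left.getInt(offset), right.getInt(offset));
                    break;
                case 2:
                    cmp = Integer.compare(Short.toUnsignedInt(left.getShort(offset)),
                            Short.toUnsignedInt(right.getShort(offset)));
                    break;
                default:
                    cmp = Integer.compare(Byte.toUnsignedInt(left.get(offset)),
                            Byte.toUnsignedInt(right.get(offset)));
                    break;
            }
            if (cmp != 0) {
                return cmp;
            }
            offset += size;
        }
        return 0;
    }
}
```

For example, a 13-byte key is split into chunks of 8, 4 and 1 bytes, so one comparison costs at most three primitive compares instead of thirteen byte compares.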
[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r139918960
  
--- Diff: flink-runtime/pom.xml ---
@@ -204,6 +204,18 @@ under the License.
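<artifactId>reflections</artifactId>
</dependency>
 
+   <dependency>
+   <groupId>org.freemarker</groupId>
+   <artifactId>freemarker</artifactId>
+   <version>2.3.20</version>
+   </dependency>
+
+   <dependency>
+   <groupId>org.codehaus.janino</groupId>
+   <artifactId>janino</artifactId>
+   <version>2.7.5</version>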
--- End diff --

Use the same version as `flink-table` (3.0.7).
I think it would make sense to add a property to the root pom to ensure that all 
submodules use the same version.


---


[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r139911414
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/codegeneration/TemplateManager.java
 ---
@@ -0,0 +1,108 @@
+package org.apache.flink.runtime.codegeneration;
+
+import freemarker.template.Configuration;
+import freemarker.template.Template;
+import freemarker.template.TemplateException;
+import freemarker.template.TemplateExceptionHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.util.Map;
+
+/**
+ * {@link TemplateManager} is a singleton class that provides template rendering functionalities for code generation.
+ * Such functionalities are caching, writing generated code to a file.
--- End diff --

Is the functionality to write code to a file still included?


---


[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r139917275
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---
@@ -127,6 +127,12 @@
private long autoWatermarkInterval = 0;
 
/**
+* The flag determines whether a custom NormalizedKeySorter will be dynamically created
+* for underlying data
+*/
+   private boolean codeGenerationForSorterEnabled = false;
--- End diff --

Add documentation for this flag to `https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/execution_configuration.html`.

Since the flag will be hidden in a long list of options, it might make sense to mention it more prominently in the DataSet API documentation. Maybe we can restructure the docs a bit and add a section about performance tuning that covers semantic annotations, optimizer hints (incl. hash combine), sorter code generation, and object reuse. IMO, this could be done as a follow-up issue.
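To make the documentation request concrete, here is a minimal sketch of how a boolean execution flag like `codeGenerationForSorterEnabled` is typically exposed and consumed. The class `SorterConfigSketch`, its `enable…`/`disable…`/`is…` methods, and `chooseSorter` are hypothetical stand-ins, not the actual `ExecutionConfig` API added by this PR.

```java
import java.util.Objects;

/** Hypothetical stand-in for the new ExecutionConfig field; not Flink's actual API. */
final class SorterConfigSketch {
    // Mirrors the diff: sorter code generation is off by default.
    private boolean codeGenerationForSorterEnabled = false;

    SorterConfigSketch enableCodeGenerationForSorter() {
        this.codeGenerationForSorterEnabled = true;
        return this; // fluent style, like ExecutionConfig#enableObjectReuse()
    }

    SorterConfigSketch disableCodeGenerationForSorter() {
        this.codeGenerationForSorterEnabled = false;
        return this;
    }

    boolean isCodeGenerationForSorterEnabled() {
        return codeGenerationForSorterEnabled;
    }

    /** Hypothetical decision point: which kind of sorter a task would create. */
    static String chooseSorter(SorterConfigSketch config) {
        Objects.requireNonNull(config, "config");
        return config.isCodeGenerationForSorterEnabled()
                ? "generated NormalizedKeySorter subclass"
                : "NormalizedKeySorter";
    }
}
```

Because the flag defaults to `false`, users only see a behavior change after explicitly opting in, which is one more reason to document it prominently.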


---


[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r139927199
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/codegeneration/SorterFactory.java
 ---
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.codegeneration;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
+import org.apache.flink.runtime.operators.sort.InMemorySorter;
+import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
+
+import freemarker.template.TemplateException;
+
+import org.codehaus.commons.compiler.CompileException;
+import org.codehaus.janino.SimpleCompiler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * {@link SorterFactory} is a singleton class that provides functionalities to create the most suitable sorter
+ * for underlying data based on {@link TypeComparator}.
+ * Note: the generated code can be inspected by configuring Janino to write the code that is being compiled
+ * to a file, see http://janino-compiler.github.io/janino/#debugging
+ */
+public class SorterFactory {
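+   // ------------------------------------------------------------------------
+   //   Constants
+   // ------------------------------------------------------------------------
+   private static final Logger LOG = LoggerFactory.getLogger(SorterFactory.class);
+
+   /** Fixed length records with a length below this threshold will be in-place sorted, if possible. */
+   private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
+
+   // ------------------------------------------------------------------------
+   //   Singleton Attribute
+   // ------------------------------------------------------------------------
+   private static SorterFactory sorterFactory;
+
+   // ------------------------------------------------------------------------
+   //   Attributes
+   // ------------------------------------------------------------------------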
+   private SimpleCompiler classCompiler;
+   private TemplateManager templateManager;
+   private HashMap constructorCache;
+
+   /**
+* This is only for testing. If an error occurs, we want to fail the test, instead of falling back
+* to a non-generated sorter.
+*/
+   public boolean forceCodeGeneration = false;
+
+   /**
+* Constructor.
+*/
+   private SorterFactory() {
+   this.templateManager = TemplateManager.getInstance();
+   this.classCompiler = new SimpleCompiler();
+   this.constructorCache = new HashMap<>();
+   }
+
+   /**
+* A method to get a singleton instance
+* or create one if it hasn't been created yet.
+* @return
+*/
+   public static synchronized SorterFactory getInstance() {
+   if (sorterFactory == null){
+   sorterFactory = new SorterFactory();
+   }
+
+   return sorterFactory;
+   }
+
+
+   /**
+* Create a sorter for the given type comparator and
+* assign serializer, comparator and memory to the sorter.
+* @param serializer
+* @param comparator
+* @param memory
+* @return
+*/
+   public  InMemorySorter createSorter(ExecutionConfig config, TypeSerializer serializer, 
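The `THRESHOLD_FOR_IN_PLACE_SORTING` constant implies a choice between an in-place sorter for short fixed-length records and the general normalized-key sorter. The sketch below assumes the factory mirrors the heuristic in Flink's `UnilateralSortMerger` (fixed-length records that support serialization with key normalization and are at most 32 bytes go to `FixedLengthRecordSorter`). `SorterChoiceSketch` and its parameters are hypothetical stand-ins for the real `TypeComparator`/`TypeSerializer` queries.

```java
/** Hypothetical sketch of the sorter-selection heuristic; names are illustrative. */
final class SorterChoiceSketch {
    /** Same value as THRESHOLD_FOR_IN_PLACE_SORTING in the diff. */
    static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;

    enum SorterKind { FIXED_LENGTH_IN_PLACE, NORMALIZED_KEY }

    /**
     * @param supportsSerializationWithKeyNormalization stand-in for the TypeComparator query
     * @param serializedLength stand-in for TypeSerializer#getLength(); -1 means variable length
     */
    static SorterKind choose(boolean supportsSerializationWithKeyNormalization, int serializedLength) {
        // Only short, fixed-length records qualify for in-place sorting.
        boolean fixedAndSmall = serializedLength > 0
                && serializedLength <= THRESHOLD_FOR_IN_PLACE_SORTING;
        return (supportsSerializationWithKeyNormalization && fixedAndSmall)
                ? SorterKind.FIXED_LENGTH_IN_PLACE
                : SorterKind.NORMALIZED_KEY;
    }
}
```

Variable-length types (length `-1`) always fall through to the normalized-key sorter, which is the case the code-generated sorter targets.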

[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r139906497
  
--- Diff: 
flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/DriverBaseITCase.java
 ---
@@ -190,6 +190,7 @@ private String getSystemOutput(String[] args) throws Exception {
switch (mode) {
case CLUSTER:
case COLLECTION:
+   case CLUSTER_WITH_CODEGENERATION_ENABLED:
--- End diff --

I think it is good to enable this here, but just out of curiosity: how much does this increase the build time for `flink-gelly`?


---


[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r139920526
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
 ---
@@ -309,7 +309,7 @@ public boolean write(T record) throws IOException {
//   Access Utilities
// ------------------------------------------------------------------------

-   private long readPointer(int logicalPosition) {
+   protected long readPointer(int logicalPosition) {
--- End diff --

Changing a method from `private` to `protected` affects how the JIT can inline it. See, for example, this blog post: http://normanmaurer.me/blog/2014/05/15/Inline-all-the-Things (the difference between `private` and `public` is discussed at the end of the post).

If the point is just to make these methods accessible to the code-generated sorter (and not to override them), declaring them `final` might solve the issue. @StephanEwen and @pnowojski might have some additional advice here.
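A minimal sketch of the `final` suggestion: a `protected final` helper is callable from a generated subclass but cannot be overridden, so every call site has exactly one possible target and the JIT does not need a type check to inline it. `BaseSorterSketch` and `GeneratedSorterSketch` are hypothetical names, and the body of `readPointer` is a placeholder rather than `NormalizedKeySorter`'s real logic.

```java
/** Hypothetical base class standing in for NormalizedKeySorter. */
class BaseSorterSketch {
    private final long[] pointers;

    BaseSorterSketch(long[] pointers) {
        this.pointers = pointers.clone();
    }

    // protected: visible to the generated subclass.
    // final: cannot be overridden, so calls have a single possible target.
    protected final long readPointer(int logicalPosition) {
        return pointers[logicalPosition];
    }
}

/** Hypothetical stand-in for a code-generated sorter that reuses the helper. */
final class GeneratedSorterSketch extends BaseSorterSketch {
    GeneratedSorterSketch(long[] pointers) {
        super(pointers);
    }

    // Declaring "long readPointer(int i)" here would not compile: the method is final.

    long sumOfFirst(int n) {
        long sum = 0;
        for (int i = 0; i < n; i++) {
            sum += readPointer(i); // allowed: protected member accessed from a subclass
        }
        return sum;
    }
}
```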


---


[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r139926078
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/codegeneration/SorterFactory.java
 ---
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.codegeneration;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
+import org.apache.flink.runtime.operators.sort.InMemorySorter;
+import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
+
+import freemarker.template.TemplateException;
+
+import org.codehaus.commons.compiler.CompileException;
+import org.codehaus.janino.SimpleCompiler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * {@link SorterFactory} is a singleton class that provides functionalities to create the most suitable sorter
+ * for underlying data based on {@link TypeComparator}.
+ * Note: the generated code can be inspected by configuring Janino to write the code that is being compiled
+ * to a file, see http://janino-compiler.github.io/janino/#debugging
+ */
+public class SorterFactory {
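+   // ------------------------------------------------------------------------
+   //   Constants
+   // ------------------------------------------------------------------------
+   private static final Logger LOG = LoggerFactory.getLogger(SorterFactory.class);
+
+   /** Fixed length records with a length below this threshold will be in-place sorted, if possible. */
+   private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
+
+   // ------------------------------------------------------------------------
+   //   Singleton Attribute
+   // ------------------------------------------------------------------------
+   private static SorterFactory sorterFactory;
+
+   // ------------------------------------------------------------------------
+   //   Attributes
+   // ------------------------------------------------------------------------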
+   private SimpleCompiler classCompiler;
+   private TemplateManager templateManager;
+   private HashMap constructorCache;
+
+   /**
+* This is only for testing. If an error occurs, we want to fail the test, instead of falling back
+* to a non-generated sorter.
+*/
+   public boolean forceCodeGeneration = false;
+
+   /**
+* Constructor.
+*/
+   private SorterFactory() {
+   this.templateManager = TemplateManager.getInstance();
+   this.classCompiler = new SimpleCompiler();
+   this.constructorCache = new HashMap<>();
+   }
+
+   /**
+* A method to get a singleton instance
+* or create one if it hasn't been created yet.
+* @return
+*/
+   public static synchronized SorterFactory getInstance() {
+   if (sorterFactory == null){
+   sorterFactory = new SorterFactory();
+   }
+
+   return sorterFactory;
+   }
+
+
+   /**
+* Create a sorter for the given type comparator and
+* assign serializer, comparator and memory to the sorter.
+* @param serializer
+* @param comparator
+* @param memory
+* @return
+*/
+   public  InMemorySorter createSorter(ExecutionConfig config, TypeSerializer serializer, 
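The `constructorCache` field suggests that compiled sorter classes are instantiated through cached constructors instead of being recompiled and looked up again for every sort. The sketch below shows that idea under the assumption that the cache is keyed by generated class name (the diff only shows a raw `HashMap`). `ConstructorCacheSketch` and `FakeGeneratedSorter` are hypothetical; the latter stands in for a Janino-compiled class.

```java
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of caching constructors of generated classes by class name. */
final class ConstructorCacheSketch {
    private final Map<String, Constructor<?>> constructorCache = new HashMap<>();
    private int lookups = 0; // counts reflective lookups, to make cache hits observable

    /** Stand-in for a class that Janino would have compiled from a template. */
    static final class FakeGeneratedSorter {
        final int capacity;

        FakeGeneratedSorter(int capacity) {
            this.capacity = capacity;
        }
    }

    /** Returns a new instance, resolving the constructor reflectively only on a cache miss. */
    synchronized Object newInstance(Class<?> generatedClass, int capacity)
            throws ReflectiveOperationException {
        Constructor<?> ctor = constructorCache.get(generatedClass.getName());
        if (ctor == null) {
            lookups++;
            ctor = generatedClass.getDeclaredConstructor(int.class);
            ctor.setAccessible(true);
            constructorCache.put(generatedClass.getName(), ctor);
        }
        return ctor.newInstance(capacity);
    }

    synchronized int lookups() {
        return lookups;
    }
}
```

Note that a static cache like this keeps the generated classes (and their classloader) reachable for as long as the singleton lives, which is relevant to the classloader cleanup concern raised elsewhere in this review.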

[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r139928885
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/codegeneration/SorterTemplateModel.java
 ---
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.codegeneration;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
+
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * {@link SorterTemplateModel} is a class that implements code generation logic for a given
+ * {@link TypeComparator}.
+ *
+ * The swap and compare methods in {@link NormalizedKeySorter} work on a sequence of bytes.
+ * We speed up these operations by splitting this sequence of bytes into chunks that can
+ * be handled by primitive operations such as Integer and Long operations.
+ */
+class SorterTemplateModel {
+
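+   // ------------------------------------------------------------------------
+   //   Constants
+   // ------------------------------------------------------------------------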
+
+   static final String TEMPLATE_NAME = "sorter.ftlh";
+
+   /** We don't split to chunks above this size. */
+   private static final int SPLITTING_THRESHOLD = 32;
+
+   /**
+* POSSIBLE_CHUNK_SIZES must be in descending order,
+* because methods that using it are using greedy approach.
+*/
+   private static final Integer[] POSSIBLE_CHUNK_SIZES = {8, 4, 2, 1};
+
+   /** Mapping from chunk sizes to primitive operators. */
+   private static final HashMap byteOperatorMapping = new HashMap(){
+   {
+   put(8, "Long");
+   put(4, "Int");
+   put(2, "Short");
+   put(1, "Byte");
+   }
+   };
+
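+   // ------------------------------------------------------------------------
+   //   Attributes
+   // ------------------------------------------------------------------------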
+
+   private final TypeComparator typeComparator;
+
+   /**
+* Sizes of the chunks. Empty, if we are not splitting to chunks. (See calculateChunks())
+*/
+   private final ArrayList primitiveChunks;
+
+   private final String sorterName;
+
+   /**
+* Shows whether the order of records can be completely determined by the normalized
+* sorting key, or the sorter has to also deserialize records if their keys are equal to
+* really confirm the order.
+*/
+   private final boolean normalizedKeyFullyDetermines;
+
+   /**
+* Constructor.
+* @param typeComparator
+*The type information of underlying data
+*/
+   SorterTemplateModel(TypeComparator typeComparator){
+   this.typeComparator = typeComparator;
+
+   // number of bytes of the sorting key
+   int numKeyBytes;
+
+   // compute no. bytes for sorting records and check whether these bytes are just a prefix or not.
+   if (this.typeComparator.supportsNormalizedKey()) {
+   // compute the max normalized key length
+   int numPartialKeys;
+   try {
+   numPartialKeys = this.typeComparator.getFlatComparators().length;
+   } catch (Throwable t) {
+   numPartialKeys = 1;
+   }
+
+   int maxLen = Math.min(NormalizedKeySorter.DEFAULT_MAX_NORMALIZED_KEY_LEN, NormalizedKeySorter.MAX_NORMALIZED_KEY_LEN_PER_ELEMENT * numPartialKeys);
+
+   numKeyBytes = 
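The constants in this hunk describe a greedy split of the normalized-key bytes into primitive-sized chunks (8, 4, 2 and 1 bytes, mapped to `Long`, `Int`, `Short` and `Byte` operations). The sketch below assumes the split is a plain largest-first greedy over `POSSIBLE_CHUNK_SIZES` and that keys longer than `SPLITTING_THRESHOLD` yield an empty list, as the `primitiveChunks` Javadoc states. `ChunkSketch`, `calculateChunks` and `operatorFor` are illustrative names, not the PR's code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of greedy chunking of normalized-key bytes. */
final class ChunkSketch {
    /** Keys longer than this are not split into chunks (same value as in the diff). */
    static final int SPLITTING_THRESHOLD = 32;

    /** Must be in descending order so the greedy loop tries the largest chunk first. */
    static final int[] POSSIBLE_CHUNK_SIZES = {8, 4, 2, 1};

    static List<Integer> calculateChunks(int numKeyBytes) {
        List<Integer> chunks = new ArrayList<>();
        if (numKeyBytes > SPLITTING_THRESHOLD) {
            return chunks; // empty: no splitting for long keys
        }
        int remaining = numKeyBytes;
        for (int size : POSSIBLE_CHUNK_SIZES) {
            // Take as many chunks of this size as still fit.
            while (remaining >= size) {
                chunks.add(size);
                remaining -= size;
            }
        }
        return chunks;
    }

    /** Maps a chunk size to the primitive operation name used in the template. */
    static String operatorFor(int chunkSize) {
        switch (chunkSize) {
            case 8: return "Long";
            case 4: return "Int";
            case 2: return "Short";
            case 1: return "Byte";
            default: throw new IllegalArgumentException("unsupported chunk size: " + chunkSize);
        }
    }
}
```

For example, a 13-byte key would be handled as one `Long`, one `Int` and one `Byte` comparison instead of 13 byte-wise comparisons. Because 1 is the last size, the greedy loop always consumes every byte.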

[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r139910959
  
--- Diff: flink-runtime/pom.xml ---
@@ -204,6 +204,18 @@ under the License.
reflections

 
+   
+   org.freemarker
--- End diff --

Use the latest version?
```
<dependency>
	<groupId>org.freemarker</groupId>
	<artifactId>freemarker</artifactId>
	<version>2.3.26-incubating</version>
</dependency>
```


---


[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r139933797
  
--- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
 ---
@@ -146,6 +151,7 @@ public static void teardown() throws Exception {
public static Collection executionModes() {
return Arrays.asList(
new Object[] { TestExecutionMode.CLUSTER },
+   new Object[] { TestExecutionMode.CLUSTER_WITH_CODEGENERATION_ENABLED },
--- End diff --

Does this add another execution mode to every test class that extends `MultipleProgramsTestBase`? That would have a big impact on the overall build time, right?

I think it would be better to manually add this execution mode only to a selected set of tests that involve sorting.
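A minimal sketch of the opt-in alternative: keep the default parameter set unchanged and let only sorting-heavy test classes use an extended set. It assumes JUnit's `Parameterized` runner consumes a `Collection<Object[]>`, as `MultipleProgramsTestBase.executionModes()` does. `ExecutionModesSketch` and `executionModesWithSorterCodegen()` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

/** Hypothetical sketch: default modes for all tests, an opt-in set for sorting tests. */
final class ExecutionModesSketch {
    enum TestExecutionMode { CLUSTER, COLLECTION, CLUSTER_WITH_CODEGENERATION_ENABLED }

    /** Default parameters for every test class (no code-generation mode). */
    static Collection<Object[]> executionModes() {
        return Arrays.asList(
                new Object[] { TestExecutionMode.CLUSTER },
                new Object[] { TestExecutionMode.COLLECTION });
    }

    /** Opt-in parameters for the selected tests that exercise sorting. */
    static Collection<Object[]> executionModesWithSorterCodegen() {
        List<Object[]> modes = new ArrayList<>(executionModes());
        modes.add(new Object[] { TestExecutionMode.CLUSTER_WITH_CODEGENERATION_ENABLED });
        return modes;
    }
}
```

A sorting test would return `executionModesWithSorterCodegen()` from its own `@Parameterized.Parameters` method, so only those classes pay the extra run time.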


---


[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r139926965
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/codegeneration/SorterFactory.java
 ---
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.codegeneration;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
+import org.apache.flink.runtime.operators.sort.InMemorySorter;
+import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
+
+import freemarker.template.TemplateException;
+
+import org.codehaus.commons.compiler.CompileException;
+import org.codehaus.janino.SimpleCompiler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * {@link SorterFactory} is a singleton class that provides functionalities to create the most suitable sorter
+ * for underlying data based on {@link TypeComparator}.
+ * Note: the generated code can be inspected by configuring Janino to write the code that is being compiled
+ * to a file, see http://janino-compiler.github.io/janino/#debugging
+ */
+public class SorterFactory {
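+   // ------------------------------------------------------------------------
+   //   Constants
+   // ------------------------------------------------------------------------
+   private static final Logger LOG = LoggerFactory.getLogger(SorterFactory.class);
+
+   /** Fixed length records with a length below this threshold will be in-place sorted, if possible. */
+   private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
+
+   // ------------------------------------------------------------------------
+   //   Singleton Attribute
+   // ------------------------------------------------------------------------
+   private static SorterFactory sorterFactory;
+
+   // ------------------------------------------------------------------------
+   //   Attributes
+   // ------------------------------------------------------------------------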
+   private SimpleCompiler classCompiler;
+   private TemplateManager templateManager;
+   private HashMap constructorCache;
+
+   /**
+* This is only for testing. If an error occurs, we want to fail the test, instead of falling back
+* to a non-generated sorter.
+*/
+   public boolean forceCodeGeneration = false;
+
+   /**
+* Constructor.
+*/
+   private SorterFactory() {
+   this.templateManager = TemplateManager.getInstance();
+   this.classCompiler = new SimpleCompiler();
--- End diff --

We should set the user-code classloader as the parent classloader of the compiler, to ensure that the generated classes are cleaned up when the job terminates:
`this.classCompiler.setParentClassLoader(cl)`


---


[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-15 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r139186699
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
 ---
@@ -115,6 +115,13 @@ public Configuration getConfiguration() {
return tmpDirectories;
}
 
+   public String getFirstTmpDirectory(){
--- End diff --

I'm just now starting to catch up, but the use of the temporary file was the only issue I had when using the original PR. Great to see it's already fixed!


---


[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-14 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r138905728
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java
 ---
@@ -218,7 +221,7 @@ public long addRecord(T record) throws IOException {
return offset;
}

-   public MutableObjectIterator finishWriteAndSortKeys(List memory) throws IOException {
+   public MutableObjectIterator finishWriteAndSortKeys(List memory) throws IOException, IllegalAccessException, TemplateException, InstantiationException, CompileException, InvocationTargetException, NoSuchMethodException, ClassNotFoundException {
--- End diff --

Or, maybe even better, catch all of these in the sorter factory method (`SorterFactory.createSorter`) and create a non-codegen sorter (and log a warning) when any of these exceptions occurs.
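A minimal sketch of the "try code generation, otherwise fall back and warn" pattern, including the test-only `forceCodeGeneration` flag that `SorterFactory` already has. `FallbackSketch` and `GeneratedSorterBuilder` are hypothetical, and `java.util.logging` is used in place of SLF4J only to keep the sketch dependency-free.

```java
import java.util.function.Supplier;
import java.util.logging.Logger;

/** Hypothetical sketch of a factory method that falls back to a non-generated sorter. */
final class FallbackSketch {
    private static final Logger LOG = Logger.getLogger(FallbackSketch.class.getName());

    /** Mirrors SorterFactory's test-only flag: fail instead of falling back. */
    boolean forceCodeGeneration = false;

    /** Stands in for template rendering + Janino compilation + reflective instantiation. */
    interface GeneratedSorterBuilder {
        Object build() throws Exception;
    }

    Object createSorter(GeneratedSorterBuilder generated, Supplier<Object> fallback) {
        try {
            return generated.build();
        } catch (Exception e) {
            if (forceCodeGeneration) {
                // Tests want to see the real failure, not a silently working fallback.
                throw new RuntimeException("Sorter code generation failed", e);
            }
            LOG.warning("Sorter code generation failed, using a non-generated sorter: " + e);
            return fallback.get();
        }
    }
}
```

With this shape, callers such as `LargeRecordHandler.finishWriteAndSortKeys` would not need any additional `throws` clauses.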


---


[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-14 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r138904204
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
 ---
@@ -115,6 +115,13 @@ public Configuration getConfiguration() {
return tmpDirectories;
}
 
+   public String getFirstTmpDirectory(){
--- End diff --

We can remove this method, since it became unused when we changed to 
generating the code into just a string instead of a temporary file.


---


[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-14 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r138899420
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java
 ---
@@ -218,7 +221,7 @@ public long addRecord(T record) throws IOException {
return offset;
}

-   public MutableObjectIterator finishWriteAndSortKeys(List memory) throws IOException {
+   public MutableObjectIterator finishWriteAndSortKeys(List memory) throws IOException, IllegalAccessException, TemplateException, InstantiationException, CompileException, InvocationTargetException, NoSuchMethodException, ClassNotFoundException {
--- End diff --

I'm thinking that we should encapsulate all these exceptions into one `SorterCodegenException` (or just `CodegenException`) and throw only that from the sorter factory method, to avoid having to declare this litany of exceptions (the same list occurs in multiple places).
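A minimal sketch of the wrapping idea. Five of the listed exceptions (`IllegalAccessException`, `InstantiationException`, `InvocationTargetException`, `NoSuchMethodException`, `ClassNotFoundException`) are subclasses of `ReflectiveOperationException`, so a single multi-catch covers them. FreeMarker's `TemplateException` and Janino's `CompileException` would be added to the multi-catch in real code; they are omitted here only to keep the sketch dependency-free. `CodegenException` and `CodegenWrappingSketch` are hypothetical names.

```java
import java.io.IOException;

/** Hypothetical single checked exception for all sorter code-generation failures. */
class CodegenException extends Exception {
    CodegenException(String message, Throwable cause) {
        super(message, cause);
    }
}

/** Hypothetical factory step that narrows many checked exceptions to one. */
final class CodegenWrappingSketch {
    /** Stands in for the render/compile/instantiate steps of the sorter factory. */
    interface Step {
        Object run() throws IOException, ReflectiveOperationException;
    }

    static Object generate(Step step) throws CodegenException {
        try {
            return step.run();
        } catch (IOException | ReflectiveOperationException e) {
            // Keep the original exception as the cause for debugging.
            throw new CodegenException("Could not generate sorter", e);
        }
    }
}
```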


---


[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-05-06 Thread heytitle
Github user heytitle commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r115130267
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
 ---
@@ -47,61 +47,61 @@

public static final int MAX_NORMALIZED_KEY_LEN_PER_ELEMENT = 8;

-   private static final int MIN_REQUIRED_BUFFERS = 3;
+   public static final int MIN_REQUIRED_BUFFERS = 3;

-   private static final int LARGE_RECORD_THRESHOLD = 10 * 1024 * 1024;
+   public static final int LARGE_RECORD_THRESHOLD = 10 * 1024 * 1024;

-   private static final long LARGE_RECORD_TAG = 1L << 63;
+   public static final long LARGE_RECORD_TAG = 1L << 63;

-   private static final long POINTER_MASK = LARGE_RECORD_TAG - 1;
+   public static final long POINTER_MASK = LARGE_RECORD_TAG - 1;
 
--- End diff --

`public` is used here because `Janino` first checks the accessibility of these fields; it seems unable to access them when they are `protected`, and throws the error below.
```
org.codehaus.commons.compiler.CompileException: Field "LARGE_RECORD_THRESHOLD" is not accessible

	at org.codehaus.janino.ReflectionIClass$ReflectionIField.getConstantValue(ReflectionIClass.java:340)
	at org.codehaus.janino.UnitCompiler.getConstantValue2(UnitCompiler.java:4433)
	at org.codehaus.janino.UnitCompiler.access$1(UnitCompiler.java:182)
	at org.codehaus.janino.UnitCompiler$11.visitFieldAccess(UnitCompiler.java:4407)
	at org.codehaus.janino.Java$FieldAccess.accept(Java.java:3229)
```
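For context on two of the constants made `public` in this hunk: `LARGE_RECORD_TAG` has only the top bit set and `POINTER_MASK` has all 63 lower bits set. As the stack trace shows (`getConstantValue`), Janino tries to read such compile-time constants reflectively while compiling. The sketch below shows the tag/mask arithmetic itself, assuming the tag is OR-ed into a stored pointer to mark large records. `PointerTagSketch` and its methods are illustrative, not `NormalizedKeySorter` code.

```java
/** Hypothetical sketch of the tag/mask arithmetic behind LARGE_RECORD_TAG and POINTER_MASK. */
final class PointerTagSketch {
    static final long LARGE_RECORD_TAG = 1L << 63;         // only the sign bit set
    static final long POINTER_MASK = LARGE_RECORD_TAG - 1; // all 63 lower bits set

    /** Marks a pointer as referring to a large record (assumed usage). */
    static long tagAsLarge(long pointer) {
        return pointer | LARGE_RECORD_TAG;
    }

    static boolean isLarge(long taggedPointer) {
        return (taggedPointer & LARGE_RECORD_TAG) != 0;
    }

    /** Recovers the plain pointer by clearing the tag bit. */
    static long untag(long taggedPointer) {
        return taggedPointer & POINTER_MASK;
    }
}
```

Since `1L << 63` equals `Long.MIN_VALUE`, subtracting one wraps around to `Long.MAX_VALUE`, which is exactly the 63-bit mask.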



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---