[jira] [Created] (FLINK-12116) Args autocast will cause exception for plan transformation in TableAPI

2019-04-04 Thread Xingcan Cui (JIRA)
Xingcan Cui created FLINK-12116:
---

 Summary: Args autocast will cause exception for plan 
transformation in TableAPI
 Key: FLINK-12116
 URL: https://issues.apache.org/jira/browse/FLINK-12116
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.7.2, 1.6.4
Reporter: Xingcan Cui


In tableAPI, the automatic typecast for arguments may break their initial 
structures, which makes {{TreeNode.makeCopy()}} fail.

Take the {{ConcatWs}} function as an example. It requires a string 
{{Expression}} sequence for the second parameter of its constructor. If we 
provide some {{Expressions}} with other types, the planner will try to cast 
them automatically. However, during this process, the arguments will be 
incorrectly unwrapped (e.g., {{[f1, f2]}} will be unwrapped to two expressions 
{{f1.cast(String)}} and {{f2.cast(String)}}) which will cause 
{{java.lang.IllegalArgumentException: wrong number of arguments}}.

As a workaround, we can cast these arguments manually.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] klion26 commented on issue #7987: [FLINK-11820][serialization] SimpleStringSchema handle message record which value is null

2019-04-04 Thread GitBox
klion26 commented on issue #7987: [FLINK-11820][serialization] 
SimpleStringSchema handle message record which value is null
URL: https://github.com/apache/flink/pull/7987#issuecomment-480127016
 
 
   thanks for you contribution, LGTM now


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #7978: [FLINK-11910] [Yarn] add customizable yarn application type

2019-04-04 Thread GitBox
flinkbot edited a comment on issue #7978: [FLINK-11910] [Yarn] add customizable 
yarn application type
URL: https://github.com/apache/flink/pull/7978#issuecomment-472607121
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @rmetzger [PMC], @suez1224 [committer]
   * ✅ 2. There is [consensus] that the contribution should go into to Flink.
   - Approved by @rmetzger [PMC]
   * ❗ 3. Needs [attention] from.
   - Needs attention by @gjl
   * ✅ 4. The change fits into the overall [architecture].
   - Approved by @suez1224 [committer]
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer of PMC member is required Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] suez1224 commented on issue #7978: [FLINK-11910] [Yarn] add customizable yarn application type

2019-04-04 Thread GitBox
suez1224 commented on issue #7978: [FLINK-11910] [Yarn] add customizable yarn 
application type
URL: https://github.com/apache/flink/pull/7978#issuecomment-480102074
 
 
   @flinkbot approve architecture


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] suez1224 commented on a change in pull request #7978: [FLINK-11910] [Yarn] add customizable yarn application type

2019-04-04 Thread GitBox
suez1224 commented on a change in pull request #7978: [FLINK-11910] [Yarn] add 
customizable yarn application type
URL: https://github.com/apache/flink/pull/7978#discussion_r272401341
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 ##
 @@ -988,7 +992,7 @@ public ApplicationReport startAppMaster(
final String customApplicationName = customName != null ? 
customName : applicationName;
 
appContext.setApplicationName(customApplicationName);
-   appContext.setApplicationType("Apache Flink");
+   appContext.setApplicationType(applicationType.isEmpty() ? 
"Apache Flink" : applicationType);
 
 Review comment:
   Can't we just do the following?
 
appContext.setApplicationType(dynProperties.getOrDefault("application-type",  
"Apache Flink"));


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] suez1224 commented on a change in pull request #7978: [FLINK-11910] [Yarn] add customizable yarn application type

2019-04-04 Thread GitBox
suez1224 commented on a change in pull request #7978: [FLINK-11910] [Yarn] add 
customizable yarn application type
URL: https://github.com/apache/flink/pull/7978#discussion_r272401356
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 ##
 @@ -138,6 +138,8 @@
private String zookeeperNamespace;
 
private String nodeLabel;
+   /* This field can be override by dynamic property application-type */
+   private String applicationType;
 
 Review comment:
   We don't need this instance variable. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] suez1224 commented on a change in pull request #7978: [FLINK-11910] [Yarn] add customizable yarn application type

2019-04-04 Thread GitBox
suez1224 commented on a change in pull request #7978: [FLINK-11910] [Yarn] add 
customizable yarn application type
URL: https://github.com/apache/flink/pull/7978#discussion_r272401378
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 ##
 @@ -468,6 +470,8 @@ private void 
validateClusterSpecification(ClusterSpecification clusterSpecificat
flinkConfiguration.setString(dynProperty.getKey(), 
dynProperty.getValue());
}
 
+   this.applicationType = 
dynProperties.getOrDefault("application-type", "");
 
 Review comment:
   We dont need this line as well..


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12115) Add support in Flink to persist state in Azure's blob store

2019-04-04 Thread Piyush Narang (JIRA)
Piyush Narang created FLINK-12115:
-

 Summary: Add support in Flink to persist state in Azure's blob 
store
 Key: FLINK-12115
 URL: https://issues.apache.org/jira/browse/FLINK-12115
 Project: Flink
  Issue Type: New Feature
  Components: FileSystems
Reporter: Piyush Narang


The existing set of flink-filesystems include filesystems like S3 / HDFS / MapR 
etc. For folks using Azure, it would be nice to include support for Azure's 
blob store as a filesystem as well. This would enable us to use Azure blob 
store to store state / checkpoints for running Flink jobs. 
Support for Azure's filesystem is part of the hadoop project in the 
[hadoop-azure|https://github.com/apache/hadoop/tree/trunk/hadoop-tools/hadoop-azure]
 module. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11914) Expose a REST endpoint in JobManager to kill specific TaskManager

2019-04-04 Thread Shuyi Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16810193#comment-16810193
 ] 

Shuyi Chen commented on FLINK-11914:


Hi [~feng.xu], I dont think we should expose an Akka endpoint because Akka is 
an internal implementation detail, and AFAIK, the community is trying to 
deprecate the use of Akka in Flink. Thanks.

> Expose a REST endpoint in JobManager to kill specific TaskManager
> -
>
> Key: FLINK-11914
> URL: https://issues.apache.org/jira/browse/FLINK-11914
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / REST
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Major
>
> we want to add capability in the Flink web UI to kill each individual TM by 
> clicking a button, this would require first exposing the capability from the 
> REST API endpoint. The reason is that  some TM might be running on a heavily 
> loaded YARN host over time, and we want to kill just that TM and have flink 
> JM to reallocate a TM to restart the job graph. The other approach would be 
> restart the entire YARN job and this is heavy-weight.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12114) Try to perform UDF By Reflection

2019-04-04 Thread dong (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dong updated FLINK-12114:
-
Affects Version/s: (was: 1.7.2)
   1.6.5

> Try to perform UDF By Reflection
> 
>
> Key: FLINK-12114
> URL: https://issues.apache.org/jira/browse/FLINK-12114
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.6.5
>Reporter: dong
>Priority: Major
>
> Hi Team:
>      I recently worked on a Flink SQL-based project and then one of the 
> requirements was to dynamically execute the Flink SQL UDF by loading a 
> user-defined UDF. I first used cglib to dynamically add the eval method to 
> the ScalarFunction and then dynamically create the ScalarFunction instance. 
> And call the user-defined UDF by reflection on the line.
> this my code
> {code:java}
> package com.ximalaya.flink.dsl.stream.udf
> import java.lang.reflect.Method
> import com.ximalaya.flink.dsl.stream.`type`.FieldType
> import com.ximalaya.flink.dsl.stream.api.udf.\{AbstractUserUdf, 
> UserUdfContext}
> import net.sf.cglib.core.Signature
> import net.sf.cglib.proxy.\{Enhancer, InterfaceMaker, MethodInterceptor, 
> MethodProxy}
> import org.apache.flink.table.functions.ScalarFunction
> import org.apache.flink.table.shaded.org.apache.commons.lang.ArrayUtils
> import net.sf.cglib.asm.Type
> import scala.collection.mutable.\{Map ⇒ IMap}
> /**
>  *
>  * @author martin.dong
>  *
>  **/
> private class UserUdfFunction extends ScalarFunction{
>  override def isDeterministic: Boolean = false
> }
> private class UdfMethodInterceptor(val name:String,
>  val fullyQualifiedClassName:String) extends MethodInterceptor with 
> Serializable {
>  private var userUdf: AbstractUserUdf = _
>  private var evalMethods:IMap[MethodSignature,Method]=IMap()
>  private var closeMethod:Method = _
>  private var openMethod:Method = _
>  override def intercept(o: scala.Any,
>  method: Method,
>  objects: Array[AnyRef], methodProxy: MethodProxy): AnyRef = {
>  val methodName=method.getName
>  methodName match {
>  case "open"⇒
>  this.userUdf = 
> Class.forName(fullyQualifiedClassName).newInstance().asInstanceOf[AbstractUserUdf]
>  this.userUdf.getClass.getDeclaredMethods.filter(_.getName=="eval").
>  foreach(method ⇒ 
> evalMethods.put(MethodSignature.createMethodSignature(method), method))
>  this.closeMethod = classOf[AbstractUserUdf].getDeclaredMethod("close")
>  this.openMethod = 
> classOf[AbstractUserUdf].getDeclaredMethod("open",classOf[UserUdfContext])
>  openMethod.invoke(userUdf,null)
>  case "eval"⇒
>  val methodSignature = MethodSignature.createMethodSignature(method)
>  evalMethods(methodSignature).invoke(userUdf,objects:_*)
>  case "close"⇒
>  closeMethod.invoke(userUdf)
>  case _⇒
>  methodProxy.invokeSuper(o,objects)
>  }
>  }
> }
> private class MethodSignature (val fieldTypes:Array[FieldType]){
>  def this(clazzArray:Array[Class[_]]){
>  this(clazzArray.map(clazz⇒FieldType.get(clazz)))
>  }
>  override def hashCode(): Int = fieldTypes.map(_.hashCode()).sum
>  override def equals(obj: scala.Any): Boolean = {
>  if(this.eq(obj.asInstanceOf[AnyRef])){
>  return true
>  }
>  obj match {
>  case _: MethodSignature⇒ 
> ArrayUtils.isEquals(this.fieldTypes,obj.asInstanceOf[MethodSignature].fieldTypes)
>  case _ ⇒ false
>  }
>  }
>  override def toString: String =fieldTypes.map(_.toString).mkString(",")
> }
> private object MethodSignature{
>  def createMethodSignature(method:Method):MethodSignature={
>  new MethodSignature(method.getParameterTypes)
>  }
> }
> case class 
> EvalMethod(returnType:FieldType,parameters:Array[FieldType],exceptions:List[Class[Throwable]])
> object UserUdfFactory {
>  def 
> createUserUdf(name:String,fullyQualifiedClassName:String,evalMethods:List[EvalMethod]):ScalarFunction={
>  val enhancer = new Enhancer
>  enhancer.setSuperclass(classOf[UserUdfFunction])
>  enhancer.setCallback(new UdfMethodInterceptor(name,fullyQualifiedClassName))
>  enhancer.setInterfaces(evalMethods.map(method⇒{
>  val returnType=Type.getType(method.returnType.getClazz)
>  val parameters=method.parameters.map(p⇒Type.getType(p.getClazz))
>  (new Signature("eval",returnType,parameters),method.exceptions)
>  }).map{ case(signature,exceptions)⇒
>  val im = new InterfaceMaker
>  im.add(signature,exceptions.map(exception⇒Type.getType(exception)).toArray)
>  im.create()
>  }.toArray)
>  enhancer.create().asInstanceOf[ScalarFunction]
>  }
> }
> {code}
> Can be executed in local mode but cannot be executed in yarn mode, the 
> following error will occur
> {code:java}
> Caused by: org.codehaus.commons.compiler.CompileException: Line 5, Column 10: 
> Cannot determine simple type name "com"
> at 
> org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672)
> at 
> 

[jira] [Created] (FLINK-12114) Try to perform UDF By Reflection

2019-04-04 Thread dong (JIRA)
dong created FLINK-12114:


 Summary: Try to perform UDF By Reflection
 Key: FLINK-12114
 URL: https://issues.apache.org/jira/browse/FLINK-12114
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Affects Versions: 1.7.2
Reporter: dong


Hi Team:

     I recently worked on a Flink SQL-based project and then one of the 
requirements was to dynamically execute the Flink SQL UDF by loading a 
user-defined UDF. I first used cglib to dynamically add the eval method to the 
ScalarFunction and then dynamically create the ScalarFunction instance. And 
call the user-defined UDF by reflection on the line.

this my code
{code:java}
package com.ximalaya.flink.dsl.stream.udf

import java.lang.reflect.Method

import com.ximalaya.flink.dsl.stream.`type`.FieldType
import com.ximalaya.flink.dsl.stream.api.udf.\{AbstractUserUdf, UserUdfContext}
import net.sf.cglib.core.Signature
import net.sf.cglib.proxy.\{Enhancer, InterfaceMaker, MethodInterceptor, 
MethodProxy}
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.shaded.org.apache.commons.lang.ArrayUtils
import net.sf.cglib.asm.Type

import scala.collection.mutable.\{Map ⇒ IMap}
/**
 *
 * @author martin.dong
 *
 **/
private class UserUdfFunction extends ScalarFunction{
 override def isDeterministic: Boolean = false
}
private class UdfMethodInterceptor(val name:String,
 val fullyQualifiedClassName:String) extends MethodInterceptor with 
Serializable {

 private var userUdf: AbstractUserUdf = _
 private var evalMethods:IMap[MethodSignature,Method]=IMap()
 private var closeMethod:Method = _
 private var openMethod:Method = _

 override def intercept(o: scala.Any,
 method: Method,
 objects: Array[AnyRef], methodProxy: MethodProxy): AnyRef = {
 val methodName=method.getName
 methodName match {
 case "open"⇒
 this.userUdf = 
Class.forName(fullyQualifiedClassName).newInstance().asInstanceOf[AbstractUserUdf]
 this.userUdf.getClass.getDeclaredMethods.filter(_.getName=="eval").
 foreach(method ⇒ 
evalMethods.put(MethodSignature.createMethodSignature(method), method))
 this.closeMethod = classOf[AbstractUserUdf].getDeclaredMethod("close")
 this.openMethod = 
classOf[AbstractUserUdf].getDeclaredMethod("open",classOf[UserUdfContext])
 openMethod.invoke(userUdf,null)
 case "eval"⇒
 val methodSignature = MethodSignature.createMethodSignature(method)
 evalMethods(methodSignature).invoke(userUdf,objects:_*)
 case "close"⇒
 closeMethod.invoke(userUdf)
 case _⇒
 methodProxy.invokeSuper(o,objects)
 }
 }
}

private class MethodSignature (val fieldTypes:Array[FieldType]){
 def this(clazzArray:Array[Class[_]]){
 this(clazzArray.map(clazz⇒FieldType.get(clazz)))
 }

 override def hashCode(): Int = fieldTypes.map(_.hashCode()).sum
 override def equals(obj: scala.Any): Boolean = {
 if(this.eq(obj.asInstanceOf[AnyRef])){
 return true
 }
 obj match {
 case _: MethodSignature⇒ 
ArrayUtils.isEquals(this.fieldTypes,obj.asInstanceOf[MethodSignature].fieldTypes)
 case _ ⇒ false
 }
 }

 override def toString: String =fieldTypes.map(_.toString).mkString(",")
}
private object MethodSignature{
 def createMethodSignature(method:Method):MethodSignature={
 new MethodSignature(method.getParameterTypes)
 }
}
case class 
EvalMethod(returnType:FieldType,parameters:Array[FieldType],exceptions:List[Class[Throwable]])
object UserUdfFactory {
 def 
createUserUdf(name:String,fullyQualifiedClassName:String,evalMethods:List[EvalMethod]):ScalarFunction={
 val enhancer = new Enhancer
 enhancer.setSuperclass(classOf[UserUdfFunction])
 enhancer.setCallback(new UdfMethodInterceptor(name,fullyQualifiedClassName))
 enhancer.setInterfaces(evalMethods.map(method⇒{
 val returnType=Type.getType(method.returnType.getClazz)
 val parameters=method.parameters.map(p⇒Type.getType(p.getClazz))
 (new Signature("eval",returnType,parameters),method.exceptions)
 }).map{ case(signature,exceptions)⇒
 val im = new InterfaceMaker
 im.add(signature,exceptions.map(exception⇒Type.getType(exception)).toArray)
 im.create()
 }.toArray)
 enhancer.create().asInstanceOf[ScalarFunction]
 }
}
{code}
Can be executed in local mode but cannot be executed in yarn mode, the 
following error will occur
{code:java}
Caused by: org.codehaus.commons.compiler.CompileException: Line 5, Column 10: 
Cannot determine simple type name "com"
at 
org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672)
at 
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6416)
at 
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6177)
at 
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190)
at 
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190)
at 
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190)
at 

[jira] [Updated] (FLINK-12113) User code passing to fromCollection(Iterator, Class) not cleaned

2019-04-04 Thread yankai zhang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yankai zhang updated FLINK-12113:
-
Description: 
 
{code:java}
interface IS extends Iterator, Serializable { }

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.fromCollection(new IS() {
@Override
public boolean hasNext() {
return false;
}

@Override
public Object next() {
return null;
}
}, Object.class);
{code}
Code piece above throws exception:
{code:java}
org.apache.flink.api.common.InvalidProgramException: The implementation of the 
SourceFunction is not serializable. The object probably contains or references 
non serializable fields.

  at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
{code}
And my workaround is wrapping clean around iterator instance, like this:

 
{code:java}
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.fromCollection(env.clean(new IS() {
@Override
public boolean hasNext() {
return false;
}

@Override
public Object next() {
return null;
}
}), Object.class);
{code}
 

 

 

  was:
 
{code:java}
interface IS extends Iterator, Serializable { }

StreamExecutionEnvironment
  .getExecutionEnvironment()
  .fromCollection(new IS() {
@Override
public boolean hasNext() {
  return false;
}

@Override
public Object next() {
  return null;
}
  }, Object.class);
{code}
Code piece above throws exception:
{code:java}
org.apache.flink.api.common.InvalidProgramException: The implementation of the 
SourceFunction is not serializable. The object probably contains or references 
non serializable fields.

  at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
{code}


> User code passing to fromCollection(Iterator, Class) not cleaned
> 
>
> Key: FLINK-12113
> URL: https://issues.apache.org/jira/browse/FLINK-12113
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.7.2
>Reporter: yankai zhang
>Priority: Major
>
>  
> {code:java}
> interface IS extends Iterator, Serializable { }
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.fromCollection(new IS() {
> @Override
> public boolean hasNext() {
> return false;
> }
> @Override
> public Object next() {
> return null;
> }
> }, Object.class);
> {code}
> Code piece above throws exception:
> {code:java}
> org.apache.flink.api.common.InvalidProgramException: The implementation of 
> the SourceFunction is not serializable. The object probably contains or 
> references non serializable fields.
>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
> {code}
> And my workaround is wrapping clean around iterator instance, like this:
>  
> {code:java}
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.fromCollection(env.clean(new IS() {
> @Override
> public boolean hasNext() {
> return false;
> }
> @Override
> public Object next() {
> return null;
> }
> }), Object.class);
> {code}
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12113) User code passing to fromCollection(Iterator, Class) not cleaned

2019-04-04 Thread yankai zhang (JIRA)
yankai zhang created FLINK-12113:


 Summary: User code passing to fromCollection(Iterator, Class) not 
cleaned
 Key: FLINK-12113
 URL: https://issues.apache.org/jira/browse/FLINK-12113
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Affects Versions: 1.7.2
Reporter: yankai zhang


 
{code:java}
interface IS extends Iterator, Serializable { }

StreamExecutionEnvironment
  .getExecutionEnvironment()
  .fromCollection(new IS() {
@Override
public boolean hasNext() {
  return false;
}

@Override
public Object next() {
  return null;
}
  }, Object.class);
{code}
Code piece above throws exception:
{code:java}
org.apache.flink.api.common.InvalidProgramException: The implementation of the 
SourceFunction is not serializable. The object probably contains or references 
non serializable fields.

  at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #8113: [FLINK-12112][tests] Properly print process output

2019-04-04 Thread GitBox
flinkbot commented on issue #8113: [FLINK-12112][tests] Properly print process 
output
URL: https://github.com/apache/flink/pull/8113#issuecomment-479912488
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer of PMC member is required Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zentol opened a new pull request #8113: [FLINK-12112][tests] Properly print process output

2019-04-04 Thread GitBox
zentol opened a new pull request #8113: [FLINK-12112][tests] Properly print 
process output
URL: https://github.com/apache/flink/pull/8113
 
 
   ## What is the purpose of the change
   
   Fixes an issue in the `AbstractTaskManagerProcessFailureRecoveryTest` where 
the test, during the process output dumping,
   a) prints the logs of the first TM 3 times
   b) is very prone to NullPointerExceptions.
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12112) AbstractTaskManagerProcessFailureRecoveryTest process output logging does not work properly

2019-04-04 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12112:
---
Labels: pull-request-available  (was: )

> AbstractTaskManagerProcessFailureRecoveryTest process output logging does not 
> work properly
> ---
>
> Key: FLINK-12112
> URL: https://issues.apache.org/jira/browse/FLINK-12112
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.8.0, 1.9.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0, 1.8.1
>
>
> The {{AbstractTaskManagerProcessFailureRecoveryTest}} starts multiple 
> taskmanagers and prints their output if the test fails.
> However, due to a recent refactoring the logs of the first TM is printed 
> multiple times instead, and the code is prone to NullPointerExceptions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12111) TaskManagerProcessFailureBatchRecoveryITCase fails due to removed Slot

2019-04-04 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16809889#comment-16809889
 ] 

Chesnay Schepler commented on FLINK-12111:
--

Unfortunately the process output could not be printed due to FLINK-12112.

> TaskManagerProcessFailureBatchRecoveryITCase fails due to removed Slot
> --
>
> Key: FLINK-12111
> URL: https://issues.apache.org/jira/browse/FLINK-12111
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.8.1
>Reporter: Chesnay Schepler
>Priority: Major
>
> https://travis-ci.org/apache/flink/jobs/515636826
> {code}
> org.apache.flink.client.program.ProgramInvocationException: Job failed. 
> (JobID: 4f32093d9c7554c3de832d20f0a06eb5)
>   at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:471)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:446)
>   at 
> org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:210)
>   at 
> org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:187)
>   at 
> org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:173)
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:817)
>   at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
>   at 
> org.apache.flink.test.recovery.TaskManagerProcessFailureBatchRecoveryITCase.testTaskManagerFailure(TaskManagerProcessFailureBatchRecoveryITCase.java:115)
>   at 
> org.apache.flink.test.recovery.AbstractTaskManagerProcessFailureRecoveryTest$1.run(AbstractTaskManagerProcessFailureRecoveryTest.java:143)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>   at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>   at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
>   ... 10 more
> Caused by: org.apache.flink.util.FlinkException: The assigned slot 
> 786ec47f893240315bb01291aab680ec_1 was removed.
>   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:893)
>   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:863)
>   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:1058)
>   at 
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:385)
>   at 
> org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:847)
>   at 
> org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener$1.run(ResourceManager.java:1161)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185)
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>   at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> org.apache.flink.client.program.ProgramInvocationException: Job failed. 
> (JobID: 4f32093d9c7554c3de832d20f0a06eb5)
>   at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
>   at 
> 

[jira] [Created] (FLINK-12112) AbstractTaskManagerProcessFailureRecoveryTest process output logging does not work properly

2019-04-04 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-12112:


 Summary: AbstractTaskManagerProcessFailureRecoveryTest process 
output logging does not work properly
 Key: FLINK-12112
 URL: https://issues.apache.org/jira/browse/FLINK-12112
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination, Tests
Affects Versions: 1.8.0, 1.9.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.9.0, 1.8.1


The {{AbstractTaskManagerProcessFailureRecoveryTest}} starts multiple 
taskmanagers and prints their output if the test fails.

However, due to a recent refactoring the logs of the first TM is printed 
multiple times instead, and the code is prone to NullPointerExceptions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12111) TaskManagerProcessFailureBatchRecoveryITCase fails due to removed Slot

2019-04-04 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-12111:


 Summary: TaskManagerProcessFailureBatchRecoveryITCase fails due to 
removed Slot
 Key: FLINK-12111
 URL: https://issues.apache.org/jira/browse/FLINK-12111
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.8.1
Reporter: Chesnay Schepler


https://travis-ci.org/apache/flink/jobs/515636826

{code}
org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 
4f32093d9c7554c3de832d20f0a06eb5)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:471)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:446)
at 
org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:210)
at 
org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:187)
at 
org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:173)
at 
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:817)
at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
at 
org.apache.flink.test.recovery.TaskManagerProcessFailureBatchRecoveryITCase.testTaskManagerFailure(TaskManagerProcessFailureBatchRecoveryITCase.java:115)
at 
org.apache.flink.test.recovery.AbstractTaskManagerProcessFailureRecoveryTest$1.run(AbstractTaskManagerProcessFailureRecoveryTest.java:143)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
... 10 more
Caused by: org.apache.flink.util.FlinkException: The assigned slot 
786ec47f893240315bb01291aab680ec_1 was removed.
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:893)
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:863)
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:1058)
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:385)
at 
org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:847)
at 
org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener$1.run(ResourceManager.java:1161)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 
4f32093d9c7554c3de832d20f0a06eb5)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:471)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:446)
at 
org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:210)
at 
org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:187)
at 
org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:173)
  

[jira] [Updated] (FLINK-11897) ExecutionGraphSuspendTest does not wait for all tasks to be submitted

2019-04-04 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-11897:
-
Summary: ExecutionGraphSuspendTest does not wait for all tasks to be 
submitted  (was: ExecutionGraphSuspendTest does not for all tasks to be 
submitted)

> ExecutionGraphSuspendTest does not wait for all tasks to be submitted
> -
>
> Key: FLINK-11897
> URL: https://issues.apache.org/jira/browse/FLINK-11897
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Reporter: chunpinghe
>Assignee: chunpinghe
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.9.0, 1.8.1
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> 11:41:09.042 [INFO] Running 
> org.apache.flink.runtime.executiongraph.ExecutionGraphSuspendTest 
> 11:41:11.009 [ERROR] Tests run: 8, Failures: 1, Errors: 0, Skipped: 0, Time 
> elapsed: 1.964 s <<< FAILURE! - in 
> org.apache.flink.runtime.executiongraph.ExecutionGraphSuspendTest 
> 11:41:11.010 [ERROR] 
> testSuspendedOutOfRunning(org.apache.flink.runtime.executiongraph.ExecutionGraphSuspendTest)
>  Time elapsed: 0.052 s <<< FAILURE! java.lang.AssertionError: Expected: is 
> <0> but: was <3> at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphSuspendTest.validateNoInteractions(ExecutionGraphSuspendTest.java:271)
>  at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphSuspendTest.ensureCannotLeaveSuspendedState(ExecutionGraphSuspendTest.java:255)
>  at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphSuspendTest.testSuspendedOutOfRunning(ExecutionGraphSuspendTest.java:110)
>  
> [https://api.travis-ci.org/v3/job/505154324/log.txt|https://api.travis-ci.org/v3/job/505154324/log.txt]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-11897) ExecutionGraphSuspendTest does not for all tasks to be submitted

2019-04-04 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-11897.

   Resolution: Fixed
Fix Version/s: 1.8.1

master: 8a12e0f63de90375653a030304522ae0600cb3f9
1.8: 9008bea98286dff3718e743f9b78aecf63e85d14 

> ExecutionGraphSuspendTest does not for all tasks to be submitted
> 
>
> Key: FLINK-11897
> URL: https://issues.apache.org/jira/browse/FLINK-11897
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Reporter: chunpinghe
>Assignee: chunpinghe
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.9.0, 1.8.1
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> 11:41:09.042 [INFO] Running 
> org.apache.flink.runtime.executiongraph.ExecutionGraphSuspendTest 
> 11:41:11.009 [ERROR] Tests run: 8, Failures: 1, Errors: 0, Skipped: 0, Time 
> elapsed: 1.964 s <<< FAILURE! - in 
> org.apache.flink.runtime.executiongraph.ExecutionGraphSuspendTest 
> 11:41:11.010 [ERROR] 
> testSuspendedOutOfRunning(org.apache.flink.runtime.executiongraph.ExecutionGraphSuspendTest)
>  Time elapsed: 0.052 s <<< FAILURE! java.lang.AssertionError: Expected: is 
> <0> but: was <3> at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphSuspendTest.validateNoInteractions(ExecutionGraphSuspendTest.java:271)
>  at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphSuspendTest.ensureCannotLeaveSuspendedState(ExecutionGraphSuspendTest.java:255)
>  at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphSuspendTest.testSuspendedOutOfRunning(ExecutionGraphSuspendTest.java:110)
>  
> [https://api.travis-ci.org/v3/job/505154324/log.txt|https://api.travis-ci.org/v3/job/505154324/log.txt]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11897) ExecutionGraphSuspendTest does not for all tasks to be submitted

2019-04-04 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-11897:
-
Summary: ExecutionGraphSuspendTest does not for all tasks to be submitted  
(was: ExecutionGraphSuspendTest.testSuspendedOutOfRunning failed )

> ExecutionGraphSuspendTest does not for all tasks to be submitted
> 
>
> Key: FLINK-11897
> URL: https://issues.apache.org/jira/browse/FLINK-11897
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Reporter: chunpinghe
>Assignee: chunpinghe
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> 11:41:09.042 [INFO] Running 
> org.apache.flink.runtime.executiongraph.ExecutionGraphSuspendTest 
> 11:41:11.009 [ERROR] Tests run: 8, Failures: 1, Errors: 0, Skipped: 0, Time 
> elapsed: 1.964 s <<< FAILURE! - in 
> org.apache.flink.runtime.executiongraph.ExecutionGraphSuspendTest 
> 11:41:11.010 [ERROR] 
> testSuspendedOutOfRunning(org.apache.flink.runtime.executiongraph.ExecutionGraphSuspendTest)
>  Time elapsed: 0.052 s <<< FAILURE! java.lang.AssertionError: Expected: is 
> <0> but: was <3> at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphSuspendTest.validateNoInteractions(ExecutionGraphSuspendTest.java:271)
>  at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphSuspendTest.ensureCannotLeaveSuspendedState(ExecutionGraphSuspendTest.java:255)
>  at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphSuspendTest.testSuspendedOutOfRunning(ExecutionGraphSuspendTest.java:110)
>  
> [https://api.travis-ci.org/v3/job/505154324/log.txt|https://api.travis-ci.org/v3/job/505154324/log.txt]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zentol merged pull request #7971: [FLINK-11897][tests] should wait all submitTask methods executed,befo…

2019-04-04 Thread GitBox
zentol merged pull request #7971: [FLINK-11897][tests] should wait all 
submitTask methods executed,befo…
URL: https://github.com/apache/flink/pull/7971
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12107) Use Proxy For DataDog Validation

2019-04-04 Thread Luka Jurukovski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16809857#comment-16809857
 ] 

Luka Jurukovski commented on FLINK-12107:
-

> Since this feature is only available in 1.9-SNAPSHOT, are you certain that 
> the version you downloaded actually contains the relevant commits?

I must apologize, you are correct, I did somehow do this. I will close this 
ticket

> Use Proxy For DataDog Validation
> 
>
> Key: FLINK-12107
> URL: https://issues.apache.org/jira/browse/FLINK-12107
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Affects Versions: 1.9.0
>Reporter: Luka Jurukovski
>Priority: Minor
>
> Recently support for DataDog Metric Proxy was added, however validation for 
> the api keys ignores the use of the proxy. There are circumstances in which 
> proxying is used due to the fact that the service itself is not reachable 
> directly.
> Currently the validation pings datadog using the api keys provided.
> https://app.datadoghq.com/api/v1/validate?api_key=



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-12107) Use Proxy For DataDog Validation

2019-04-04 Thread Luka Jurukovski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luka Jurukovski closed FLINK-12107.
---
Resolution: Fixed

> Use Proxy For DataDog Validation
> 
>
> Key: FLINK-12107
> URL: https://issues.apache.org/jira/browse/FLINK-12107
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Affects Versions: 1.9.0
>Reporter: Luka Jurukovski
>Priority: Minor
>
> Recently support for DataDog Metric Proxy was added, however validation for 
> the api keys ignores the use of the proxy. There are circumstances in which 
> proxying is used due to the fact that the service itself is not reachable 
> directly.
> Currently the validation pings datadog using the api keys provided.
> https://app.datadoghq.com/api/v1/validate?api_key=



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-12048) ZooKeeperHADispatcherTest failed on Travis

2019-04-04 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-12048:


Assignee: Till Rohrmann

> ZooKeeperHADispatcherTest failed on Travis
> --
>
> Key: FLINK-12048
> URL: https://issues.apache.org/jira/browse/FLINK-12048
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.8.0, 1.9.0
>Reporter: Chesnay Schepler
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
>
> https://travis-ci.org/apache/flink/builds/512077301
> {code}
> 01:14:56.351 [ERROR] Tests run: 3, Failures: 0, Errors: 1, Skipped: 0, Time 
> elapsed: 9.671 s <<< FAILURE! - in 
> org.apache.flink.runtime.dispatcher.ZooKeeperHADispatcherTest
> 01:14:56.364 [ERROR] 
> testStandbyDispatcherJobExecution(org.apache.flink.runtime.dispatcher.ZooKeeperHADispatcherTest)
>   Time elapsed: 1.209 s  <<< ERROR!
> org.apache.flink.runtime.util.TestingFatalErrorHandler$TestingException: 
> org.apache.flink.runtime.dispatcher.DispatcherException: Could not start the 
> added job d51eeb908f360e44c0a2004e00a6afd2
>   at 
> org.apache.flink.runtime.dispatcher.ZooKeeperHADispatcherTest.teardown(ZooKeeperHADispatcherTest.java:117)
> Caused by: org.apache.flink.runtime.dispatcher.DispatcherException: Could not 
> start the added job d51eeb908f360e44c0a2004e00a6afd2
> Caused by: java.lang.IllegalStateException: Not running. Forgot to call 
> start()?
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10746) Need to replace transfer.sh for Travis log upload because it shuts down

2019-04-04 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-10746.

Resolution: Later

transfer.sh will continue to work.

> Need to replace transfer.sh for Travis log upload because it shuts down
> ---
>
> Key: FLINK-10746
> URL: https://issues.apache.org/jira/browse/FLINK-10746
> Project: Flink
>  Issue Type: Task
>  Components: Build System
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Priority: Critical
>
> We need to replace {{transfer.sh}} as the destination for our Travis log 
> upload because it is about to shut down (see https://transfer.sh/).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12107) Use Proxy For DataDog Validation

2019-04-04 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16809831#comment-16809831
 ] 

Chesnay Schepler commented on FLINK-12107:
--

I can't reproduce this; I've set up a simple local proxy via CCProxy, pointed 
the DatadogReproter to it, ad it correctly goes through the proxy for the 
validation.

Since this feature is only available in 1.9-SNAPSHOT, are you certain that the 
version you downloaded actually contains the relevant commits?

> Use Proxy For DataDog Validation
> 
>
> Key: FLINK-12107
> URL: https://issues.apache.org/jira/browse/FLINK-12107
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Affects Versions: 1.9.0
>Reporter: Luka Jurukovski
>Priority: Minor
>
> Recently support for DataDog Metric Proxy was added, however validation for 
> the api keys ignores the use of the proxy. There are circumstances in which 
> proxying is used due to the fact that the service itself is not reachable 
> directly.
> Currently the validation pings datadog using the api keys provided.
> https://app.datadoghq.com/api/v1/validate?api_key=



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] sunjincheng121 commented on issue #7167: [FLINK-10973] [table] Add support for map to table API

2019-04-04 Thread GitBox
sunjincheng121 commented on issue #7167: [FLINK-10973] [table] Add support for 
map to table API
URL: https://github.com/apache/flink/pull/7167#issuecomment-47936
 
 
   Thanks for the update @dianfu!
   +1 to merged.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12107) Use Proxy For DataDog Validation

2019-04-04 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16809794#comment-16809794
 ] 

Chesnay Schepler commented on FLINK-12107:
--

What about the metric traffic itself?

> Use Proxy For DataDog Validation
> 
>
> Key: FLINK-12107
> URL: https://issues.apache.org/jira/browse/FLINK-12107
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Affects Versions: 1.9.0
>Reporter: Luka Jurukovski
>Priority: Minor
>
> Recently support for DataDog Metric Proxy was added, however validation for 
> the api keys ignores the use of the proxy. There are circumstances in which 
> proxying is used due to the fact that the service itself is not reachable 
> directly.
> Currently the validation pings datadog using the api keys provided.
> https://app.datadoghq.com/api/v1/validate?api_key=



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12107) Use Proxy For DataDog Validation

2019-04-04 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-12107:
-
Affects Version/s: (was: 1.8.0)
   1.9.0

> Use Proxy For DataDog Validation
> 
>
> Key: FLINK-12107
> URL: https://issues.apache.org/jira/browse/FLINK-12107
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Affects Versions: 1.9.0
>Reporter: Luka Jurukovski
>Priority: Minor
>
> Recently support for DataDog Metric Proxy was added, however validation for 
> the api keys ignores the use of the proxy. There are circumstances in which 
> proxying is used due to the fact that the service itself is not reachable 
> directly.
> Currently the validation pings datadog using the api keys provided.
> https://app.datadoghq.com/api/v1/validate?api_key=



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12107) Use Proxy For DataDog Validation

2019-04-04 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-12107:
-
Component/s: Runtime / Metrics

> Use Proxy For DataDog Validation
> 
>
> Key: FLINK-12107
> URL: https://issues.apache.org/jira/browse/FLINK-12107
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Reporter: Luka Jurukovski
>Priority: Minor
>
> Recently support for DataDog Metric Proxy was added, however validation for 
> the api keys ignores the use of the proxy. There are circumstances in which 
> proxying is used due to the fact that the service itself is not reachable 
> directly.
> Currently the validation pings datadog using the api keys provided.
> https://app.datadoghq.com/api/v1/validate?api_key=



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12107) Use Proxy For DataDog Validation

2019-04-04 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-12107:
-
Affects Version/s: 1.8.0

> Use Proxy For DataDog Validation
> 
>
> Key: FLINK-12107
> URL: https://issues.apache.org/jira/browse/FLINK-12107
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Affects Versions: 1.8.0
>Reporter: Luka Jurukovski
>Priority: Minor
>
> Recently support for DataDog Metric Proxy was added, however validation for 
> the api keys ignores the use of the proxy. There are circumstances in which 
> proxying is used due to the fact that the service itself is not reachable 
> directly.
> Currently the validation pings datadog using the api keys provided.
> https://app.datadoghq.com/api/v1/validate?api_key=



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11603) Port MetricQueryService to RpcEndpoint

2019-04-04 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-11603:
-
Summary: Port MetricQueryService to RpcEndpoint  (was: Port the 
MetricQueryService to the new RpcEndpoint)

> Port MetricQueryService to RpcEndpoint
> --
>
> Key: FLINK-11603
> URL: https://issues.apache.org/jira/browse/FLINK-11603
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Given that a series TODO mention {{This is a temporary hack until we have 
> ported the MetricQueryService to the new RpcEndpoint}}, I'd like to give it a 
> try to implement the RpcEndpoint version of MetricQueryService.
> Basically, port {{onRecieve}} to 
> 1. {{addMetric(metricName, metric, group)}}
> 2. {{removeMetric(metric)}}
> 3. {{createDump()}}
> And then adjust tests and replace {{metricServiceQueryPath}} with a 
> corresponding {{RpcGateway}}.
> I'd like to learn that if the statement if true --- when we call a 
> Runnable/Callable with runAsync/callAsync, then the Runnable/Callable is 
> running in the main thread of the underlying RPC service, specifically, in 
> the actor thread?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-11603) Port MetricQueryService to RpcEndpoint

2019-04-04 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-11603.

   Resolution: Fixed
Fix Version/s: 1.9.0

master: 6e8a3f312919f8f088c132b13b727cb2188eacb5

> Port MetricQueryService to RpcEndpoint
> --
>
> Key: FLINK-11603
> URL: https://issues.apache.org/jira/browse/FLINK-11603
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Given that a series TODO mention {{This is a temporary hack until we have 
> ported the MetricQueryService to the new RpcEndpoint}}, I'd like to give it a 
> try to implement the RpcEndpoint version of MetricQueryService.
> Basically, port {{onRecieve}} to 
> 1. {{addMetric(metricName, metric, group)}}
> 2. {{removeMetric(metric)}}
> 3. {{createDump()}}
> And then adjust tests and replace {{metricServiceQueryPath}} with a 
> corresponding {{RpcGateway}}.
> I'd like to learn that if the statement if true --- when we call a 
> Runnable/Callable with runAsync/callAsync, then the Runnable/Callable is 
> running in the main thread of the underlying RPC service, specifically, in 
> the actor thread?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zentol merged pull request #7927: [FLINK-11603][metrics] Port the MetricQueryService to the new RpcEndpoint

2019-04-04 Thread GitBox
zentol merged pull request #7927: [FLINK-11603][metrics] Port the 
MetricQueryService to the new RpcEndpoint
URL: https://github.com/apache/flink/pull/7927
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zentol commented on issue #8083: [FLINK-12050][Tests] BlockingShutdownTest fails on Java 9

2019-04-04 Thread GitBox
zentol commented on issue #8083: [FLINK-12050][Tests] BlockingShutdownTest 
fails on Java 9
URL: https://github.com/apache/flink/pull/8083#issuecomment-479872874
 
 
   Interesting, the UNIX version has the field, but the windows version doesn't 
have it. Please access `pid()` via reflection instead since it this is the 
"intended" way to get the process ID.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-10575) Remove deprecated ExecutionGraphBuilder.buildGraph method

2019-04-04 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-10575.

   Resolution: Fixed
Fix Version/s: 1.9.0

master: cff74d3fe769c4844fc4faeb2ae604778d92f9d6

> Remove deprecated ExecutionGraphBuilder.buildGraph method
> -
>
> Key: FLINK-10575
> URL: https://issues.apache.org/jira/browse/FLINK-10575
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.7.0
>Reporter: JIN SUN
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> ExecutionGraphBuilder is not a public API and we should able to remove 
> deprecated method such as:
> @Deprecated
> public static ExecutionGraph buildGraph
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zentol merged pull request #7950: [FLINK-10575][coordination] Remove deprecated ExecutionGraphBuilder.buildGraph method

2019-04-04 Thread GitBox
zentol merged pull request #7950: [FLINK-10575][coordination] Remove deprecated 
ExecutionGraphBuilder.buildGraph method
URL: https://github.com/apache/flink/pull/7950
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zentol commented on issue #7780: [FLINK-11593][tests] Check & port TaskManagerTest to new code base

2019-04-04 Thread GitBox
zentol commented on issue #7780: [FLINK-11593][tests] Check & port 
TaskManagerTest to new code base
URL: https://github.com/apache/flink/pull/7780#issuecomment-479871386
 
 
   went through the `JobSubmissionEnvironment` and made some refactorings. 
Additionally I rebased the PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on issue #7167: [FLINK-10973] [table] Add support for map to table API

2019-04-04 Thread GitBox
dianfu commented on issue #7167: [FLINK-10973] [table] Add support for map to 
table API
URL: https://github.com/apache/flink/pull/7167#issuecomment-479862729
 
 
   @sunjincheng121 Thanks a lot for your review. Updated the PR accordingly. :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zentol commented on issue #7780: [FLINK-11593][tests] Check & port TaskManagerTest to new code base

2019-04-04 Thread GitBox
zentol commented on issue #7780: [FLINK-11593][tests] Check & port 
TaskManagerTest to new code base
URL: https://github.com/apache/flink/pull/7780#issuecomment-479857850
 
 
   I went through the tests and made a number of changes.
   Now starting to look into the `JobSubmissionEnvironment`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-3273) Remove Scala dependency from flink-streaming-java

2019-04-04 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-3273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16809692#comment-16809692
 ] 

Liya Fan commented on FLINK-3273:
-

[~dawidwys] I see. This issue can only be resolved with large refactoring 
effort involving multiple modules. And it is difficult to say what is the right 
time to fix it, because it may break many other pull requests. 

Maybe this issue will no longer exists after many small fixes are checked-in :)

> Remove Scala dependency from flink-streaming-java
> -
>
> Key: FLINK-3273
> URL: https://issues.apache.org/jira/browse/FLINK-3273
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Maximilian Michels
>Priority: Major
>
> {{flink-streaming-java}} depends on Scala through {{flink-clients}}, 
> {{flink-runtime}}, and {{flink-testing-utils}}. We should get rid of the 
> Scala dependency just like we did for {{flink-java}}. Integration tests and 
> utilities which depend on Scala should be moved to {{flink-tests}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-04 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r272103567
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,16 +112,381 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
+   Configuration configuration,
+   long maxJvmHeapMemory,
+   boolean localTaskManagerCommunication,
+   InetAddress taskManagerAddress) {
+
+   final int dataport = getDataport(configuration);
+
+   final int pageSize = getPageSize(configuration);
+
+   final int numberOfNetworkBuffers = 
calculateNumberOfNetworkBuffers(configuration, maxJvmHeapMemory);
+
+   final NettyConfig nettyConfig = 
createNettyConfig(configuration, localTaskManagerCommunication, 
taskManagerAddress, dataport);
+
+   int initialRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
 
 Review comment:
   We should also try to move all options which are used only for network 
environment from `TaskManagerOptions` to new class `NetworkEnvironmentOptions` 
to separate them for different shuffle services. What do you think? At the end, 
we can also move all network related classes into its dedicated package.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-04 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r272103567
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,16 +112,381 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
+   Configuration configuration,
+   long maxJvmHeapMemory,
+   boolean localTaskManagerCommunication,
+   InetAddress taskManagerAddress) {
+
+   final int dataport = getDataport(configuration);
+
+   final int pageSize = getPageSize(configuration);
+
+   final int numberOfNetworkBuffers = 
calculateNumberOfNetworkBuffers(configuration, maxJvmHeapMemory);
+
+   final NettyConfig nettyConfig = 
createNettyConfig(configuration, localTaskManagerCommunication, 
taskManagerAddress, dataport);
+
+   int initialRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
 
 Review comment:
   We should also try to move all options which are used only for network 
environment from `TaskManagerOptions` to new class `NetworkEnvironmentOptions` 
to separate them for different shuffle services. What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] asfgit closed pull request #8112: [hotfix][table]Improve column operations doc in tableAPI.

2019-04-04 Thread GitBox
asfgit closed pull request #8112: [hotfix][table]Improve column operations doc 
in tableAPI.
URL: https://github.com/apache/flink/pull/8112
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-04 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r272102359
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,16 +112,381 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
+   Configuration configuration,
+   long maxJvmHeapMemory,
+   boolean localTaskManagerCommunication,
+   InetAddress taskManagerAddress) {
+
+   final int dataport = getDataport(configuration);
+
+   final int pageSize = getPageSize(configuration);
+
+   final int numberOfNetworkBuffers = 
calculateNumberOfNetworkBuffers(configuration, maxJvmHeapMemory);
+
+   final NettyConfig nettyConfig = 
createNettyConfig(configuration, localTaskManagerCommunication, 
taskManagerAddress, dataport);
+
+   int initialRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+   int maxRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+   int buffersPerChannel = 
configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+   int extraBuffersPerGate = 
configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
+   boolean isCreditBased = 
configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
+
+   return new NetworkEnvironmentConfiguration(
+   numberOfNetworkBuffers,
+   pageSize,
+   initialRequestBackoff,
+   maxRequestBackoff,
+   buffersPerChannel,
+   extraBuffersPerGate,
+   isCreditBased,
+   nettyConfig);
+   }
+
+   /**
+* Calculates the amount of memory used for network buffers inside the 
current JVM instance
+* based on the available heap or the max heap size and the according 
configuration parameters.
+*
+* For containers or when started via scripts, if started with a 
memory limit and set to use
+* off-heap memory, the maximum heap size for the JVM is adjusted 
accordingly and we are able
+* to extract the intended values from this.
+*
+* The following configuration parameters are involved:
+* 
+*  {@link TaskManagerOptions#MANAGED_MEMORY_SIZE},
+*  {@link TaskManagerOptions#MANAGED_MEMORY_FRACTION},
+*  {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
+*  {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN},
+*  {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}, 
and
+*  {@link TaskManagerOptions#NETWORK_NUM_BUFFERS} (fallback if the 
ones above do not exist)
+* .
+*
+* @param config configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+*
+* @return memory to use for network buffers (in bytes)
+*/
+   @VisibleForTesting
+   public static long calculateNewNetworkBufferMemory(Configuration 
config, long maxJvmHeapMemory) {
+   // The maximum heap memory has been adjusted as in 
TaskManagerServices#calculateHeapSizeMB
+   // and we need to invert these calculations.
+   final long jvmHeapNoNet;
+   final MemoryType memoryType = 
ConfigurationParserUtils.getMemoryType(config);
+   if (memoryType == MemoryType.HEAP) {
+   jvmHeapNoNet = maxJvmHeapMemory;
+   } else if (memoryType == MemoryType.OFF_HEAP) {
+   long configuredMemory = 
ConfigurationParserUtils.getManagedMemorySize(config) << 20; // megabytes to 
bytes
+   if (configuredMemory > 0) {
+   // The maximum heap memory has been adjusted 
according to 

[GitHub] [flink] flinkbot edited a comment on issue #8112: [hotfix][table]Improve column operations doc in tableAPI.

2019-04-04 Thread GitBox
flinkbot edited a comment on issue #8112: [hotfix][table]Improve column 
operations doc in tableAPI.
URL: https://github.com/apache/flink/pull/8112#issuecomment-479800278
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @sunjincheng121 [committer]
   * ✅ 2. There is [consensus] that the contribution should go into to Flink.
   - Approved by @sunjincheng121 [committer]
   * ❓ 3. Needs [attention] from.
   * ✅ 4. The change fits into the overall [architecture].
   - Approved by @sunjincheng121 [committer]
   * ✅ 5. Overall code [quality] is good.
   - Approved by @sunjincheng121 [committer]
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer of PMC member is required Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on issue #8112: [hotfix][table]Improve column operations doc in tableAPI.

2019-04-04 Thread GitBox
sunjincheng121 commented on issue #8112: [hotfix][table]Improve column 
operations doc in tableAPI.
URL: https://github.com/apache/flink/pull/8112#issuecomment-479829421
 
 
   Thanks for your review @hequn8128 
   @flinkbot approve all
   Merging ...


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-3273) Remove Scala dependency from flink-streaming-java

2019-04-04 Thread Dawid Wysakowicz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-3273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16809678#comment-16809678
 ] 

Dawid Wysakowicz commented on FLINK-3273:
-

[~fan_li_ya] It is a much bigger effort. Right now flink-streaming-java depends 
on flink-runtime (this was probably a bad design decision that we unfortunately 
have to leave with for now) which pulls scala. So we won't be able to make 
flink-streaming-java independent of scala as long as we don't invert that 
dependency, but this is a huge effort.

> Remove Scala dependency from flink-streaming-java
> -
>
> Key: FLINK-3273
> URL: https://issues.apache.org/jira/browse/FLINK-3273
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Maximilian Michels
>Priority: Major
>
> {{flink-streaming-java}} depends on Scala through {{flink-clients}}, 
> {{flink-runtime}}, and {{flink-testing-utils}}. We should get rid of the 
> Scala dependency just like we did for {{flink-java}}. Integration tests and 
> utilities which depend on Scala should be moved to {{flink-tests}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-3273) Remove Scala dependency from flink-streaming-java

2019-04-04 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-3273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16809672#comment-16809672
 ] 

Liya Fan commented on FLINK-3273:
-

Hi [~mxm], thanks a lot for the detailed explanation.

To resolve this issue, the scala-dependent stuffs in flink-streaming-java 
should be moved to flink-tests, so that flink-stream-java will be independent 
of Scala, right?

If no one else is working on it, can I take it over? :) 

Thanks.

> Remove Scala dependency from flink-streaming-java
> -
>
> Key: FLINK-3273
> URL: https://issues.apache.org/jira/browse/FLINK-3273
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Maximilian Michels
>Priority: Major
>
> {{flink-streaming-java}} depends on Scala through {{flink-clients}}, 
> {{flink-runtime}}, and {{flink-testing-utils}}. We should get rid of the 
> Scala dependency just like we did for {{flink-java}}. Integration tests and 
> utilities which depend on Scala should be moved to {{flink-tests}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11912) Expose per partition Kafka lag metric in Flink Kafka connector

2019-04-04 Thread aitozi (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16809641#comment-16809641
 ] 

aitozi commented on FLINK-11912:


Got it [~suez1224] . Thanks for your explanation, I think you are right. +1 for 
this change

> Expose per partition Kafka lag metric in Flink Kafka connector
> --
>
> Key: FLINK-11912
> URL: https://issues.apache.org/jira/browse/FLINK-11912
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Kafka
>Affects Versions: 1.6.4, 1.7.2
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Major
>
> In production, it's important that we expose the Kafka lag by partition 
> metric in order for users to diagnose which Kafka partition is lagging. 
> However, although the Kafka lag by partition metrics are available in 
> KafkaConsumer after 0.10.2,  Flink was not able to properly register it 
> because the metrics are only available after the consumer start polling data 
> from partitions. I would suggest the following fix:
> 1) In KafkaConsumerThread.run(), allocate a manualRegisteredMetricSet.
> 2) in the fetch loop, as KafkaConsumer discovers new partitions, manually add 
> MetricName for those partitions that we want to register into 
> manualRegisteredMetricSet. 
> 3) in the fetch loop, check if manualRegisteredMetricSet is empty. If not, 
> try to search for the metrics available in KafkaConsumer, and if found, 
> register it and remove the entry from manualRegisteredMetricSet. 
> The overhead of the above approach is bounded and only incur when discovering 
> new partitions, and registration is done once the KafkaConsumer have the 
> metrics exposed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-3273) Remove Scala dependency from flink-streaming-java

2019-04-04 Thread Maximilian Michels (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-3273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16809634#comment-16809634
 ] 

Maximilian Michels commented on FLINK-3273:
---

The idea was to have a Scala-free version of {{flink-streaming-java}}, 
currently we have {{flink-streaming-java_2.11}} and 
{{flink-streaming-java_2.12}}. We would require a runtime-decoupled 
abstraction, similar to what {{flink-java}} has but for streaming. Then we 
could get rid of the Scala dependency.

Not sure if there are plans at the moment to do that because it would be an 
invasive change.

> Remove Scala dependency from flink-streaming-java
> -
>
> Key: FLINK-3273
> URL: https://issues.apache.org/jira/browse/FLINK-3273
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Maximilian Michels
>Priority: Major
>
> {{flink-streaming-java}} depends on Scala through {{flink-clients}}, 
> {{flink-runtime}}, and {{flink-testing-utils}}. We should get rid of the 
> Scala dependency just like we did for {{flink-java}}. Integration tests and 
> utilities which depend on Scala should be moved to {{flink-tests}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-04 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r272080191
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 ##
 @@ -616,10 +394,9 @@ public static long calculateHeapSizeMB(long 
totalJavaMemorySizeMB, Configuration
Preconditions.checkArgument(totalJavaMemorySizeMB > 0);
 
// subtract the Java memory used for network buffers (always 
off-heap)
-   final long networkBufMB =
-   calculateNetworkBufferMemory(
-   totalJavaMemorySizeMB << 20, // megabytes to 
bytes
-   config) >> 20; // bytes to megabytes
+   final long networkBufMB = 
NetworkEnvironmentConfiguration.calculateNetworkBufferMemory(
 
 Review comment:
   I think so, I created 
[FLINK-12110](https://issues.apache.org/jira/browse/FLINK-12110) to track it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-04 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r272080191
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 ##
 @@ -616,10 +394,9 @@ public static long calculateHeapSizeMB(long 
totalJavaMemorySizeMB, Configuration
Preconditions.checkArgument(totalJavaMemorySizeMB > 0);
 
// subtract the Java memory used for network buffers (always 
off-heap)
-   final long networkBufMB =
-   calculateNetworkBufferMemory(
-   totalJavaMemorySizeMB << 20, // megabytes to 
bytes
-   config) >> 20; // bytes to megabytes
+   final long networkBufMB = 
NetworkEnvironmentConfiguration.calculateNetworkBufferMemory(
 
 Review comment:
   I think so, I created FLINK-12110 to track it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12110) Introduce API point for ShuffleService to get estimation of locally required memory

2019-04-04 Thread Andrey Zagrebin (JIRA)
Andrey Zagrebin created FLINK-12110:
---

 Summary: Introduce API point for ShuffleService to get estimation 
of locally required memory
 Key: FLINK-12110
 URL: https://issues.apache.org/jira/browse/FLINK-12110
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Andrey Zagrebin
Assignee: zhijiang


At the moment, we use static method TaskManagerServices.calculateHeapSizeMB to 
estimate local memory needed in TM container for network environment. As 
network environment becomes pluggable shuffle service, we should also make this 
request from container to shuffle service pluggable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] hequn8128 removed a comment on issue #8112: [hotfix][table]Improve column operations doc in tableAPI.

2019-04-04 Thread GitBox
hequn8128 removed a comment on issue #8112: [hotfix][table]Improve column 
operations doc in tableAPI.
URL: https://github.com/apache/flink/pull/8112#issuecomment-479810010
 
 
   @sunjincheng121 Thanks for improving the document. It is clearer to separate 
the columns operation from the other ones. It's easier for users to find the 
difference between table and SQL when looking at the outline of the documents.
   
   +1 to merge.
   
   Best, Hequn


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] hequn8128 commented on issue #8112: [hotfix][table]Improve column operations doc in tableAPI.

2019-04-04 Thread GitBox
hequn8128 commented on issue #8112: [hotfix][table]Improve column operations 
doc in tableAPI.
URL: https://github.com/apache/flink/pull/8112#issuecomment-479810010
 
 
   @sunjincheng121 Thanks for improving the document. It is clearer to separate 
the columns operation from the other ones. It's easier for users to find the 
difference between table and SQL when looking at the outline of the documents.
   
   +1 to merge.
   
   Best, Hequn


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-04 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r272077394
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
+   Configuration configuration,
+   long maxJvmHeapMemory,
+   boolean localTaskManagerCommunication,
+   InetAddress taskManagerAddress) {
+
+   // > hosts / ports for communication and data exchange
+
+   final int dataport = 
configuration.getInteger(TaskManagerOptions.DATA_PORT);
+   ConfigurationParserUtils.checkConfigParameter(dataport >= 0, 
dataport, TaskManagerOptions.DATA_PORT.key(),
+   "Leave config parameter empty or use 0 to let the 
system choose a port automatically.");
+
+   final int pageSize = 
ConfigurationParserUtils.getPageSize(configuration);
+
+   final int numNetworkBuffers;
+   if (!hasNewNetworkConfig(configuration)) {
+   // fallback: number of network buffers
+   numNetworkBuffers = 
configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+
+   checkOldNetworkConfig(numNetworkBuffers);
+   } else {
+   if 
(configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
+   LOG.info("Ignoring old (but still present) 
network buffer configuration via {}.",
+   
TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
+   }
+
+   final long networkMemorySize = 
calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
+
+   // tolerate offcuts between intended and allocated 
memory due to segmentation (will be available to the user-space memory)
+   long numNetworkBuffersLong = networkMemorySize / 
pageSize;
+   if (numNetworkBuffersLong > Integer.MAX_VALUE) {
+   throw new IllegalArgumentException("The given 
number of memory bytes (" + networkMemorySize
+   + ") corresponds to more than MAX_INT 
pages.");
+   }
+   numNetworkBuffers = (int) numNetworkBuffersLong;
+   }
+
+   final NettyConfig nettyConfig;
+   if (!localTaskManagerCommunication) {
+   final InetSocketAddress taskManagerInetSocketAddress = 
new InetSocketAddress(taskManagerAddress, dataport);
+
+   nettyConfig = new 
NettyConfig(taskManagerInetSocketAddress.getAddress(), 
taskManagerInetSocketAddress.getPort(),
+   pageSize, 
ConfigurationParserUtils.getSlot(configuration), configuration);
+   } else {
+   nettyConfig = null;
+   }
+
+   int initialRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+   int maxRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+   int buffersPerChannel = 
configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+   int extraBuffersPerGate = 
configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
+   boolean isCreditBased = 
configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
 
 Review comment:
   As I understand `NetworkEnvironmentConfigurationBuilder.nettyConfig` is 
`null` by default.
   Then maybe `nettyConfig` could be some created default non-null object and 
`NetworkEnvironmentConfigurationBuilder` could also have 
`setDefaultNettyConfigAndIsCreditBased(boolean isCreditBased)` instead of 
`setIsCreditBased`? Internally it would construct default non-null 
`nettyConfig` with requested 

[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-04 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r272066212
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,6 +106,300 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
 
 Review comment:
   True, then we should probably use `@SuppressWarnings("deprecation")` in this 
case (now it should already go to `calculateNumberOfNetworkBuffers`) because 
the method `fromConfiguration` itself does not look deprecated .


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8112: [hotfix][table]Improve column operations doc in tableAPI.

2019-04-04 Thread GitBox
flinkbot commented on issue #8112: [hotfix][table]Improve column operations doc 
in tableAPI.
URL: https://github.com/apache/flink/pull/8112#issuecomment-479800278
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer of PMC member is required Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12109) Flink SQL hangs in StreamTableEnvironment.sqlUpdate, finally lead to java.lang.StackOverflowError

2019-04-04 Thread xu (JIRA)
xu created FLINK-12109:
--

 Summary: Flink SQL hangs in StreamTableEnvironment.sqlUpdate, 
finally lead to java.lang.StackOverflowError
 Key: FLINK-12109
 URL: https://issues.apache.org/jira/browse/FLINK-12109
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Client
Affects Versions: 1.7.2
Reporter: xu


Hi Experts,
There is a Flink application(Version 1.7.2) which is written in Flink SQL, and 
the SQL in the application is quite long, consists of about 30 tables, 1500 
lines in total. When executing, I found it is hanged in 
StreamTableEnvironment.sqlUpdate, keep executing some code about calcite, 
finally java.lang.StackOverflowError is got. The following is the stack trace:

Caused by: java.lang.StackOverflowError at 
org.apache.calcite.rel.metadata.RelMetadataQuery.getDistinctRowCount(RelMetadataQuery.java:771)
 at 
org.apache.calcite.rel.metadata.RelMdRowCount.getRowCount(RelMdRowCount.java:207)
 at GeneratedMetadataHandler_RowCount.getRowCount_$(Unknown Source) at 
GeneratedMetadataHandler_RowCount.getRowCount(Unknown Source) at 
org.apache.calcite.rel.metadata.RelMetadataQuery.getRowCount(RelMetadataQuery.java:225)
 at 
org.apache.calcite.rel.metadata.RelMdRowCount.getRowCount(RelMdRowCount.java:72)
 at GeneratedMetadataHandler_RowCount.getRowCount_$(Unknown Source) at 
GeneratedMetadataHandler_RowCount.getRowCount(Unknown Source) at 
org.apache.calcite.rel.metadata.RelMetadataQuery.getRowCount(RelMetadataQuery.java:225)
 at 
org.apache.calcite.rel.metadata.RelMdRowCount.getRowCount(RelMdRowCount.java:133)
 at GeneratedMetadataHandler_RowCount.getRowCount_$(Unknown Source) at 
GeneratedMetadataHandler_RowCount.getRowCount(Unknown Source) at 
org.apache.calcite.rel.metadata.RelMetadataQuery.getRowCount(RelMetadataQuery.java:225)
 at 
org.apache.calcite.rel.metadata.RelMdRowCount.getRowCount(RelMdRowCount.java:72)
 at GeneratedMetadataHandler_RowCount.getRowCount_$(Unknown Source) at 
GeneratedMetadataHandler_RowCount.getRowCount(Unknown Source) at 
org.apache.calcite.rel.metadata.RelMetadataQuery.getRowCount(RelMetadataQuery.java:225)
 at 
org.apache.calcite.rel.metadata.RelMdUtil.estimateFilteredRows(RelMdUtil.java:718)
 at 
org.apache.calcite.rel.metadata.RelMdRowCount.getRowCount(RelMdRowCount.java:124)
 at GeneratedMetadataHandler_RowCount.getRowCount_$(Unknown Source) at 
GeneratedMetadataHandler_RowCount.getRowCount(Unknown Source) at 
org.apache.calcite.rel.metadata.RelMetadataQuery.getRowCount(RelMetadataQuery.java:225)
 at 
org.apache.calcite.rel.metadata.RelMdRowCount.getRowCount(RelMdRowCount.java:72)
 at GeneratedMetadataHandler_RowCount.getRowCount_$(Unknown Source) at 
GeneratedMetadataHandler_RowCount.getRowCount(Unknown Source) at 
org.apache.calcite.rel.metadata.RelMetadataQuery.getRowCount(RelMetadataQuery.java:225)
 at 
org.apache.calcite.rel.metadata.RelMdRowCount.getRowCount(RelMdRowCount.java:133)
 at GeneratedMetadataHandler_RowCount.getRowCount_$(Unknown Source) at 
GeneratedMetadataHandler_RowCount.getRowCount(Unknown Source) at 
org.apache.calcite.rel.metadata.RelMetadataQuery.getRowCount(RelMetadataQuery.java:225)
 at 
org.apache.calcite.rel.metadata.RelMdRowCount.getRowCount(RelMdRowCount.java:72)
 at GeneratedMetadataHandler_RowCount.getRowCount_$(Unknown Source) at 
GeneratedMetadataHandler_RowCount.getRowCount(Unknown Source) at 
org.apache.calcite.rel.metadata.RelMetadataQuery.getRowCount(RelMetadataQuery.java:225)
 at 
org.apache.calcite.rel.metadata.RelMdUtil.getJoinRowCount(RelMdUtil.java:674) 
at 
org.apache.calcite.rel.metadata.RelMdRowCount.getRowCount(RelMdRowCount.java:188)
 at GeneratedMetadataHandler_RowCount.getRowCount_$(Unknown Source) at 
GeneratedMetadataHandler_RowCount.getRowCount(Unknown Source) at 
org.apache.calcite.rel.metadata.RelMetadataQuery.getRowCount(RelMetadataQuery.java:225)
 at 
org.apache.calcite.rel.metadata.RelMdRowCount.getRowCount(RelMdRowCount.java:72)
 at GeneratedMetadataHandler_RowCount.getRowCount_$(Unknown Source) at 
GeneratedMetadataHandler_RowCount.getRowCount(Unknown Source) at 
org.apache.calcite.rel.metadata.RelMetadataQuery.getRowCount(RelMetadataQuery.java:225)
 at 
org.apache.calcite.rel.metadata.RelMdRowCount.getRowCount(RelMdRowCount.java:133)
 at GeneratedMetadataHandler_RowCount.getRowCount_$(Unknown Source) at 
GeneratedMetadataHandler_RowCount.getRowCount(Unknown Source) at 
org.apache.calcite.rel.metadata.RelMetadataQuery.getRowCount(RelMetadataQuery.java:225)
 at 
org.apache.calcite.rel.metadata.RelMdRowCount.getRowCount(RelMdRowCount.java:72)
 at GeneratedMetadataHandler_RowCount.getRowCount_$(Unknown Source) at 
GeneratedMetadataHandler_RowCount.getRowCount(Unknown Source) at 
org.apache.calcite.rel.metadata.RelMetadataQuery.getRowCount(RelMetadataQuery.java:225)
 at 
org.apache.calcite.rel.metadata.RelMdUtil.getJoinRowCount(RelMdUtil.java:674) 
at 

[GitHub] [flink] sunjincheng121 opened a new pull request #8112: [hotfix][table]Improve column operations doc in tableAPI.

2019-04-04 Thread GitBox
sunjincheng121 opened a new pull request #8112: [hotfix][table]Improve column 
operations doc in tableAPI.
URL: https://github.com/apache/flink/pull/8112
 
 
   ## What is the purpose of the change
   
   Move the document of 
AddColumns/AddOrReplaceColumns/DropColumns/RenameColumns to a separate menu, 
similar to Set Operations.
   
   
   ## Brief change log
 - Move the document of 
AddColumns/AddOrReplaceColumns/DropColumns/RenameColumns to a separate menu
   
   ## Verifying this change
   This change is only a document improvement.
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (docs)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-04 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r272066212
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,6 +106,300 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
 
 Review comment:
   True, then we should probably use `@SuppressWarnings("deprecation")` in this 
case because the method `fromConfiguration` itself does not look deprecated.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8107: [FLINK-12094][table-runtime-blink] Introduce sort merge join operator to blink batch

2019-04-04 Thread GitBox
JingsongLi commented on a change in pull request #8107: 
[FLINK-12094][table-runtime-blink] Introduce sort merge join operator to blink 
batch
URL: https://github.com/apache/flink/pull/8107#discussion_r272061016
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/SortMergeJoinOperator.java
 ##
 @@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.BinaryRow;
+import org.apache.flink.table.dataformat.GenericRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.generated.GeneratedJoinCondition;
+import org.apache.flink.table.generated.GeneratedNormalizedKeyComputer;
+import org.apache.flink.table.generated.GeneratedProjection;
+import org.apache.flink.table.generated.GeneratedRecordComparator;
+import org.apache.flink.table.generated.JoinCondition;
+import org.apache.flink.table.generated.Projection;
+import org.apache.flink.table.generated.RecordComparator;
+import org.apache.flink.table.runtime.TableStreamOperator;
+import org.apache.flink.table.runtime.sort.BinaryExternalSorter;
+import org.apache.flink.table.runtime.util.ResettableExternalBuffer;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.table.typeutils.AbstractRowSerializer;
+import org.apache.flink.table.typeutils.BinaryRowSerializer;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation that realizes the joining through a sort-merge join 
strategy.
+ * 1.In most cases, its performance is weaker than HashJoin.
+ * 2.It is more stable than HashJoin, and most of the data can be sorted 
stably.
+ * 3.SortMergeJoin should be the best choice if sort can be omitted in the 
case of multi-level join
+ * cascade with the same key.
+ *
+ * NOTE: SEMI and ANTI join output input1 instead of input2. (Contrary to 
{@link HashJoinOperator}).
+ */
+public class SortMergeJoinOperator extends TableStreamOperator
+   implements TwoInputStreamOperator {
+
+   private final long reservedSortMemory1;
+   private final long reservedSortMemory2;
+   private final long externalBufferMemory;
+   private final SortMergeJoinType type;
+   private final boolean leftIsSmaller;
+   private final boolean[] filterNulls;
+
+   // generated code to cook
+   private GeneratedJoinCondition condFuncCode;
+   private GeneratedProjection projectionCode1;
+   private GeneratedProjection projectionCode2;
+   private GeneratedNormalizedKeyComputer computer1;
+   private GeneratedRecordComparator comparator1;
+   private GeneratedNormalizedKeyComputer computer2;
+   private GeneratedRecordComparator comparator2;
+   private GeneratedRecordComparator genKeyComparator;
+
+   private transient MemoryManager memManager;
+   private transient IOManager ioManager;
+   private transient BinaryRowSerializer serializer1;
+   private transient BinaryRowSerializer serializer2;
+   private transient BinaryExternalSorter sorter1;
+   private transient BinaryExternalSorter sorter2;
+   private transient SortMergeJoinIterator joinIterator1;
 
 Review comment:
   yes... I can move them to local variable and use `try final` to ensure them 
closed.


This is an automated message from the Apache 

[GitHub] [flink] JingsongLi commented on a change in pull request #8107: [FLINK-12094][table-runtime-blink] Introduce sort merge join operator to blink batch

2019-04-04 Thread GitBox
JingsongLi commented on a change in pull request #8107: 
[FLINK-12094][table-runtime-blink] Introduce sort merge join operator to blink 
batch
URL: https://github.com/apache/flink/pull/8107#discussion_r272060274
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/SortMergeFullOuterJoinIterator.java
 ##
 @@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join;
+
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.BinaryRow;
+import org.apache.flink.table.generated.Projection;
+import org.apache.flink.table.generated.RecordComparator;
+import org.apache.flink.table.runtime.util.ResettableExternalBuffer;
+import org.apache.flink.table.typeutils.BinaryRowSerializer;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Gets two matched rows for full outer join.
+ */
+public class SortMergeFullOuterJoinIterator implements Closeable {
 
 Review comment:
   Yes, but because of the different concepts of names on both sides. 
(probeRow <=> row1), prefer keep them.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] [table] Add support for map to table API

2019-04-04 Thread GitBox
sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] 
[table] Add support for map to table API
URL: https://github.com/apache/flink/pull/7167#discussion_r272042362
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/MapTest.scala
 ##
 @@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.utils.{Func1, Func23, Func24}
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil.{streamTableNode, term, 
unaryNode}
+import org.junit.Test
+
+class MapTest extends TableTestBase {
 
 Review comment:
   How about moving those test case into `CalcTest`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] [table] Add support for map to table API

2019-04-04 Thread GitBox
sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] 
[table] Add support for map to table API
URL: https://github.com/apache/flink/pull/7167#discussion_r272048201
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/OperationTreeBuilder.scala
 ##
 @@ -369,6 +369,14 @@ class OperationTreeBuilder(private val tableEnv: 
TableEnvironment) {
 Union(left.asInstanceOf[LogicalNode], right.asInstanceOf[LogicalNode], 
all).validate(tableEnv)
   }
 
+  def map(mapFunction: Expression, child: TableOperation): TableOperation = {
+val childNode = child.asInstanceOf[LogicalNode]
+val expandedFields = expandProjectList(
+  Seq(mapFunction).map(expressionBridge.bridge)
+.map(Flattening), childNode, tableEnv)
 
 Review comment:
   `.map(expressionBridge.bridge) .map(Flattening)` 
   to
`.map(e => Flattening(expressionBridge.bridge(e)))`
   I am not pretty which one is better? just a suggestion. :)
   What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] [table] Add support for map to table API

2019-04-04 Thread GitBox
sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] 
[table] Add support for map to table API
URL: https://github.com/apache/flink/pull/7167#discussion_r272046074
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
 ##
 @@ -136,4 +136,21 @@ object ProjectionTranslator {
   case e: PlannerExpression => e
 }
   }
+
+
+  /**
+* Extracts the leading non-alias expression.
+*
+* @param expr  the expression to extract
 
 Review comment:
   Please remove spaces. i.e.: "the expression to extract" ->"the 
expression to extract"


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] [table] Add support for map to table API

2019-04-04 Thread GitBox
sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] 
[table] Add support for map to table API
URL: https://github.com/apache/flink/pull/7167#discussion_r272042200
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/MapValidationTest.scala
 ##
 @@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.table.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.scala._
+import 
org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvg
+import org.apache.flink.table.utils.{TableFunc0, TableTestBase}
+import org.junit.Test
+
+class MapValidationTest extends TableTestBase {
 
 Review comment:
   How about moving those test case into `CalcValidationTest`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] [table] Add support for map to table API

2019-04-04 Thread GitBox
sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] 
[table] Add support for map to table API
URL: https://github.com/apache/flink/pull/7167#discussion_r272038390
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/MapITCase.scala
 ##
 @@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.utils.{Func1, Func23, Func24, Func25}
+import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData}
+import org.apache.flink.test.util.AbstractTestBase
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit.{Ignore, Test}
+
+import scala.collection.mutable
+
+class MapITCase extends AbstractTestBase {
+
+  @Test
+  def testSimpleMap(): Unit = {
 
 Review comment:
   I suggest reduce the ITCase count, so can merge 
`TestSimpleMap/testScalarResult/testMultiMap` in one case? And add the alias 
test as well. What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] [table] Add support for map to table API

2019-04-04 Thread GitBox
sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] 
[table] Add support for map to table API
URL: https://github.com/apache/flink/pull/7167#discussion_r272055289
 
 

 ##
 File path: docs/dev/table/tableApi.md
 ##
 @@ -1682,6 +1682,34 @@ The `OverWindow` defines a range of rows over which 
aggregates are computed. `Ov
 
 {% top %}
 
+### Map
 
 Review comment:
   How about adding a `Row-based operations` contains 
`map/flatmap/aggregate/flatAggregate`. ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] [table] Add support for map to table API

2019-04-04 Thread GitBox
sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] 
[table] Add support for map to table API
URL: https://github.com/apache/flink/pull/7167#discussion_r272045373
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/tableImpl.scala
 ##
 @@ -462,6 +463,23 @@ class TableImpl(
 wrap(operationTreeBuilder.dropColumns(fields, operationTree))
   }
 
+  override def map(mapFunction: String): Table = {
+map(ExpressionParser.parseExpression(mapFunction))
+  }
+
+  override def map(mapFunction: Expression): Table = {
+val resolvedMapFunction = callResolver.visit(mapFunction)
+getLeadingNonAliasExpr(resolvedMapFunction) match {
+  case callExpr: CallExpression if callExpr.getFunctionDefinition.getType 
==
+FunctionDefinition.Type.SCALAR_FUNCTION =>
+
+  case _ =>
+throw new ValidationException("Only ScalarFunction can be used in 
map.")
 
 Review comment:
   `Only ScalarFunction can be used in the map operator` ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] [table] Add support for map to table API

2019-04-04 Thread GitBox
sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] 
[table] Add support for map to table API
URL: https://github.com/apache/flink/pull/7167#discussion_r272041929
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/MapITCase.scala
 ##
 @@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.utils.{Func1, Func23, Func24, Func25}
+import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData}
+import org.apache.flink.test.util.AbstractTestBase
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit.{Ignore, Test}
+
+import scala.collection.mutable
+
+class MapITCase extends AbstractTestBase {
 
 Review comment:
   Can we move this test case into `CalcITCase`? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] [table] Add support for map to table API

2019-04-04 Thread GitBox
sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] 
[table] Add support for map to table API
URL: https://github.com/apache/flink/pull/7167#discussion_r272045455
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/tableImpl.scala
 ##
 @@ -462,6 +463,23 @@ class TableImpl(
 wrap(operationTreeBuilder.dropColumns(fields, operationTree))
   }
 
+  override def map(mapFunction: String): Table = {
+map(ExpressionParser.parseExpression(mapFunction))
+  }
+
+  override def map(mapFunction: Expression): Table = {
+val resolvedMapFunction = callResolver.visit(mapFunction)
+getLeadingNonAliasExpr(resolvedMapFunction) match {
+  case callExpr: CallExpression if callExpr.getFunctionDefinition.getType 
==
+FunctionDefinition.Type.SCALAR_FUNCTION =>
+
 
 Review comment:
   It's better to remove the empty line.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] [table] Add support for map to table API

2019-04-04 Thread GitBox
sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] 
[table] Add support for map to table API
URL: https://github.com/apache/flink/pull/7167#discussion_r272044872
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/tableImpl.scala
 ##
 @@ -462,6 +463,23 @@ class TableImpl(
 wrap(operationTreeBuilder.dropColumns(fields, operationTree))
   }
 
+  override def map(mapFunction: String): Table = {
+map(ExpressionParser.parseExpression(mapFunction))
+  }
+
+  override def map(mapFunction: Expression): Table = {
+val resolvedMapFunction = callResolver.visit(mapFunction)
 
 Review comment:
   How about using `mapFunction.accept(callResolver)`, just to use the 
callResolver in the TableImpl class is very uniform, and I am fine if you keep 
using `callResolver.visit(mapFunction)` in this PR. :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] [table] Add support for map to table API

2019-04-04 Thread GitBox
sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] 
[table] Add support for map to table API
URL: https://github.com/apache/flink/pull/7167#discussion_r272046322
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
 ##
 @@ -136,4 +136,21 @@ object ProjectionTranslator {
   case e: PlannerExpression => e
 }
   }
+
+
+  /**
+* Extracts the leading non-alias expression.
+*
+* @param expr  the expression to extract
+* @return the top non-alias expression
+*/
+  def getLeadingNonAliasExpr(expr: Expression): Expression = {
+expr match {
+  case callExpr: CallExpression if callExpr.getFunctionDefinition.equals(
+  BuiltInFunctionDefinitions.AS) =>
+getLeadingNonAliasExpr(callExpr.getChildren.get(0))
+
 
 Review comment:
   It's better to remove the empty line, right?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] [table] Add support for map to table API

2019-04-04 Thread GitBox
sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] 
[table] Add support for map to table API
URL: https://github.com/apache/flink/pull/7167#discussion_r272037699
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/MapITCase.scala
 ##
 @@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.utils.{Func1, Func23, Func24, Func25}
+import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData}
+import org.apache.flink.test.util.AbstractTestBase
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit.{Ignore, Test}
+
+import scala.collection.mutable
+
+class MapITCase extends AbstractTestBase {
+
+  @Test
+  def testSimpleMap(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
 
 Review comment:
   Please using `StreamTableEnvironment.create(env)` instead of 
`TableEnvironment.getTableEnvironment(env)`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] lamber-ken commented on issue #7987: [FLINK-11820][serialization] SimpleStringSchema handle message record which value is null

2019-04-04 Thread GitBox
lamber-ken commented on issue #7987: [FLINK-11820][serialization] 
SimpleStringSchema handle message record which value is null
URL: https://github.com/apache/flink/pull/7987#issuecomment-479787896
 
 
   @klion26,cc


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-3273) Remove Scala dependency from flink-streaming-java

2019-04-04 Thread Liya Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-3273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16809581#comment-16809581
 ] 

Liya Fan commented on FLINK-3273:
-

Hi [~dawidwys], thank you so much for your feedback.

So what is the purpose of adding a Scala suffix? Does that action resolve this 
issue? 

> Remove Scala dependency from flink-streaming-java
> -
>
> Key: FLINK-3273
> URL: https://issues.apache.org/jira/browse/FLINK-3273
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Maximilian Michels
>Priority: Major
>
> {{flink-streaming-java}} depends on Scala through {{flink-clients}}, 
> {{flink-runtime}}, and {{flink-testing-utils}}. We should get rid of the 
> Scala dependency just like we did for {{flink-java}}. Integration tests and 
> utilities which depend on Scala should be moved to {{flink-tests}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] JingsongLi commented on a change in pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch

2019-04-04 Thread GitBox
JingsongLi commented on a change in pull request #8102: 
[FLINK-12087][table-runtime-blink] Introduce over window operators to blink 
batch
URL: https://github.com/apache/flink/pull/8102#discussion_r272034292
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/BoundComparator.java
 ##
 @@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.generated;
+
+import org.apache.flink.table.dataformat.BaseRow;
+
+import java.io.Serializable;
+
+/**
+ * comparing boundary values.
+ */
+public interface BoundComparator extends Serializable {
+
+   /**
+* reset the comparator.
+*/
+   void reset();
+
+   /**
+* Compares its two row.  Returns a negative integer,
+* zero, or a positive integer as the first argument is less than, equal
+* to, or greater than the second.
+*
+* @param inputRow the first row to be compared.
+* @param inputIndex   the index for the first row.
+* @param currentRow   the second row to be compared.
+* @param currentIndex the index for the second row.
+* @return a negative integer, zero, or a positive integer as the
+* first argument is less than, equal to, or greater than the
+* second.
+*/
+   long compare(BaseRow inputRow, int inputIndex, BaseRow currentRow, int 
currentIndex);
 
 Review comment:
   index is the number of current partition(window).


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on issue #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch

2019-04-04 Thread GitBox
JingsongLi commented on issue #8102: [FLINK-12087][table-runtime-blink] 
Introduce over window operators to blink batch
URL: https://github.com/apache/flink/pull/8102#issuecomment-479776062
 
 
   Thanks @KurtYoung for reviewing, I changed method name and add comments.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on a change in pull request #8107: [FLINK-12094][table-runtime-blink] Introduce sort merge join operator to blink batch

2019-04-04 Thread GitBox
KurtYoung commented on a change in pull request #8107: 
[FLINK-12094][table-runtime-blink] Introduce sort merge join operator to blink 
batch
URL: https://github.com/apache/flink/pull/8107#discussion_r272039981
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/SortMergeFullOuterJoinIterator.java
 ##
 @@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join;
+
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.BinaryRow;
+import org.apache.flink.table.generated.Projection;
+import org.apache.flink.table.generated.RecordComparator;
+import org.apache.flink.table.runtime.util.ResettableExternalBuffer;
+import org.apache.flink.table.typeutils.BinaryRowSerializer;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Gets two matched rows for full outer join.
+ */
+public class SortMergeFullOuterJoinIterator implements Closeable {
 
 Review comment:
   Can this also extends from `SortMergeJoinIterator`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on a change in pull request #8107: [FLINK-12094][table-runtime-blink] Introduce sort merge join operator to blink batch

2019-04-04 Thread GitBox
KurtYoung commented on a change in pull request #8107: 
[FLINK-12094][table-runtime-blink] Introduce sort merge join operator to blink 
batch
URL: https://github.com/apache/flink/pull/8107#discussion_r272038683
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/SortMergeJoinOperator.java
 ##
 @@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.BinaryRow;
+import org.apache.flink.table.dataformat.GenericRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.generated.GeneratedJoinCondition;
+import org.apache.flink.table.generated.GeneratedNormalizedKeyComputer;
+import org.apache.flink.table.generated.GeneratedProjection;
+import org.apache.flink.table.generated.GeneratedRecordComparator;
+import org.apache.flink.table.generated.JoinCondition;
+import org.apache.flink.table.generated.Projection;
+import org.apache.flink.table.generated.RecordComparator;
+import org.apache.flink.table.runtime.TableStreamOperator;
+import org.apache.flink.table.runtime.sort.BinaryExternalSorter;
+import org.apache.flink.table.runtime.util.ResettableExternalBuffer;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.table.typeutils.AbstractRowSerializer;
+import org.apache.flink.table.typeutils.BinaryRowSerializer;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation that realizes the joining through a sort-merge join 
strategy.
+ * 1.In most cases, its performance is weaker than HashJoin.
+ * 2.It is more stable than HashJoin, and most of the data can be sorted 
stably.
+ * 3.SortMergeJoin should be the best choice if sort can be omitted in the 
case of multi-level join
+ * cascade with the same key.
+ *
+ * NOTE: SEMI and ANTI join output input1 instead of input2. (Contrary to 
{@link HashJoinOperator}).
+ */
+public class SortMergeJoinOperator extends TableStreamOperator
+   implements TwoInputStreamOperator {
+
+   private final long reservedSortMemory1;
+   private final long reservedSortMemory2;
+   private final long externalBufferMemory;
+   private final SortMergeJoinType type;
+   private final boolean leftIsSmaller;
+   private final boolean[] filterNulls;
+
+   // generated code to cook
+   private GeneratedJoinCondition condFuncCode;
+   private GeneratedProjection projectionCode1;
+   private GeneratedProjection projectionCode2;
+   private GeneratedNormalizedKeyComputer computer1;
+   private GeneratedRecordComparator comparator1;
+   private GeneratedNormalizedKeyComputer computer2;
+   private GeneratedRecordComparator comparator2;
+   private GeneratedRecordComparator genKeyComparator;
+
+   private transient MemoryManager memManager;
+   private transient IOManager ioManager;
+   private transient BinaryRowSerializer serializer1;
+   private transient BinaryRowSerializer serializer2;
+   private transient BinaryExternalSorter sorter1;
+   private transient BinaryExternalSorter sorter2;
+   private transient SortMergeJoinIterator joinIterator1;
 
 Review comment:
   you cannot list all variables you *might* used, and choose which one to use 
in different scenarios...


This is an automated message 

[GitHub] [flink] KurtYoung commented on a change in pull request #8107: [FLINK-12094][table-runtime-blink] Introduce sort merge join operator to blink batch

2019-04-04 Thread GitBox
KurtYoung commented on a change in pull request #8107: 
[FLINK-12094][table-runtime-blink] Introduce sort merge join operator to blink 
batch
URL: https://github.com/apache/flink/pull/8107#discussion_r272036764
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/SortMergeJoinHelper.java
 ##
 @@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join;
+
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.BinaryRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.generated.JoinCondition;
+import org.apache.flink.table.runtime.util.ResettableExternalBuffer;
+import org.apache.flink.util.Collector;
+
+import java.util.BitSet;
+
+/**
+ * Helper class for sort merge join operators. Contains some join methods.
+ */
+public class SortMergeJoinHelper {
 
 Review comment:
   I think we don't need this class, it's just a mix of various join function, 
you can place them directly into SMJ operator


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12108) Simplify splitting expressions into projections, aggregations & windowProperties

2019-04-04 Thread Dawid Wysakowicz (JIRA)
Dawid Wysakowicz created FLINK-12108:


 Summary: Simplify splitting expressions into projections, 
aggregations & windowProperties
 Key: FLINK-12108
 URL: https://issues.apache.org/jira/browse/FLINK-12108
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Dawid Wysakowicz
Assignee: Dawid Wysakowicz






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] JingsongLi commented on a change in pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch

2019-04-04 Thread GitBox
JingsongLi commented on a change in pull request #8102: 
[FLINK-12087][table-runtime-blink] Introduce over window operators to blink 
batch
URL: https://github.com/apache/flink/pull/8102#discussion_r272038501
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/aggregate/over/frame/OverWindowFrame.java
 ##
 @@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate.over.frame;
+
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.BinaryRow;
+import org.apache.flink.table.runtime.context.ExecutionContext;
+import org.apache.flink.table.runtime.util.ResettableExternalBuffer;
+
+import java.io.Serializable;
+
+/**
+ * A window frame calculates the results for those records belong to a window 
frame.
+ * Before use a frame must be prepared by passing it all the records in the 
current partition.
+ */
+public interface OverWindowFrame extends Serializable {
+
+   void open(ExecutionContext ctx) throws Exception;
+
+   void resetBuffer(ResettableExternalBuffer rows) throws Exception;
+
+   //return the ACC of the window frame.
+   BaseRow write(int index, BaseRow current) throws Exception;
 
 Review comment:
   Method `write` need modify to `process`.
   
   Over AGG means that every Row has a corresponding output.
   OverWindowFrame is called by:
   1. Get all data and invoke `prepare(ResettableExternalBuffer rows)` for 
partition
   2. Then each Row is traversed one by one to invoke `BaseRow process(int 
index, BaseRow current)`to get the calculation results of the currentRow.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch

2019-04-04 Thread GitBox
JingsongLi commented on a change in pull request #8102: 
[FLINK-12087][table-runtime-blink] Introduce over window operators to blink 
batch
URL: https://github.com/apache/flink/pull/8102#discussion_r272038501
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/aggregate/over/frame/OverWindowFrame.java
 ##
 @@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate.over.frame;
+
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.BinaryRow;
+import org.apache.flink.table.runtime.context.ExecutionContext;
+import org.apache.flink.table.runtime.util.ResettableExternalBuffer;
+
+import java.io.Serializable;
+
+/**
+ * A window frame calculates the results for those records belong to a window 
frame.
+ * Before use a frame must be prepared by passing it all the records in the 
current partition.
+ */
+public interface OverWindowFrame extends Serializable {
+
+   void open(ExecutionContext ctx) throws Exception;
+
+   void resetBuffer(ResettableExternalBuffer rows) throws Exception;
+
+   //return the ACC of the window frame.
+   BaseRow write(int index, BaseRow current) throws Exception;
 
 Review comment:
   Method `write` need modify to `process`.
   
   Over AGG means that every Row has a corresponding output.
   OverWindowFrame is called by:
   1. Get all data and invoke `prepare(Resettable External Buffer rows)` for 
partition
   2. Then each Row is traversed one by one to invoke `BaseRow process(int 
index, BaseRow current)`to get the calculation results of the currentRow.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-3273) Remove Scala dependency from flink-streaming-java

2019-04-04 Thread Dawid Wysakowicz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-3273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16809565#comment-16809565
 ] 

Dawid Wysakowicz edited comment on FLINK-3273 at 4/4/19 6:54 AM:
-

[~fan_li_ya] "Adding a Scala suffix" means that the full artifactId is 
{{flink-streaming-java_\{scalaversion-suffix\}}}. Rather than just 
{{flink-streaming-java}}.


was (Author: dawidwys):
[~fan_li_ya] "Adding a Scala suffix" means that the full artifactId is 
{{flink-streaming-java_{scala-version-suffix}}}. Rather than just 
{{flink-streaming-java}}.

> Remove Scala dependency from flink-streaming-java
> -
>
> Key: FLINK-3273
> URL: https://issues.apache.org/jira/browse/FLINK-3273
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Maximilian Michels
>Priority: Major
>
> {{flink-streaming-java}} depends on Scala through {{flink-clients}}, 
> {{flink-runtime}}, and {{flink-testing-utils}}. We should get rid of the 
> Scala dependency just like we did for {{flink-java}}. Integration tests and 
> utilities which depend on Scala should be moved to {{flink-tests}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-3273) Remove Scala dependency from flink-streaming-java

2019-04-04 Thread Dawid Wysakowicz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-3273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16809565#comment-16809565
 ] 

Dawid Wysakowicz commented on FLINK-3273:
-

[~fan_li_ya] "Adding a Scala suffix" means that the full artifactId is 
{{flink-streaming-java_{scala-version-suffix}}}. Rather than just 
{{flink-streaming-java}}.

> Remove Scala dependency from flink-streaming-java
> -
>
> Key: FLINK-3273
> URL: https://issues.apache.org/jira/browse/FLINK-3273
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Maximilian Michels
>Priority: Major
>
> {{flink-streaming-java}} depends on Scala through {{flink-clients}}, 
> {{flink-runtime}}, and {{flink-testing-utils}}. We should get rid of the 
> Scala dependency just like we did for {{flink-java}}. Integration tests and 
> utilities which depend on Scala should be moved to {{flink-tests}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12105) TUMBLE INTERVAL value errors out for 100 or more value

2019-04-04 Thread Dawid Wysakowicz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16809561#comment-16809561
 ] 

Dawid Wysakowicz commented on FLINK-12105:
--

I think the design doc for upcoming refactor of Types system in Table API might 
come in handy to understand different types: 
[https://docs.google.com/document/d/1a9HUb6OaBIoj9IRfbILcMFPrOL7ALeZ3rVI66dvA2_U]

It is also true that as of now flink supports only intervals with nanosecond 
precision for window aggregations.

> TUMBLE INTERVAL value errors out for 100 or more value
> --
>
> Key: FLINK-12105
> URL: https://issues.apache.org/jira/browse/FLINK-12105
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.7.2
> Environment: [https://github.com/ververica/sql-training]
>Reporter: Vinod Mehra
>Priority: Major
>
> I ran into this while experimenting with different values at Lyft eng. 
> However it is reproducible with [https://github.com/ververica/sql-training] 
> as well. I showed this issue to the training instructors during 
> flink-forward-19 and they asked me to file this bug.
> The INTERVAL values work fine until 99. Errors after that:
> *TUMBLE(rideTime, INTERVAL '100' SECOND)*
> _org.apache.calcite.sql.validate.SqlValidatorException: Interval field value 
> 100 exceeds precision of SECOND(2) field_
> *TUMBLE(rideTime, INTERVAL '100' MINUTE)*
> _org.apache.calcite.sql.validate.SqlValidatorException: Interval field value 
> 100 exceeds precision of MINUTE(2) field_
> *TUMBLE(rideTime, INTERVAL '100' HOUR)*
> _org.apache.calcite.sql.validate.SqlValidatorException: Interval field value 
> 100 exceeds precision of HOUR(2) field_
> *TUMBLE(rideTime, INTERVAL '100' DAY)*
> _org.apache.calcite.sql.validate.SqlValidatorException:_ Interval field value 
> 100 exceeds precision of DAY(2) field
> (Note: MONTH AND YEAR also error out but for different reasons ("_Only 
> constant window intervals with millisecond resolution are supported_"). MONTH 
> and YEAR intervals are not supported at all currently. I was told that it is 
> hard to implement because of timezone differences. I will file that 
> separately.)_
>  _



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] JingsongLi commented on a change in pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch

2019-04-04 Thread GitBox
JingsongLi commented on a change in pull request #8102: 
[FLINK-12087][table-runtime-blink] Introduce over window operators to blink 
batch
URL: https://github.com/apache/flink/pull/8102#discussion_r272036049
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/aggregate/over/frame/OverWindowFrame.java
 ##
 @@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate.over.frame;
+
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.BinaryRow;
+import org.apache.flink.table.runtime.context.ExecutionContext;
+import org.apache.flink.table.runtime.util.ResettableExternalBuffer;
+
+import java.io.Serializable;
+
+/**
+ * A window frame calculates the results for those records belong to a window 
frame.
+ * Before use a frame must be prepared by passing it all the records in the 
current partition.
+ */
+public interface OverWindowFrame extends Serializable {
+
+   void open(ExecutionContext ctx) throws Exception;
+
+   void resetBuffer(ResettableExternalBuffer rows) throws Exception;
 
 Review comment:
   Means prepare for next partition(window).


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch

2019-04-04 Thread GitBox
JingsongLi commented on a change in pull request #8102: 
[FLINK-12087][table-runtime-blink] Introduce over window operators to blink 
batch
URL: https://github.com/apache/flink/pull/8102#discussion_r272035934
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/aggregate/over/frame/OverWindowFrame.java
 ##
 @@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate.over.frame;
+
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.BinaryRow;
+import org.apache.flink.table.runtime.context.ExecutionContext;
+import org.apache.flink.table.runtime.util.ResettableExternalBuffer;
+
+import java.io.Serializable;
+
+/**
+ * A window frame calculates the results for those records belong to a window 
frame.
+ * Before use a frame must be prepared by passing it all the records in the 
current partition.
+ */
+public interface OverWindowFrame extends Serializable {
+
+   void open(ExecutionContext ctx) throws Exception;
+
+   void resetBuffer(ResettableExternalBuffer rows) throws Exception;
 
 Review comment:
   Naming `prepare` is more appropriate


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch

2019-04-04 Thread GitBox
JingsongLi commented on a change in pull request #8102: 
[FLINK-12087][table-runtime-blink] Introduce over window operators to blink 
batch
URL: https://github.com/apache/flink/pull/8102#discussion_r272036049
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/aggregate/over/frame/OverWindowFrame.java
 ##
 @@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate.over.frame;
+
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.BinaryRow;
+import org.apache.flink.table.runtime.context.ExecutionContext;
+import org.apache.flink.table.runtime.util.ResettableExternalBuffer;
+
+import java.io.Serializable;
+
+/**
+ * A window frame calculates the results for those records belong to a window 
frame.
+ * Before use a frame must be prepared by passing it all the records in the 
current partition.
+ */
+public interface OverWindowFrame extends Serializable {
+
+   void open(ExecutionContext ctx) throws Exception;
+
+   void resetBuffer(ResettableExternalBuffer rows) throws Exception;
 
 Review comment:
   Means prepare for next partition.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch

2019-04-04 Thread GitBox
JingsongLi commented on a change in pull request #8102: 
[FLINK-12087][table-runtime-blink] Introduce over window operators to blink 
batch
URL: https://github.com/apache/flink/pull/8102#discussion_r272034292
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/BoundComparator.java
 ##
 @@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.generated;
+
+import org.apache.flink.table.dataformat.BaseRow;
+
+import java.io.Serializable;
+
+/**
+ * comparing boundary values.
+ */
+public interface BoundComparator extends Serializable {
+
+   /**
+* reset the comparator.
+*/
+   void reset();
+
+   /**
+* Compares its two row.  Returns a negative integer,
+* zero, or a positive integer as the first argument is less than, equal
+* to, or greater than the second.
+*
+* @param inputRow the first row to be compared.
+* @param inputIndex   the index for the first row.
+* @param currentRow   the second row to be compared.
+* @param currentIndex the index for the second row.
+* @return a negative integer, zero, or a positive integer as the
+* first argument is less than, equal to, or greater than the
+* second.
+*/
+   long compare(BaseRow inputRow, int inputIndex, BaseRow currentRow, int 
currentIndex);
 
 Review comment:
   index is the number of current window.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch

2019-04-04 Thread GitBox
JingsongLi commented on a change in pull request #8102: 
[FLINK-12087][table-runtime-blink] Introduce over window operators to blink 
batch
URL: https://github.com/apache/flink/pull/8102#discussion_r272033859
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/BoundComparator.java
 ##
 @@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.generated;
+
+import org.apache.flink.table.dataformat.BaseRow;
+
+import java.io.Serializable;
+
+/**
+ * comparing boundary values.
 
 Review comment:
   
https://docs.oracle.com/cd/E17952_01/mysql-8.0-en/window-functions-frames.html
   Above url has explanation, e.g.:
   CURRENT ROW: For ROWS, the bound is the current row. For RANGE, the bound is 
the peers of the current row.
   UNBOUNDED PRECEDING: The bound is the first partition row.
   UNBOUNDED FOLLOWING: The bound is the last partition row.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch

2019-04-04 Thread GitBox
JingsongLi commented on a change in pull request #8102: 
[FLINK-12087][table-runtime-blink] Introduce over window operators to blink 
batch
URL: https://github.com/apache/flink/pull/8102#discussion_r272033041
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/BoundComparator.java
 ##
 @@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.generated;
+
+import org.apache.flink.table.dataformat.BaseRow;
+
+import java.io.Serializable;
+
+/**
+ * comparing boundary values.
+ */
+public interface BoundComparator extends Serializable {
+
+   /**
+* reset the comparator.
+*/
+   void reset();
 
 Review comment:
   Because the field getter of currentRow may be cached.
   For example, the `UnboundedFollowingOverWindowFrame`, when calculating its 
tail boundary, may cross multiple Rows in iterator. The 
`BoundComparator.compare` will be invoked many times is the same `currentRow` 
input, so it is not necessary to get its field value every time, so cached its 
field value in `BoundComparator`.
   `reset` is designed to clear the cache of `BoundComparator`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12105) TUMBLE INTERVAL value errors out for 100 or more value

2019-04-04 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16809550#comment-16809550
 ] 

Hequn Cheng commented on FLINK-12105:
-

[~vmehra] Hi, thanks for bringing up the discussion. As the default interval 
precision is 2, you need to specify the precision, e.g., {{INTERVAL '100' 
HOUR(3)}}. For MONTH and YEAR, there is already a discussion for it which may 
be helpful for you, see FLINK-9740.

Best, Hequn

> TUMBLE INTERVAL value errors out for 100 or more value
> --
>
> Key: FLINK-12105
> URL: https://issues.apache.org/jira/browse/FLINK-12105
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.7.2
> Environment: [https://github.com/ververica/sql-training]
>Reporter: Vinod Mehra
>Priority: Major
>
> I ran into this while experimenting with different values at Lyft eng. 
> However it is reproducible with [https://github.com/ververica/sql-training] 
> as well. I showed this issue to the training instructors during 
> flink-forward-19 and they asked me to file this bug.
> The INTERVAL values work fine until 99. Errors after that:
> *TUMBLE(rideTime, INTERVAL '100' SECOND)*
> _org.apache.calcite.sql.validate.SqlValidatorException: Interval field value 
> 100 exceeds precision of SECOND(2) field_
> *TUMBLE(rideTime, INTERVAL '100' MINUTE)*
> _org.apache.calcite.sql.validate.SqlValidatorException: Interval field value 
> 100 exceeds precision of MINUTE(2) field_
> *TUMBLE(rideTime, INTERVAL '100' HOUR)*
> _org.apache.calcite.sql.validate.SqlValidatorException: Interval field value 
> 100 exceeds precision of HOUR(2) field_
> *TUMBLE(rideTime, INTERVAL '100' DAY)*
> _org.apache.calcite.sql.validate.SqlValidatorException:_ Interval field value 
> 100 exceeds precision of DAY(2) field
> (Note: MONTH AND YEAR also error out but for different reasons ("_Only 
> constant window intervals with millisecond resolution are supported_"). MONTH 
> and YEAR intervals are not supported at all currently. I was told that it is 
> hard to implement because of timezone differences. I will file that 
> separately.)_
>  _



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] dawidwys commented on a change in pull request #8050: [FLINK-11067][table] Convert TableEnvironments to interfaces

2019-04-04 Thread GitBox
dawidwys commented on a change in pull request #8050: [FLINK-11067][table] 
Convert TableEnvironments to interfaces
URL: https://github.com/apache/flink/pull/8050#discussion_r272031691
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
 ##
 @@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.catalog.ExternalCatalog;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
+import org.apache.flink.table.descriptors.TableDescriptor;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sources.TableSource;
+
+/**
+ * The base class for batch and stream TableEnvironments.
+ *
+ * The TableEnvironment is a central concept of the Table API and SQL 
integration. It is
+ * responsible for:
+ *
+ * 
+ * Registering a Table in the internal catalog
+ * Registering an external catalog
+ * Executing SQL queries
+ * Registering a user-defined (scalar, table, or aggregation) 
function
+ * Converting a DataStream or DataSet into a Table
+ * Holding a reference to an ExecutionEnvironment or 
StreamExecutionEnvironment
+ * 
+ */
+@PublicEvolving
+public interface TableEnvironment {
+
+   /**
+* Creates a table from a table source.
+*
+* @param source table source used as table
+*/
+   Table fromTableSource(TableSource source);
+
+   /**
+* Registers an {@link ExternalCatalog} under a unique name in the 
TableEnvironment's schema.
+* All tables registered in the {@link ExternalCatalog} can be accessed.
+*
+* @param nameThe name under which the externalCatalog will 
be registered
+* @param externalCatalog The externalCatalog to register
+*/
+   void registerExternalCatalog(String name, ExternalCatalog 
externalCatalog);
+
+   /**
+* Gets a registered {@link ExternalCatalog} by name.
+*
+* @param name The name to look up the {@link ExternalCatalog}
+* @return The {@link ExternalCatalog}
+*/
+   ExternalCatalog getRegisteredExternalCatalog(String name);
+
+   /**
+* Registers a {@link ScalarFunction} under a unique name. Replaces 
already existing
+* user-defined functions under this name.
+*/
+   void registerFunction(String name, ScalarFunction function);
+
+   /**
+* Registers a {@link Table} under a unique name in the 
TableEnvironment's catalog.
+* Registered tables can be referenced in SQL queries.
+*
+* @param name The name under which the table will be registered.
+* @param table The table to register.
+*/
+   void registerTable(String name, Table table);
+
+   /**
+* Registers an external {@link TableSource} in this {@link 
TableEnvironment}'s catalog.
+* Registered tables can be referenced in SQL queries.
+*
+* @param nameThe name under which the {@link TableSource} is 
registered.
+* @param tableSource The {@link TableSource} to register.
+*/
+   void registerTableSource(String name, TableSource tableSource);
+
+   /**
+* Registers an external {@link TableSink} with given field names and 
types in this
+* {@link TableEnvironment}'s catalog.
+* Registered sink tables can be referenced in SQL DML statements.
+*
+* @param name The name under which the {@link TableSink} is registered.
+* @param fieldNames The field names to register with the {@link 
TableSink}.
+* @param fieldTypes The field types to register with the {@link 
TableSink}.
+* @param tableSink The {@link TableSink} to register.
+*/
+   void registerTableSink(String name, String[] fieldNames, 
TypeInformation[] fieldTypes, TableSink tableSink);
 
 Review comment:
   That's unfortunate as we are extracting new interface, but I 

[GitHub] [flink] JingsongLi commented on a change in pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch

2019-04-04 Thread GitBox
JingsongLi commented on a change in pull request #8102: 
[FLINK-12087][table-runtime-blink] Introduce over window operators to blink 
batch
URL: https://github.com/apache/flink/pull/8102#discussion_r272031198
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/BoundComparator.java
 ##
 @@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.generated;
+
+import org.apache.flink.table.dataformat.BaseRow;
+
+import java.io.Serializable;
+
+/**
+ * comparing boundary values.
+ */
+public interface BoundComparator extends Serializable {
 
 Review comment:
   For example, 
   `SELECT d, e, sum(e) over (partition by d order by e desc range between 5 
PRECEDING and 4 PRECEDING) FROM Table`
   The above SQL is ranged according to the `e` field.
   The previous 5 PRECEDING means that the `e` value of current Row should be 
greater than or equal to the `e` value of range header plus 5.
   So the `BoundComparator` is used to code generate this logic.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


  1   2   >