[jira] [Created] (FLINK-12116) Args autocast will cause exception for plan transformation in TableAPI
Xingcan Cui created FLINK-12116: --- Summary: Args autocast will cause exception for plan transformation in TableAPI Key: FLINK-12116 URL: https://issues.apache.org/jira/browse/FLINK-12116 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.7.2, 1.6.4 Reporter: Xingcan Cui In the Table API, the automatic type cast for arguments may break their initial structure, which makes {{TreeNode.makeCopy()}} fail. Take the {{ConcatWs}} function as an example. It requires a string {{Expression}} sequence as the second parameter of its constructor. If we provide {{Expression}}s with other types, the planner will try to cast them automatically. However, during this process the arguments will be incorrectly unwrapped (e.g., {{[f1, f2]}} will be unwrapped to two expressions {{f1.cast(String)}} and {{f2.cast(String)}}), which causes {{java.lang.IllegalArgumentException: wrong number of arguments}}. As a workaround, we can cast these arguments manually. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
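The "wrong number of arguments" error described above is characteristic of reflective re-instantiation: a node is rebuilt by invoking its constructor reflectively, so once a sequence argument is unwrapped into individual expressions, the argument count no longer matches the constructor's arity. A minimal plain-Java sketch of that mechanism (the `Node` class and `copy` helper below are illustrative stand-ins, not Flink's actual `TreeNode`):

```java
import java.lang.reflect.Constructor;
import java.util.Arrays;
import java.util.List;

public class MakeCopyDemo {

    // Illustrative tree node: a separator plus a list of child expressions,
    // mirroring the (String, Seq[Expression]) shape of the ConcatWs constructor.
    static class Node {
        final String separator;
        final List<String> children;

        Node(String separator, List<String> children) {
            this.separator = separator;
            this.children = children;
        }
    }

    // Re-instantiates Node reflectively from an argument array, the way a
    // generic makeCopy-style helper would.
    static Node copy(Object... ctorArgs) throws Exception {
        Constructor<?> ctor = Node.class.getDeclaredConstructor(String.class, List.class);
        return (Node) ctor.newInstance(ctorArgs);
    }

    public static void main(String[] args) throws Exception {
        // Correct: the children stay wrapped in a single List argument.
        Node ok = copy("-", Arrays.asList("f1.cast(String)", "f2.cast(String)"));
        System.out.println(ok.children.size());

        // Incorrect: unwrapping the list yields three arguments for a
        // two-argument constructor, so newInstance throws
        // IllegalArgumentException ("wrong number of arguments").
        try {
            copy("-", "f1.cast(String)", "f2.cast(String)");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getClass().getSimpleName());
        }
    }
}
```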
[GitHub] [flink] klion26 commented on issue #7987: [FLINK-11820][serialization] SimpleStringSchema handle message record which value is null
klion26 commented on issue #7987: [FLINK-11820][serialization] SimpleStringSchema handle message record which value is null URL: https://github.com/apache/flink/pull/7987#issuecomment-480127016 Thanks for your contribution, LGTM now. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #7978: [FLINK-11910] [Yarn] add customizable yarn application type
flinkbot edited a comment on issue #7978: [FLINK-11910] [Yarn] add customizable yarn application type URL: https://github.com/apache/flink/pull/7978#issuecomment-472607121 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.
## Review Progress
* ✅ 1. The [description] looks good. - Approved by @rmetzger [PMC], @suez1224 [committer]
* ✅ 2. There is [consensus] that the contribution should go into Flink. - Approved by @rmetzger [PMC]
* ❗ 3. Needs [attention]. - Needs attention by @gjl
* ✅ 4. The change fits into the overall [architecture]. - Approved by @suez1224 [committer]
* ❓ 5. Overall code [quality] is good.
Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands The @flinkbot bot supports the following commands:
- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] suez1224 commented on issue #7978: [FLINK-11910] [Yarn] add customizable yarn application type
suez1224 commented on issue #7978: [FLINK-11910] [Yarn] add customizable yarn application type URL: https://github.com/apache/flink/pull/7978#issuecomment-480102074 @flinkbot approve architecture
[GitHub] [flink] suez1224 commented on a change in pull request #7978: [FLINK-11910] [Yarn] add customizable yarn application type
suez1224 commented on a change in pull request #7978: [FLINK-11910] [Yarn] add customizable yarn application type URL: https://github.com/apache/flink/pull/7978#discussion_r272401341 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ## @@ -988,7 +992,7 @@ public ApplicationReport startAppMaster( final String customApplicationName = customName != null ? customName : applicationName; appContext.setApplicationName(customApplicationName); - appContext.setApplicationType("Apache Flink"); + appContext.setApplicationType(applicationType.isEmpty() ? "Apache Flink" : applicationType); Review comment: Can't we just do the following? appContext.setApplicationType(dynProperties.getOrDefault("application-type", "Apache Flink"));
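The reviewer's one-liner leans on `Map.getOrDefault`, which returns the mapped value when the key is present and the supplied default otherwise, removing the need for a separate field plus an `isEmpty()` check. A minimal sketch of the pattern (the class and method names are illustrative; the property name `application-type` and the default `Apache Flink` come from the diff above):

```java
import java.util.HashMap;
import java.util.Map;

public class AppTypeDefault {

    // Resolve the YARN application type from the dynamic properties,
    // falling back to "Apache Flink" when the key is absent.
    static String resolveApplicationType(Map<String, String> dynProperties) {
        return dynProperties.getOrDefault("application-type", "Apache Flink");
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        // Key absent: the default is used.
        System.out.println(resolveApplicationType(props));
        // Key present: the supplied value wins.
        props.put("application-type", "Streaming ETL");
        System.out.println(resolveApplicationType(props));
    }
}
```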
[GitHub] [flink] suez1224 commented on a change in pull request #7978: [FLINK-11910] [Yarn] add customizable yarn application type
suez1224 commented on a change in pull request #7978: [FLINK-11910] [Yarn] add customizable yarn application type URL: https://github.com/apache/flink/pull/7978#discussion_r272401356 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ## @@ -138,6 +138,8 @@ private String zookeeperNamespace; private String nodeLabel; + /* This field can be overridden by the dynamic property application-type */ + private String applicationType; Review comment: We don't need this instance variable.
[GitHub] [flink] suez1224 commented on a change in pull request #7978: [FLINK-11910] [Yarn] add customizable yarn application type
suez1224 commented on a change in pull request #7978: [FLINK-11910] [Yarn] add customizable yarn application type URL: https://github.com/apache/flink/pull/7978#discussion_r272401378 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ## @@ -468,6 +470,8 @@ private void validateClusterSpecification(ClusterSpecification clusterSpecificat flinkConfiguration.setString(dynProperty.getKey(), dynProperty.getValue()); } + this.applicationType = dynProperties.getOrDefault("application-type", ""); Review comment: We don't need this line either.
[jira] [Created] (FLINK-12115) Add support in Flink to persist state in Azure's blob store
Piyush Narang created FLINK-12115: - Summary: Add support in Flink to persist state in Azure's blob store Key: FLINK-12115 URL: https://issues.apache.org/jira/browse/FLINK-12115 Project: Flink Issue Type: New Feature Components: FileSystems Reporter: Piyush Narang The existing set of flink-filesystems includes filesystems like S3 / HDFS / MapR etc. For folks using Azure, it would be nice to include support for Azure's blob store as a filesystem as well. This would enable us to use Azure blob store to store state / checkpoints for running Flink jobs. Support for Azure's filesystem is part of the Hadoop project in the [hadoop-azure|https://github.com/apache/hadoop/tree/trunk/hadoop-tools/hadoop-azure] module.
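If this feature lands, checkpoint state could presumably be pointed at Azure Blob Storage the same way the S3 filesystems are configured today. A purely hypothetical flink-conf.yaml sketch, assuming the `wasb://` URI scheme and account-key property from the hadoop-azure module (the account and container names are placeholders; `state.backend` and `state.checkpoints.dir` are existing Flink options):

```yaml
# Hypothetical configuration once an Azure-backed filesystem is wired in.
# wasb:// URIs and fs.azure.account.key.* come from hadoop-azure;
# "mycontainer" and "myaccount" are placeholders.
state.backend: filesystem
state.checkpoints.dir: wasb://mycontainer@myaccount.blob.core.windows.net/flink/checkpoints
fs.azure.account.key.myaccount.blob.core.windows.net: <storage-account-key>
```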
[jira] [Commented] (FLINK-11914) Expose a REST endpoint in JobManager to kill specific TaskManager
[ https://issues.apache.org/jira/browse/FLINK-11914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16810193#comment-16810193 ] Shuyi Chen commented on FLINK-11914: Hi [~feng.xu], I don't think we should expose an Akka endpoint, because Akka is an internal implementation detail, and AFAIK the community is trying to deprecate the use of Akka in Flink. Thanks. > Expose a REST endpoint in JobManager to kill specific TaskManager > - > > Key: FLINK-11914 > URL: https://issues.apache.org/jira/browse/FLINK-11914 > Project: Flink > Issue Type: New Feature > Components: Runtime / REST >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > > We want to add the capability in the Flink web UI to kill each individual TM by > clicking a button; this would require first exposing the capability from the > REST API endpoint. The reason is that some TM might be running on a heavily > loaded YARN host over time, and we want to kill just that TM and have the Flink > JM reallocate a TM to restart the job graph. The other approach would be to > restart the entire YARN job, which is heavy-weight.
[jira] [Updated] (FLINK-12114) Try to perform UDF By Reflection
[ https://issues.apache.org/jira/browse/FLINK-12114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dong updated FLINK-12114: - Affects Version/s: (was: 1.7.2) 1.6.5 > Try to perform UDF By Reflection > > > Key: FLINK-12114 > URL: https://issues.apache.org/jira/browse/FLINK-12114 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.6.5 >Reporter: dong >Priority: Major > > Hi Team: > I recently worked on a Flink SQL-based project and then one of the > requirements was to dynamically execute the Flink SQL UDF by loading a > user-defined UDF. I first used cglib to dynamically add the eval method to > the ScalarFunction and then dynamically create the ScalarFunction instance. > And call the user-defined UDF by reflection on the line. > this my code > {code:java} > package com.ximalaya.flink.dsl.stream.udf > import java.lang.reflect.Method > import com.ximalaya.flink.dsl.stream.`type`.FieldType > import com.ximalaya.flink.dsl.stream.api.udf.\{AbstractUserUdf, > UserUdfContext} > import net.sf.cglib.core.Signature > import net.sf.cglib.proxy.\{Enhancer, InterfaceMaker, MethodInterceptor, > MethodProxy} > import org.apache.flink.table.functions.ScalarFunction > import org.apache.flink.table.shaded.org.apache.commons.lang.ArrayUtils > import net.sf.cglib.asm.Type > import scala.collection.mutable.\{Map ⇒ IMap} > /** > * > * @author martin.dong > * > **/ > private class UserUdfFunction extends ScalarFunction{ > override def isDeterministic: Boolean = false > } > private class UdfMethodInterceptor(val name:String, > val fullyQualifiedClassName:String) extends MethodInterceptor with > Serializable { > private var userUdf: AbstractUserUdf = _ > private var evalMethods:IMap[MethodSignature,Method]=IMap() > private var closeMethod:Method = _ > private var openMethod:Method = _ > override def intercept(o: scala.Any, > method: Method, > objects: Array[AnyRef], methodProxy: MethodProxy): AnyRef = { > val 
methodName=method.getName > methodName match { > case "open"⇒ > this.userUdf = > Class.forName(fullyQualifiedClassName).newInstance().asInstanceOf[AbstractUserUdf] > this.userUdf.getClass.getDeclaredMethods.filter(_.getName=="eval"). > foreach(method ⇒ > evalMethods.put(MethodSignature.createMethodSignature(method), method)) > this.closeMethod = classOf[AbstractUserUdf].getDeclaredMethod("close") > this.openMethod = > classOf[AbstractUserUdf].getDeclaredMethod("open",classOf[UserUdfContext]) > openMethod.invoke(userUdf,null) > case "eval"⇒ > val methodSignature = MethodSignature.createMethodSignature(method) > evalMethods(methodSignature).invoke(userUdf,objects:_*) > case "close"⇒ > closeMethod.invoke(userUdf) > case _⇒ > methodProxy.invokeSuper(o,objects) > } > } > } > private class MethodSignature (val fieldTypes:Array[FieldType]){ > def this(clazzArray:Array[Class[_]]){ > this(clazzArray.map(clazz⇒FieldType.get(clazz))) > } > override def hashCode(): Int = fieldTypes.map(_.hashCode()).sum > override def equals(obj: scala.Any): Boolean = { > if(this.eq(obj.asInstanceOf[AnyRef])){ > return true > } > obj match { > case _: MethodSignature⇒ > ArrayUtils.isEquals(this.fieldTypes,obj.asInstanceOf[MethodSignature].fieldTypes) > case _ ⇒ false > } > } > override def toString: String =fieldTypes.map(_.toString).mkString(",") > } > private object MethodSignature{ > def createMethodSignature(method:Method):MethodSignature={ > new MethodSignature(method.getParameterTypes) > } > } > case class > EvalMethod(returnType:FieldType,parameters:Array[FieldType],exceptions:List[Class[Throwable]]) > object UserUdfFactory { > def > createUserUdf(name:String,fullyQualifiedClassName:String,evalMethods:List[EvalMethod]):ScalarFunction={ > val enhancer = new Enhancer > enhancer.setSuperclass(classOf[UserUdfFunction]) > enhancer.setCallback(new UdfMethodInterceptor(name,fullyQualifiedClassName)) > enhancer.setInterfaces(evalMethods.map(method⇒{ > val 
returnType=Type.getType(method.returnType.getClazz) > val parameters=method.parameters.map(p⇒Type.getType(p.getClazz)) > (new Signature("eval",returnType,parameters),method.exceptions) > }).map{ case(signature,exceptions)⇒ > val im = new InterfaceMaker > im.add(signature,exceptions.map(exception⇒Type.getType(exception)).toArray) > im.create() > }.toArray) > enhancer.create().asInstanceOf[ScalarFunction] > } > } > {code} > Can be executed in local mode but cannot be executed in yarn mode, the > following error will occur > {code:java} > Caused by: org.codehaus.commons.compiler.CompileException: Line 5, Column 10: > Cannot determine simple type name "com" > at > org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672) > at >
[jira] [Created] (FLINK-12114) Try to perform UDF By Reflection
dong created FLINK-12114: Summary: Try to perform UDF By Reflection Key: FLINK-12114 URL: https://issues.apache.org/jira/browse/FLINK-12114 Project: Flink Issue Type: Improvement Components: Table SQL / API Affects Versions: 1.7.2 Reporter: dong Hi Team: I recently worked on a Flink SQL-based project, and one of the requirements was to execute Flink SQL UDFs dynamically by loading a user-defined UDF. I first used cglib to dynamically add an eval method to a ScalarFunction, then dynamically created the ScalarFunction instance and called the user-defined UDF by reflection at runtime. This is my code:
{code:java}
package com.ximalaya.flink.dsl.stream.udf

import java.lang.reflect.Method

import com.ximalaya.flink.dsl.stream.`type`.FieldType
import com.ximalaya.flink.dsl.stream.api.udf.{AbstractUserUdf, UserUdfContext}
import net.sf.cglib.core.Signature
import net.sf.cglib.proxy.{Enhancer, InterfaceMaker, MethodInterceptor, MethodProxy}
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.shaded.org.apache.commons.lang.ArrayUtils
import net.sf.cglib.asm.Type

import scala.collection.mutable.{Map ⇒ IMap}

/**
  * @author martin.dong
  **/

private class UserUdfFunction extends ScalarFunction {
  override def isDeterministic: Boolean = false
}

private class UdfMethodInterceptor(val name: String,
                                   val fullyQualifiedClassName: String) extends MethodInterceptor with Serializable {

  private var userUdf: AbstractUserUdf = _
  private var evalMethods: IMap[MethodSignature, Method] = IMap()
  private var closeMethod: Method = _
  private var openMethod: Method = _

  override def intercept(o: scala.Any,
                         method: Method,
                         objects: Array[AnyRef], methodProxy: MethodProxy): AnyRef = {
    val methodName = method.getName
    methodName match {
      case "open" ⇒
        this.userUdf = Class.forName(fullyQualifiedClassName).newInstance().asInstanceOf[AbstractUserUdf]
        this.userUdf.getClass.getDeclaredMethods.filter(_.getName == "eval").
          foreach(method ⇒ evalMethods.put(MethodSignature.createMethodSignature(method), method))
        this.closeMethod = classOf[AbstractUserUdf].getDeclaredMethod("close")
        this.openMethod = classOf[AbstractUserUdf].getDeclaredMethod("open", classOf[UserUdfContext])
        openMethod.invoke(userUdf, null)
      case "eval" ⇒
        val methodSignature = MethodSignature.createMethodSignature(method)
        evalMethods(methodSignature).invoke(userUdf, objects: _*)
      case "close" ⇒
        closeMethod.invoke(userUdf)
      case _ ⇒
        methodProxy.invokeSuper(o, objects)
    }
  }
}

private class MethodSignature(val fieldTypes: Array[FieldType]) {

  def this(clazzArray: Array[Class[_]]) {
    this(clazzArray.map(clazz ⇒ FieldType.get(clazz)))
  }

  override def hashCode(): Int = fieldTypes.map(_.hashCode()).sum

  override def equals(obj: scala.Any): Boolean = {
    if (this.eq(obj.asInstanceOf[AnyRef])) {
      return true
    }
    obj match {
      case _: MethodSignature ⇒
        ArrayUtils.isEquals(this.fieldTypes, obj.asInstanceOf[MethodSignature].fieldTypes)
      case _ ⇒ false
    }
  }

  override def toString: String = fieldTypes.map(_.toString).mkString(",")
}

private object MethodSignature {
  def createMethodSignature(method: Method): MethodSignature = {
    new MethodSignature(method.getParameterTypes)
  }
}

case class EvalMethod(returnType: FieldType, parameters: Array[FieldType], exceptions: List[Class[Throwable]])

object UserUdfFactory {

  def createUserUdf(name: String, fullyQualifiedClassName: String, evalMethods: List[EvalMethod]): ScalarFunction = {
    val enhancer = new Enhancer
    enhancer.setSuperclass(classOf[UserUdfFunction])
    enhancer.setCallback(new UdfMethodInterceptor(name, fullyQualifiedClassName))
    enhancer.setInterfaces(evalMethods.map(method ⇒ {
      val returnType = Type.getType(method.returnType.getClazz)
      val parameters = method.parameters.map(p ⇒ Type.getType(p.getClazz))
      (new Signature("eval", returnType, parameters), method.exceptions)
    }).map { case (signature, exceptions) ⇒
      val im = new InterfaceMaker
      im.add(signature, exceptions.map(exception ⇒ Type.getType(exception)).toArray)
      im.create()
    }.toArray)
    enhancer.create().asInstanceOf[ScalarFunction]
  }
}
{code}
It can be executed in local mode but cannot be executed in YARN mode; the following error occurs:
{code:java}
Caused by: org.codehaus.commons.compiler.CompileException: Line 5, Column 10: Cannot determine simple type name "com"
	at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672)
	at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6416)
	at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6177)
	at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190)
	at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190)
	at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190)
	at
{code}
[jira] [Updated] (FLINK-12113) User code passing to fromCollection(Iterator, Class) not cleaned
[ https://issues.apache.org/jira/browse/FLINK-12113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yankai zhang updated FLINK-12113: - Description:
{code:java}
interface IS extends Iterator, Serializable { }

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromCollection(new IS() {
    @Override
    public boolean hasNext() {
        return false;
    }

    @Override
    public Object next() {
        return null;
    }
}, Object.class);
{code}
Code piece above throws exception:
{code:java}
org.apache.flink.api.common.InvalidProgramException: The implementation of the SourceFunction is not serializable. The object probably contains or references non serializable fields.
	at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
{code}
And my workaround is wrapping clean around iterator instance, like this:
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromCollection(env.clean(new IS() {
    @Override
    public boolean hasNext() {
        return false;
    }

    @Override
    public Object next() {
        return null;
    }
}), Object.class);
{code}
was:
{code:java}
interface IS extends Iterator, Serializable { }

StreamExecutionEnvironment
    .getExecutionEnvironment()
    .fromCollection(new IS() {
        @Override
        public boolean hasNext() {
            return false;
        }

        @Override
        public Object next() {
            return null;
        }
    }, Object.class);
{code}
Code piece above throws exception:
{code:java}
org.apache.flink.api.common.InvalidProgramException: The implementation of the SourceFunction is not serializable. The object probably contains or references non serializable fields.
	at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
{code}
> User code passing to fromCollection(Iterator, Class) not cleaned
> ----------------------------------------------------------------
>
> Key: FLINK-12113
> URL: https://issues.apache.org/jira/browse/FLINK-12113
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.7.2
> Reporter: yankai zhang
> Priority: Major
>
> {code:java}
> interface IS extends Iterator, Serializable { }
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.fromCollection(new IS() {
>     @Override
>     public boolean hasNext() {
>         return false;
>     }
>
>     @Override
>     public Object next() {
>         return null;
>     }
> }, Object.class);
> {code}
> Code piece above throws exception:
> {code:java}
> org.apache.flink.api.common.InvalidProgramException: The implementation of the SourceFunction is not serializable. The object probably contains or references non serializable fields.
> 	at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
> {code}
> And my workaround is wrapping clean around iterator instance, like this:
> {code:java}
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.fromCollection(env.clean(new IS() {
>     @Override
>     public boolean hasNext() {
>         return false;
>     }
>
>     @Override
>     public Object next() {
>         return null;
>     }
> }), Object.class);
> {code}
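The behavior behind this report is standard Java: an anonymous class declared in an instance context compiles with a hidden `this$0` reference to its enclosing instance, so serializing the iterator drags the (typically non-serializable) enclosing object along with it; Flink's ClosureCleaner exists to null out such references, which is what the `env.clean` workaround triggers. A plain-Java sketch of the failure mode, independent of Flink (the `Driver` class and helper names are illustrative):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Iterator;

public class CaptureDemo {

    interface IS extends Iterator<Object>, Serializable { }

    // Non-serializable enclosing class, standing in for a test or driver class.
    static class Driver {
        IS makeIterator() {
            // Anonymous class in an instance context: compiles with a hidden
            // this$0 field pointing at the non-serializable Driver instance.
            return new IS() {
                @Override public boolean hasNext() { return false; }
                @Override public Object next() { return null; }
            };
        }
    }

    // In a static context there is no enclosing instance to capture, so the
    // same anonymous class serializes fine (the effect env.clean achieves by
    // nulling the captured reference).
    static IS makeStandalone() {
        return new IS() {
            @Override public boolean hasNext() { return false; }
            @Override public Object next() { return null; }
        };
    }

    static boolean isSerializable(Object o) {
        try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(o);
            return true;
        } catch (IOException e) {
            // NotSerializableException for the captured Driver lands here.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(makeStandalone()));               // no capture: succeeds
        System.out.println(isSerializable(new Driver().makeIterator()));    // captures Driver: fails
    }
}
```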
[jira] [Created] (FLINK-12113) User code passing to fromCollection(Iterator, Class) not cleaned
yankai zhang created FLINK-12113: Summary: User code passing to fromCollection(Iterator, Class) not cleaned Key: FLINK-12113 URL: https://issues.apache.org/jira/browse/FLINK-12113 Project: Flink Issue Type: Bug Components: API / DataStream Affects Versions: 1.7.2 Reporter: yankai zhang
{code:java}
interface IS extends Iterator, Serializable { }

StreamExecutionEnvironment
    .getExecutionEnvironment()
    .fromCollection(new IS() {
        @Override
        public boolean hasNext() {
            return false;
        }

        @Override
        public Object next() {
            return null;
        }
    }, Object.class);
{code}
Code piece above throws exception:
{code:java}
org.apache.flink.api.common.InvalidProgramException: The implementation of the SourceFunction is not serializable. The object probably contains or references non serializable fields.
	at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
{code}
[GitHub] [flink] flinkbot commented on issue #8113: [FLINK-12112][tests] Properly print process output
flinkbot commented on issue #8113: [FLINK-12112][tests] Properly print process output URL: https://github.com/apache/flink/pull/8113#issuecomment-479912488 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.
## Review Progress
* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention].
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.
Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands The @flinkbot bot supports the following commands:
- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] zentol opened a new pull request #8113: [FLINK-12112][tests] Properly print process output
zentol opened a new pull request #8113: [FLINK-12112][tests] Properly print process output URL: https://github.com/apache/flink/pull/8113 ## What is the purpose of the change Fixes an issue in the `AbstractTaskManagerProcessFailureRecoveryTest` where, during process output dumping, the test a) prints the logs of the first TM three times and b) is very prone to NullPointerExceptions.
[jira] [Updated] (FLINK-12112) AbstractTaskManagerProcessFailureRecoveryTest process output logging does not work properly
[ https://issues.apache.org/jira/browse/FLINK-12112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12112: --- Labels: pull-request-available (was: ) > AbstractTaskManagerProcessFailureRecoveryTest process output logging does not > work properly > --- > > Key: FLINK-12112 > URL: https://issues.apache.org/jira/browse/FLINK-12112 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination, Tests >Affects Versions: 1.8.0, 1.9.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0, 1.8.1 > > > The {{AbstractTaskManagerProcessFailureRecoveryTest}} starts multiple > taskmanagers and prints their output if the test fails. > However, due to a recent refactoring the logs of the first TM are printed > multiple times instead, and the code is prone to NullPointerExceptions.
[jira] [Commented] (FLINK-12111) TaskManagerProcessFailureBatchRecoveryITCase fails due to removed Slot
[ https://issues.apache.org/jira/browse/FLINK-12111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16809889#comment-16809889 ] Chesnay Schepler commented on FLINK-12111: -- Unfortunately the process output could not be printed due to FLINK-12112. > TaskManagerProcessFailureBatchRecoveryITCase fails due to removed Slot > -- > > Key: FLINK-12111 > URL: https://issues.apache.org/jira/browse/FLINK-12111 > Project: Flink > Issue Type: Bug >Affects Versions: 1.8.1 >Reporter: Chesnay Schepler >Priority: Major > > https://travis-ci.org/apache/flink/jobs/515636826 > {code} > org.apache.flink.client.program.ProgramInvocationException: Job failed. > (JobID: 4f32093d9c7554c3de832d20f0a06eb5) > at > org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:471) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:446) > at > org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:210) > at > org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:187) > at > org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:173) > at > org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:817) > at org.apache.flink.api.java.DataSet.collect(DataSet.java:413) > at > org.apache.flink.test.recovery.TaskManagerProcessFailureBatchRecoveryITCase.testTaskManagerFailure(TaskManagerProcessFailureBatchRecoveryITCase.java:115) > at > org.apache.flink.test.recovery.AbstractTaskManagerProcessFailureRecoveryTest$1.run(AbstractTaskManagerProcessFailureRecoveryTest.java:143) > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. 
> at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146) > at > org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265) > ... 10 more > Caused by: org.apache.flink.util.FlinkException: The assigned slot > 786ec47f893240315bb01291aab680ec_1 was removed. > at > org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:893) > at > org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:863) > at > org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:1058) > at > org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:385) > at > org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:847) > at > org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener$1.run(ResourceManager.java:1161) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40) > at > akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) > at akka.actor.ActorCell.invoke(ActorCell.scala:495) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) > at akka.dispatch.Mailbox.run(Mailbox.scala:224) > at akka.dispatch.Mailbox.exec(Mailbox.scala:234) > at 
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > org.apache.flink.client.program.ProgramInvocationException: Job failed. > (JobID: 4f32093d9c7554c3de832d20f0a06eb5) > at > org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483) > at >
[jira] [Created] (FLINK-12112) AbstractTaskManagerProcessFailureRecoveryTest process output logging does not work properly
Chesnay Schepler created FLINK-12112: Summary: AbstractTaskManagerProcessFailureRecoveryTest process output logging does not work properly Key: FLINK-12112 URL: https://issues.apache.org/jira/browse/FLINK-12112 Project: Flink Issue Type: Bug Components: Runtime / Coordination, Tests Affects Versions: 1.8.0, 1.9.0 Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.9.0, 1.8.1 The {{AbstractTaskManagerProcessFailureRecoveryTest}} starts multiple taskmanagers and prints their output if the test fails. However, due to a recent refactoring the logs of the first TM are printed multiple times instead, and the code is prone to NullPointerExceptions.
[jira] [Created] (FLINK-12111) TaskManagerProcessFailureBatchRecoveryITCase fails due to removed Slot
Chesnay Schepler created FLINK-12111: Summary: TaskManagerProcessFailureBatchRecoveryITCase fails due to removed Slot Key: FLINK-12111 URL: https://issues.apache.org/jira/browse/FLINK-12111 Project: Flink Issue Type: Bug Affects Versions: 1.8.1 Reporter: Chesnay Schepler https://travis-ci.org/apache/flink/jobs/515636826 {code} org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 4f32093d9c7554c3de832d20f0a06eb5) at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:471) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:446) at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:210) at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:187) at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:173) at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:817) at org.apache.flink.api.java.DataSet.collect(DataSet.java:413) at org.apache.flink.test.recovery.TaskManagerProcessFailureBatchRecoveryITCase.testTaskManagerFailure(TaskManagerProcessFailureBatchRecoveryITCase.java:115) at org.apache.flink.test.recovery.AbstractTaskManagerProcessFailureRecoveryTest$1.run(AbstractTaskManagerProcessFailureRecoveryTest.java:143) Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146) at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265) ... 10 more Caused by: org.apache.flink.util.FlinkException: The assigned slot 786ec47f893240315bb01291aab680ec_1 was removed. 
at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:893) at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:863) at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:1058) at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:385) at org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:847) at org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener$1.run(ResourceManager.java:1161) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40) at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) at akka.actor.Actor$class.aroundReceive(Actor.scala:502) at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) at akka.actor.ActorCell.invoke(ActorCell.scala:495) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at akka.dispatch.Mailbox.run(Mailbox.scala:224) at akka.dispatch.Mailbox.exec(Mailbox.scala:234) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) org.apache.flink.client.program.ProgramInvocationException: Job failed. 
(JobID: 4f32093d9c7554c3de832d20f0a06eb5) at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:471) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:446) at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:210) at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:187) at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:173)
[jira] [Updated] (FLINK-11897) ExecutionGraphSuspendTest does not wait for all tasks to be submitted
[ https://issues.apache.org/jira/browse/FLINK-11897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-11897: - Summary: ExecutionGraphSuspendTest does not wait for all tasks to be submitted (was: ExecutionGraphSuspendTest does not for all tasks to be submitted) > ExecutionGraphSuspendTest does not wait for all tasks to be submitted > - > > Key: FLINK-11897 > URL: https://issues.apache.org/jira/browse/FLINK-11897 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination, Tests >Reporter: chunpinghe >Assignee: chunpinghe >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.9.0, 1.8.1 > > Time Spent: 20m > Remaining Estimate: 0h > > 11:41:09.042 [INFO] Running > org.apache.flink.runtime.executiongraph.ExecutionGraphSuspendTest > 11:41:11.009 [ERROR] Tests run: 8, Failures: 1, Errors: 0, Skipped: 0, Time > elapsed: 1.964 s <<< FAILURE! - in > org.apache.flink.runtime.executiongraph.ExecutionGraphSuspendTest > 11:41:11.010 [ERROR] > testSuspendedOutOfRunning(org.apache.flink.runtime.executiongraph.ExecutionGraphSuspendTest) > Time elapsed: 0.052 s <<< FAILURE! java.lang.AssertionError: Expected: is > <0> but: was <3> at > org.apache.flink.runtime.executiongraph.ExecutionGraphSuspendTest.validateNoInteractions(ExecutionGraphSuspendTest.java:271) > at > org.apache.flink.runtime.executiongraph.ExecutionGraphSuspendTest.ensureCannotLeaveSuspendedState(ExecutionGraphSuspendTest.java:255) > at > org.apache.flink.runtime.executiongraph.ExecutionGraphSuspendTest.testSuspendedOutOfRunning(ExecutionGraphSuspendTest.java:110) > > [https://api.travis-ci.org/v3/job/505154324/log.txt|https://api.travis-ci.org/v3/job/505154324/log.txt] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-11897) ExecutionGraphSuspendTest does not for all tasks to be submitted
[ https://issues.apache.org/jira/browse/FLINK-11897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-11897. Resolution: Fixed Fix Version/s: 1.8.1 master: 8a12e0f63de90375653a030304522ae0600cb3f9 1.8: 9008bea98286dff3718e743f9b78aecf63e85d14 > ExecutionGraphSuspendTest does not for all tasks to be submitted > > > Key: FLINK-11897 > URL: https://issues.apache.org/jira/browse/FLINK-11897 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination, Tests >Reporter: chunpinghe >Assignee: chunpinghe >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.9.0, 1.8.1 > > Time Spent: 20m > Remaining Estimate: 0h > > 11:41:09.042 [INFO] Running > org.apache.flink.runtime.executiongraph.ExecutionGraphSuspendTest > 11:41:11.009 [ERROR] Tests run: 8, Failures: 1, Errors: 0, Skipped: 0, Time > elapsed: 1.964 s <<< FAILURE! - in > org.apache.flink.runtime.executiongraph.ExecutionGraphSuspendTest > 11:41:11.010 [ERROR] > testSuspendedOutOfRunning(org.apache.flink.runtime.executiongraph.ExecutionGraphSuspendTest) > Time elapsed: 0.052 s <<< FAILURE! java.lang.AssertionError: Expected: is > <0> but: was <3> at > org.apache.flink.runtime.executiongraph.ExecutionGraphSuspendTest.validateNoInteractions(ExecutionGraphSuspendTest.java:271) > at > org.apache.flink.runtime.executiongraph.ExecutionGraphSuspendTest.ensureCannotLeaveSuspendedState(ExecutionGraphSuspendTest.java:255) > at > org.apache.flink.runtime.executiongraph.ExecutionGraphSuspendTest.testSuspendedOutOfRunning(ExecutionGraphSuspendTest.java:110) > > [https://api.travis-ci.org/v3/job/505154324/log.txt|https://api.travis-ci.org/v3/job/505154324/log.txt] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11897) ExecutionGraphSuspendTest does not for all tasks to be submitted
[ https://issues.apache.org/jira/browse/FLINK-11897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-11897: - Summary: ExecutionGraphSuspendTest does not for all tasks to be submitted (was: ExecutionGraphSuspendTest.testSuspendedOutOfRunning failed ) > ExecutionGraphSuspendTest does not for all tasks to be submitted > > > Key: FLINK-11897 > URL: https://issues.apache.org/jira/browse/FLINK-11897 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination, Tests >Reporter: chunpinghe >Assignee: chunpinghe >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > 11:41:09.042 [INFO] Running > org.apache.flink.runtime.executiongraph.ExecutionGraphSuspendTest > 11:41:11.009 [ERROR] Tests run: 8, Failures: 1, Errors: 0, Skipped: 0, Time > elapsed: 1.964 s <<< FAILURE! - in > org.apache.flink.runtime.executiongraph.ExecutionGraphSuspendTest > 11:41:11.010 [ERROR] > testSuspendedOutOfRunning(org.apache.flink.runtime.executiongraph.ExecutionGraphSuspendTest) > Time elapsed: 0.052 s <<< FAILURE! java.lang.AssertionError: Expected: is > <0> but: was <3> at > org.apache.flink.runtime.executiongraph.ExecutionGraphSuspendTest.validateNoInteractions(ExecutionGraphSuspendTest.java:271) > at > org.apache.flink.runtime.executiongraph.ExecutionGraphSuspendTest.ensureCannotLeaveSuspendedState(ExecutionGraphSuspendTest.java:255) > at > org.apache.flink.runtime.executiongraph.ExecutionGraphSuspendTest.testSuspendedOutOfRunning(ExecutionGraphSuspendTest.java:110) > > [https://api.travis-ci.org/v3/job/505154324/log.txt|https://api.travis-ci.org/v3/job/505154324/log.txt] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] zentol merged pull request #7971: [FLINK-11897][tests] should wait all submitTask methods executed,befo…
zentol merged pull request #7971: [FLINK-11897][tests] should wait all submitTask methods executed,befo… URL: https://github.com/apache/flink/pull/7971 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-12107) Use Proxy For DataDog Validation
[ https://issues.apache.org/jira/browse/FLINK-12107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16809857#comment-16809857 ] Luka Jurukovski commented on FLINK-12107: - > Since this feature is only available in 1.9-SNAPSHOT, are you certain that > the version you downloaded actually contains the relevant commits? I must apologize, you are correct, I did somehow do this. I will close this ticket > Use Proxy For DataDog Validation > > > Key: FLINK-12107 > URL: https://issues.apache.org/jira/browse/FLINK-12107 > Project: Flink > Issue Type: Improvement > Components: Runtime / Metrics >Affects Versions: 1.9.0 >Reporter: Luka Jurukovski >Priority: Minor > > Recently support for DataDog Metric Proxy was added, however validation for > the api keys ignores the use of the proxy. There are circumstances in which > proxying is used due to the fact that the service itself is not reachable > directly. > Currently the validation pings datadog using the api keys provided. > https://app.datadoghq.com/api/v1/validate?api_key= -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-12107) Use Proxy For DataDog Validation
[ https://issues.apache.org/jira/browse/FLINK-12107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luka Jurukovski closed FLINK-12107. --- Resolution: Fixed > Use Proxy For DataDog Validation > > > Key: FLINK-12107 > URL: https://issues.apache.org/jira/browse/FLINK-12107 > Project: Flink > Issue Type: Improvement > Components: Runtime / Metrics >Affects Versions: 1.9.0 >Reporter: Luka Jurukovski >Priority: Minor > > Recently support for DataDog Metric Proxy was added, however validation for > the api keys ignores the use of the proxy. There are circumstances in which > proxying is used due to the fact that the service itself is not reachable > directly. > Currently the validation pings datadog using the api keys provided. > https://app.datadoghq.com/api/v1/validate?api_key= -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-12048) ZooKeeperHADispatcherTest failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-12048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler reassigned FLINK-12048: Assignee: Till Rohrmann > ZooKeeperHADispatcherTest failed on Travis > -- > > Key: FLINK-12048 > URL: https://issues.apache.org/jira/browse/FLINK-12048 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination, Tests >Affects Versions: 1.8.0, 1.9.0 >Reporter: Chesnay Schepler >Assignee: Till Rohrmann >Priority: Critical > Labels: test-stability > > https://travis-ci.org/apache/flink/builds/512077301 > {code} > 01:14:56.351 [ERROR] Tests run: 3, Failures: 0, Errors: 1, Skipped: 0, Time > elapsed: 9.671 s <<< FAILURE! - in > org.apache.flink.runtime.dispatcher.ZooKeeperHADispatcherTest > 01:14:56.364 [ERROR] > testStandbyDispatcherJobExecution(org.apache.flink.runtime.dispatcher.ZooKeeperHADispatcherTest) > Time elapsed: 1.209 s <<< ERROR! > org.apache.flink.runtime.util.TestingFatalErrorHandler$TestingException: > org.apache.flink.runtime.dispatcher.DispatcherException: Could not start the > added job d51eeb908f360e44c0a2004e00a6afd2 > at > org.apache.flink.runtime.dispatcher.ZooKeeperHADispatcherTest.teardown(ZooKeeperHADispatcherTest.java:117) > Caused by: org.apache.flink.runtime.dispatcher.DispatcherException: Could not > start the added job d51eeb908f360e44c0a2004e00a6afd2 > Caused by: java.lang.IllegalStateException: Not running. Forgot to call > start()? > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-10746) Need to replace transfer.sh for Travis log upload because it shuts down
[ https://issues.apache.org/jira/browse/FLINK-10746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-10746. Resolution: Later. transfer.sh will continue to work. > Need to replace transfer.sh for Travis log upload because it shuts down > --- > > Key: FLINK-10746 > URL: https://issues.apache.org/jira/browse/FLINK-10746 > Project: Flink > Issue Type: Task > Components: Build System >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Priority: Critical > > We need to replace {{transfer.sh}} as the destination for our Travis log > upload because it is about to shut down (see https://transfer.sh/). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12107) Use Proxy For DataDog Validation
[ https://issues.apache.org/jira/browse/FLINK-12107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16809831#comment-16809831 ] Chesnay Schepler commented on FLINK-12107: -- I can't reproduce this; I've set up a simple local proxy via CCProxy, pointed the DatadogReporter to it, and it correctly goes through the proxy for the validation. Since this feature is only available in 1.9-SNAPSHOT, are you certain that the version you downloaded actually contains the relevant commits? > Use Proxy For DataDog Validation > > > Key: FLINK-12107 > URL: https://issues.apache.org/jira/browse/FLINK-12107 > Project: Flink > Issue Type: Improvement > Components: Runtime / Metrics >Affects Versions: 1.9.0 >Reporter: Luka Jurukovski >Priority: Minor > > Recently support for DataDog Metric Proxy was added, however validation for > the api keys ignores the use of the proxy. There are circumstances in which > proxying is used due to the fact that the service itself is not reachable > directly. > Currently the validation pings datadog using the api keys provided. > https://app.datadoghq.com/api/v1/validate?api_key= -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] sunjincheng121 commented on issue #7167: [FLINK-10973] [table] Add support for map to table API
sunjincheng121 commented on issue #7167: [FLINK-10973] [table] Add support for map to table API URL: https://github.com/apache/flink/pull/7167#issuecomment-47936 Thanks for the update @dianfu! +1 to merge. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-12107) Use Proxy For DataDog Validation
[ https://issues.apache.org/jira/browse/FLINK-12107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16809794#comment-16809794 ] Chesnay Schepler commented on FLINK-12107: -- What about the metric traffic itself? > Use Proxy For DataDog Validation > > > Key: FLINK-12107 > URL: https://issues.apache.org/jira/browse/FLINK-12107 > Project: Flink > Issue Type: Improvement > Components: Runtime / Metrics >Affects Versions: 1.9.0 >Reporter: Luka Jurukovski >Priority: Minor > > Recently support for DataDog Metric Proxy was added, however validation for > the api keys ignores the use of the proxy. There are circumstances in which > proxying is used due to the fact that the service itself is not reachable > directly. > Currently the validation pings datadog using the api keys provided. > https://app.datadoghq.com/api/v1/validate?api_key= -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12107) Use Proxy For DataDog Validation
[ https://issues.apache.org/jira/browse/FLINK-12107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-12107: - Affects Version/s: (was: 1.8.0) 1.9.0 > Use Proxy For DataDog Validation > > > Key: FLINK-12107 > URL: https://issues.apache.org/jira/browse/FLINK-12107 > Project: Flink > Issue Type: Improvement > Components: Runtime / Metrics >Affects Versions: 1.9.0 >Reporter: Luka Jurukovski >Priority: Minor > > Recently support for DataDog Metric Proxy was added, however validation for > the api keys ignores the use of the proxy. There are circumstances in which > proxying is used due to the fact that the service itself is not reachable > directly. > Currently the validation pings datadog using the api keys provided. > https://app.datadoghq.com/api/v1/validate?api_key= -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12107) Use Proxy For DataDog Validation
[ https://issues.apache.org/jira/browse/FLINK-12107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-12107: - Component/s: Runtime / Metrics > Use Proxy For DataDog Validation > > > Key: FLINK-12107 > URL: https://issues.apache.org/jira/browse/FLINK-12107 > Project: Flink > Issue Type: Improvement > Components: Runtime / Metrics >Reporter: Luka Jurukovski >Priority: Minor > > Recently support for DataDog Metric Proxy was added, however validation for > the api keys ignores the use of the proxy. There are circumstances in which > proxying is used due to the fact that the service itself is not reachable > directly. > Currently the validation pings datadog using the api keys provided. > https://app.datadoghq.com/api/v1/validate?api_key= -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12107) Use Proxy For DataDog Validation
[ https://issues.apache.org/jira/browse/FLINK-12107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-12107: - Affects Version/s: 1.8.0 > Use Proxy For DataDog Validation > > > Key: FLINK-12107 > URL: https://issues.apache.org/jira/browse/FLINK-12107 > Project: Flink > Issue Type: Improvement > Components: Runtime / Metrics >Affects Versions: 1.8.0 >Reporter: Luka Jurukovski >Priority: Minor > > Recently support for DataDog Metric Proxy was added, however validation for > the api keys ignores the use of the proxy. There are circumstances in which > proxying is used due to the fact that the service itself is not reachable > directly. > Currently the validation pings datadog using the api keys provided. > https://app.datadoghq.com/api/v1/validate?api_key= -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11603) Port MetricQueryService to RpcEndpoint
[ https://issues.apache.org/jira/browse/FLINK-11603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-11603: - Summary: Port MetricQueryService to RpcEndpoint (was: Port the MetricQueryService to the new RpcEndpoint) > Port MetricQueryService to RpcEndpoint > -- > > Key: FLINK-11603 > URL: https://issues.apache.org/jira/browse/FLINK-11603 > Project: Flink > Issue Type: Improvement > Components: Runtime / Metrics >Reporter: TisonKun >Assignee: TisonKun >Priority: Major > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h > > Given that a series of TODOs mention {{This is a temporary hack until we have > ported the MetricQueryService to the new RpcEndpoint}}, I'd like to give it a > try to implement the RpcEndpoint version of MetricQueryService. > Basically, port {{onReceive}} to > 1. {{addMetric(metricName, metric, group)}} > 2. {{removeMetric(metric)}} > 3. {{createDump()}} > And then adjust tests and replace {{metricServiceQueryPath}} with a > corresponding {{RpcGateway}}. > I'd like to know if the following statement is true --- when we call a > Runnable/Callable with runAsync/callAsync, then the Runnable/Callable is > running in the main thread of the underlying RPC service, specifically, in > the actor thread? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-11603) Port MetricQueryService to RpcEndpoint
[ https://issues.apache.org/jira/browse/FLINK-11603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-11603. Resolution: Fixed Fix Version/s: 1.9.0 master: 6e8a3f312919f8f088c132b13b727cb2188eacb5 > Port MetricQueryService to RpcEndpoint > -- > > Key: FLINK-11603 > URL: https://issues.apache.org/jira/browse/FLINK-11603 > Project: Flink > Issue Type: Improvement > Components: Runtime / Metrics >Reporter: TisonKun >Assignee: TisonKun >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > Given that a series of TODOs mention {{This is a temporary hack until we have > ported the MetricQueryService to the new RpcEndpoint}}, I'd like to give it a > try to implement the RpcEndpoint version of MetricQueryService. > Basically, port {{onReceive}} to > 1. {{addMetric(metricName, metric, group)}} > 2. {{removeMetric(metric)}} > 3. {{createDump()}} > And then adjust tests and replace {{metricServiceQueryPath}} with a > corresponding {{RpcGateway}}. > I'd like to know if the following statement is true --- when we call a > Runnable/Callable with runAsync/callAsync, then the Runnable/Callable is > running in the main thread of the underlying RPC service, specifically, in > the actor thread? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] zentol merged pull request #7927: [FLINK-11603][metrics] Port the MetricQueryService to the new RpcEndpoint
zentol merged pull request #7927: [FLINK-11603][metrics] Port the MetricQueryService to the new RpcEndpoint URL: https://github.com/apache/flink/pull/7927 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
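[editor's note] To TisonKun's closing question in FLINK-11603: yes — by design, runnables and callables passed to Flink's runAsync/callAsync are executed in the endpoint's single main thread (with the Akka-based RPC service, the actor thread), which is why endpoint state needs no extra locking. The sketch below models only that threading contract in plain Java; it is an illustration, not Flink's actual RpcEndpoint API, and all names in it are invented.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Minimal model of the RpcEndpoint threading contract: everything submitted
// via runAsync/callAsync runs on one dedicated "main" thread.
class MiniEndpoint implements AutoCloseable {
    private final ExecutorService mainThread =
        Executors.newSingleThreadExecutor(r -> new Thread(r, "endpoint-main"));

    // Fire-and-forget execution on the main thread.
    void runAsync(Runnable action) {
        mainThread.execute(action);
    }

    // Execution on the main thread with a future for the result.
    <T> CompletableFuture<T> callAsync(Supplier<T> call) {
        return CompletableFuture.supplyAsync(call, mainThread);
    }

    @Override
    public void close() {
        mainThread.shutdown();
    }
}

public class MiniEndpointDemo {
    public static void main(String[] args) throws Exception {
        try (MiniEndpoint endpoint = new MiniEndpoint()) {
            // Both calls observe the same executing thread.
            CompletableFuture<String> t1 =
                endpoint.callAsync(() -> Thread.currentThread().getName());
            CompletableFuture<String> t2 =
                endpoint.callAsync(() -> Thread.currentThread().getName());
            System.out.println(t1.get().equals(t2.get())); // true
            System.out.println(t1.get());                  // endpoint-main
        }
    }
}
```

Because every state access is funneled through that one thread, the "port onReceive to plain methods" plan keeps the same serialization guarantees the actor previously provided.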
[GitHub] [flink] zentol commented on issue #8083: [FLINK-12050][Tests] BlockingShutdownTest fails on Java 9
zentol commented on issue #8083: [FLINK-12050][Tests] BlockingShutdownTest fails on Java 9 URL: https://github.com/apache/flink/pull/8083#issuecomment-479872874 Interesting, the UNIX version has the field, but the Windows version doesn't have it. Please access `pid()` via reflection instead, since this is the "intended" way to get the process ID. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
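[editor's note] The `pid()` zentol refers to is `java.lang.Process#pid()`, added in Java 9. Code that must still compile on Java 8 cannot call it directly, so it can be looked up reflectively at runtime. A hedged sketch of that approach (the helper name and demo are illustrative, not the PR's actual code; the demo assumes a Unix-like system with a `sleep` binary):

```java
import java.lang.reflect.Method;

public class ProcessIds {

    /** Returns the native process id, or -1 if Process#pid() is unavailable. */
    public static long pidOf(Process process) {
        try {
            // Look the method up on java.lang.Process itself (a public, exported
            // type), not on the concrete ProcessImpl subclass, so the reflective
            // invocation passes the access checks without setAccessible tricks.
            Method pid = Process.class.getMethod("pid");
            return (Long) pid.invoke(process);
        } catch (ReflectiveOperationException e) {
            return -1L; // the method does not exist before Java 9
        }
    }

    public static void main(String[] args) throws Exception {
        Process p = new ProcessBuilder("sleep", "5").start();
        try {
            System.out.println(pidOf(p) > 0); // true on Java 9+
        } finally {
            p.destroy();
        }
    }
}
```

This sidesteps the platform split mentioned in the comment: the UNIX and Windows `Process` implementations store the id differently, but both override the public `pid()` method.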
[jira] [Closed] (FLINK-10575) Remove deprecated ExecutionGraphBuilder.buildGraph method
[ https://issues.apache.org/jira/browse/FLINK-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-10575. Resolution: Fixed Fix Version/s: 1.9.0 master: cff74d3fe769c4844fc4faeb2ae604778d92f9d6 > Remove deprecated ExecutionGraphBuilder.buildGraph method > - > > Key: FLINK-10575 > URL: https://issues.apache.org/jira/browse/FLINK-10575 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Affects Versions: 1.7.0 >Reporter: JIN SUN >Assignee: TisonKun >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > ExecutionGraphBuilder is not a public API and we should able to remove > deprecated method such as: > @Deprecated > public static ExecutionGraph buildGraph > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] zentol merged pull request #7950: [FLINK-10575][coordination] Remove deprecated ExecutionGraphBuilder.buildGraph method
zentol merged pull request #7950: [FLINK-10575][coordination] Remove deprecated ExecutionGraphBuilder.buildGraph method URL: https://github.com/apache/flink/pull/7950 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zentol commented on issue #7780: [FLINK-11593][tests] Check & port TaskManagerTest to new code base
zentol commented on issue #7780: [FLINK-11593][tests] Check & port TaskManagerTest to new code base URL: https://github.com/apache/flink/pull/7780#issuecomment-479871386 went through the `JobSubmissionEnvironment` and made some refactorings. Additionally I rebased the PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dianfu commented on issue #7167: [FLINK-10973] [table] Add support for map to table API
dianfu commented on issue #7167: [FLINK-10973] [table] Add support for map to table API URL: https://github.com/apache/flink/pull/7167#issuecomment-479862729 @sunjincheng121 Thanks a lot for your review. Updated the PR accordingly. :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zentol commented on issue #7780: [FLINK-11593][tests] Check & port TaskManagerTest to new code base
zentol commented on issue #7780: [FLINK-11593][tests] Check & port TaskManagerTest to new code base URL: https://github.com/apache/flink/pull/7780#issuecomment-479857850 I went through the tests and made a number of changes. Now starting to look into the `JobSubmissionEnvironment`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-3273) Remove Scala dependency from flink-streaming-java
[ https://issues.apache.org/jira/browse/FLINK-3273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16809692#comment-16809692 ] Liya Fan commented on FLINK-3273: - [~dawidwys] I see. This issue can only be resolved with a large refactoring effort involving multiple modules. And it is difficult to say what is the right time to fix it, because it may break many other pull requests. Maybe this issue will no longer exist after many small fixes are checked in :) > Remove Scala dependency from flink-streaming-java > - > > Key: FLINK-3273 > URL: https://issues.apache.org/jira/browse/FLINK-3273 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Reporter: Maximilian Michels >Priority: Major > > {{flink-streaming-java}} depends on Scala through {{flink-clients}}, > {{flink-runtime}}, and {{flink-testing-utils}}. We should get rid of the > Scala dependency just like we did for {{flink-java}}. Integration tests and > utilities which depend on Scala should be moved to {{flink-tests}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment URL: https://github.com/apache/flink/pull/8090#discussion_r272103567 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java ## @@ -105,16 +112,381 @@ public NettyConfig nettyConfig() { return nettyConfig; } + public boolean isCreditBased() { + return isCreditBased; + } + + // + + /** +* Utility method to extract network related parameters from the configuration and to +* sanity check them. +* +* @param configuration configuration object +* @param maxJvmHeapMemory the maximum JVM heap size (in bytes) +* @param localTaskManagerCommunication true, to skip initializing the network stack +* @param taskManagerAddress identifying the IP address under which the TaskManager will be accessible +* @return NetworkEnvironmentConfiguration +*/ + @Deprecated + public static NetworkEnvironmentConfiguration fromConfiguration( + Configuration configuration, + long maxJvmHeapMemory, + boolean localTaskManagerCommunication, + InetAddress taskManagerAddress) { + + final int dataport = getDataport(configuration); + + final int pageSize = getPageSize(configuration); + + final int numberOfNetworkBuffers = calculateNumberOfNetworkBuffers(configuration, maxJvmHeapMemory); + + final NettyConfig nettyConfig = createNettyConfig(configuration, localTaskManagerCommunication, taskManagerAddress, dataport); + + int initialRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL); Review comment: We should also try to move all options which are used only for network environment from `TaskManagerOptions` to new class `NetworkEnvironmentOptions` to separate them for different shuffle services. What do you think? At the end, we can also move all network related classes into its dedicated package. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] asfgit closed pull request #8112: [hotfix][table]Improve column operations doc in tableAPI.
asfgit closed pull request #8112: [hotfix][table]Improve column operations doc in tableAPI. URL: https://github.com/apache/flink/pull/8112
[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment URL: https://github.com/apache/flink/pull/8090#discussion_r272102359

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java

```
@@ -105,16 +112,381 @@ public NettyConfig nettyConfig() {
 		return nettyConfig;
 	}
 
+	public boolean isCreditBased() {
+		return isCreditBased;
+	}
+
+	//
+
+	/**
+	 * Utility method to extract network related parameters from the configuration and to
+	 * sanity check them.
+	 *
+	 * @param configuration configuration object
+	 * @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+	 * @param localTaskManagerCommunication true, to skip initializing the network stack
+	 * @param taskManagerAddress identifying the IP address under which the TaskManager will be accessible
+	 * @return NetworkEnvironmentConfiguration
+	 */
+	@Deprecated
+	public static NetworkEnvironmentConfiguration fromConfiguration(
+			Configuration configuration,
+			long maxJvmHeapMemory,
+			boolean localTaskManagerCommunication,
+			InetAddress taskManagerAddress) {
+
+		final int dataport = getDataport(configuration);
+
+		final int pageSize = getPageSize(configuration);
+
+		final int numberOfNetworkBuffers = calculateNumberOfNetworkBuffers(configuration, maxJvmHeapMemory);
+
+		final NettyConfig nettyConfig = createNettyConfig(configuration, localTaskManagerCommunication, taskManagerAddress, dataport);
+
+		int initialRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+		int maxRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+		int buffersPerChannel = configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+		int extraBuffersPerGate = configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
+		boolean isCreditBased = configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
+
+		return new NetworkEnvironmentConfiguration(
+			numberOfNetworkBuffers,
+			pageSize,
+			initialRequestBackoff,
+			maxRequestBackoff,
+			buffersPerChannel,
+			extraBuffersPerGate,
+			isCreditBased,
+			nettyConfig);
+	}
+
+	/**
+	 * Calculates the amount of memory used for network buffers inside the current JVM instance
+	 * based on the available heap or the max heap size and the according configuration parameters.
+	 *
+	 * For containers or when started via scripts, if started with a memory limit and set to use
+	 * off-heap memory, the maximum heap size for the JVM is adjusted accordingly and we are able
+	 * to extract the intended values from this.
+	 *
+	 * The following configuration parameters are involved:
+	 * {@link TaskManagerOptions#MANAGED_MEMORY_SIZE},
+	 * {@link TaskManagerOptions#MANAGED_MEMORY_FRACTION},
+	 * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
+	 * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN},
+	 * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}, and
+	 * {@link TaskManagerOptions#NETWORK_NUM_BUFFERS} (fallback if the ones above do not exist).
+	 *
+	 * @param config configuration object
+	 * @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+	 * @return memory to use for network buffers (in bytes)
+	 */
+	@VisibleForTesting
+	public static long calculateNewNetworkBufferMemory(Configuration config, long maxJvmHeapMemory) {
+		// The maximum heap memory has been adjusted as in TaskManagerServices#calculateHeapSizeMB
+		// and we need to invert these calculations.
+		final long jvmHeapNoNet;
+		final MemoryType memoryType = ConfigurationParserUtils.getMemoryType(config);
+		if (memoryType == MemoryType.HEAP) {
+			jvmHeapNoNet = maxJvmHeapMemory;
+		} else if (memoryType == MemoryType.OFF_HEAP) {
+			long configuredMemory = ConfigurationParserUtils.getManagedMemorySize(config) << 20; // megabytes to bytes
+			if (configuredMemory > 0) {
+				// The maximum heap memory has been adjusted according to
```
[GitHub] [flink] flinkbot edited a comment on issue #8112: [hotfix][table]Improve column operations doc in tableAPI.
flinkbot edited a comment on issue #8112: [hotfix][table]Improve column operations doc in tableAPI. URL: https://github.com/apache/flink/pull/8112#issuecomment-479800278

## Review Progress

* ✅ 1. The [description] looks good. - Approved by @sunjincheng121 [committer]
* ✅ 2. There is [consensus] that the contribution should go into Flink. - Approved by @sunjincheng121 [committer]
* ❓ 3. Needs [attention] from.
* ✅ 4. The change fits into the overall [architecture]. - Approved by @sunjincheng121 [committer]
* ✅ 5. Overall code [quality] is good. - Approved by @sunjincheng121 [committer]
[GitHub] [flink] sunjincheng121 commented on issue #8112: [hotfix][table]Improve column operations doc in tableAPI.
sunjincheng121 commented on issue #8112: [hotfix][table]Improve column operations doc in tableAPI. URL: https://github.com/apache/flink/pull/8112#issuecomment-479829421 Thanks for your review @hequn8128 @flinkbot approve all Merging ...
[jira] [Commented] (FLINK-3273) Remove Scala dependency from flink-streaming-java
[ https://issues.apache.org/jira/browse/FLINK-3273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16809678#comment-16809678 ] Dawid Wysakowicz commented on FLINK-3273:

[~fan_li_ya] It is a much bigger effort. Right now flink-streaming-java depends on flink-runtime (this was probably a bad design decision that we unfortunately have to live with for now), which pulls in Scala. So we won't be able to make flink-streaming-java independent of Scala as long as we don't invert that dependency, but that is a huge effort.

> Remove Scala dependency from flink-streaming-java
> Key: FLINK-3273
> URL: https://issues.apache.org/jira/browse/FLINK-3273
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream
> Reporter: Maximilian Michels
> Priority: Major
>
> {{flink-streaming-java}} depends on Scala through {{flink-clients}}, {{flink-runtime}}, and {{flink-testing-utils}}. We should get rid of the Scala dependency just like we did for {{flink-java}}. Integration tests and utilities which depend on Scala should be moved to {{flink-tests}}.
[jira] [Commented] (FLINK-3273) Remove Scala dependency from flink-streaming-java
[ https://issues.apache.org/jira/browse/FLINK-3273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16809672#comment-16809672 ] Liya Fan commented on FLINK-3273:

Hi [~mxm], thanks a lot for the detailed explanation. To resolve this issue, the Scala-dependent code in flink-streaming-java should be moved to flink-tests, so that flink-streaming-java will be independent of Scala, right? If no one else is working on it, can I take it over? :) Thanks.
[jira] [Commented] (FLINK-11912) Expose per partition Kafka lag metric in Flink Kafka connector
[ https://issues.apache.org/jira/browse/FLINK-11912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16809641#comment-16809641 ] aitozi commented on FLINK-11912:

Got it [~suez1224]. Thanks for your explanation, I think you are right. +1 for this change

> Expose per partition Kafka lag metric in Flink Kafka connector
> Key: FLINK-11912
> URL: https://issues.apache.org/jira/browse/FLINK-11912
> Project: Flink
> Issue Type: New Feature
> Components: Connectors / Kafka
> Affects Versions: 1.6.4, 1.7.2
> Reporter: Shuyi Chen
> Assignee: Shuyi Chen
> Priority: Major
>
> In production, it's important that we expose the Kafka lag by partition metric in order for users to diagnose which Kafka partition is lagging. However, although the Kafka lag by partition metrics are available in KafkaConsumer after 0.10.2, Flink was not able to properly register them because the metrics are only available after the consumer starts polling data from partitions. I would suggest the following fix:
> 1) In KafkaConsumerThread.run(), allocate a manualRegisteredMetricSet.
> 2) In the fetch loop, as KafkaConsumer discovers new partitions, manually add a MetricName for each partition that we want to register into manualRegisteredMetricSet.
> 3) In the fetch loop, check if manualRegisteredMetricSet is empty. If not, try to search for the metrics available in KafkaConsumer, and if found, register them and remove the entries from manualRegisteredMetricSet.
> The overhead of the above approach is bounded and is only incurred when discovering new partitions, and registration is done once the KafkaConsumer has the metrics exposed.
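The three-step fix proposed in the issue description can be modeled without any Kafka dependency. The sketch below is only an illustration of the proposed bookkeeping: the class and method names (`LazyLagMetrics`, `onPartitionDiscovered`, `tryRegisterPending`) are invented for this example and are not Flink's or Kafka's API, and the plain `Map<String, Double>` stands in for the metrics map that `KafkaConsumer#metrics()` exposes only after polling starts.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class Main {

    /** Simplified model of the fix: metric names that cannot be registered yet are parked
     *  in a pending set and registered lazily once the consumer starts exposing them. */
    static class LazyLagMetrics {
        // step 1: allocated once per consumer thread
        private final Set<String> pendingMetrics = new HashSet<>();
        // stands in for Flink's metric group
        private final Map<String, Double> registered = new HashMap<>();

        // step 2: record the metric name whenever a new partition is discovered
        void onPartitionDiscovered(String topic, int partition) {
            pendingMetrics.add(topic + "-" + partition + ".records-lag");
        }

        // step 3: called in the fetch loop; registers whatever the consumer now exposes
        void tryRegisterPending(Map<String, Double> consumerMetrics) {
            pendingMetrics.removeIf(name -> {
                Double value = consumerMetrics.get(name);
                if (value == null) {
                    return false; // not exposed yet; retry on the next loop iteration
                }
                registered.put(name, value);
                return true; // registered once, then dropped from the pending set
            });
        }

        int pendingCount() { return pendingMetrics.size(); }
        Map<String, Double> registeredMetrics() { return registered; }
    }

    public static void main(String[] args) {
        LazyLagMetrics reg = new LazyLagMetrics();
        reg.onPartitionDiscovered("orders", 0);
        reg.onPartitionDiscovered("orders", 1);

        reg.tryRegisterPending(new HashMap<>()); // consumer has not polled yet
        System.out.println(reg.pendingCount()); // prints 2

        Map<String, Double> metrics = new HashMap<>();
        metrics.put("orders-0.records-lag", 42.0);
        reg.tryRegisterPending(metrics); // lag metric for partition 0 appeared
        System.out.println(reg.pendingCount()); // prints 1
    }
}
```

This mirrors why the cost stays bounded: the pending set only grows on partition discovery, and each entry is touched at most until its metric first appears.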
[jira] [Commented] (FLINK-3273) Remove Scala dependency from flink-streaming-java
[ https://issues.apache.org/jira/browse/FLINK-3273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16809634#comment-16809634 ] Maximilian Michels commented on FLINK-3273:

The idea was to have a Scala-free version of {{flink-streaming-java}}; currently we have {{flink-streaming-java_2.11}} and {{flink-streaming-java_2.12}}. We would require a runtime-decoupled abstraction, similar to what {{flink-java}} has but for streaming. Then we could get rid of the Scala dependency. Not sure if there are plans at the moment to do that, because it would be an invasive change.
[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment URL: https://github.com/apache/flink/pull/8090#discussion_r272080191

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java

```
@@ -616,10 +394,9 @@ public static long calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration
 	Preconditions.checkArgument(totalJavaMemorySizeMB > 0);
 
 	// subtract the Java memory used for network buffers (always off-heap)
-	final long networkBufMB =
-		calculateNetworkBufferMemory(
-			totalJavaMemorySizeMB << 20, // megabytes to bytes
-			config) >> 20; // bytes to megabytes
+	final long networkBufMB = NetworkEnvironmentConfiguration.calculateNetworkBufferMemory(
```

Review comment: I think so, I created [FLINK-12110](https://issues.apache.org/jira/browse/FLINK-12110) to track it.
[jira] [Created] (FLINK-12110) Introduce API point for ShuffleService to get estimation of locally required memory
Andrey Zagrebin created FLINK-12110:

Summary: Introduce API point for ShuffleService to get estimation of locally required memory
Key: FLINK-12110
URL: https://issues.apache.org/jira/browse/FLINK-12110
Project: Flink
Issue Type: Sub-task
Components: Runtime / Network
Reporter: Andrey Zagrebin
Assignee: zhijiang

At the moment, we use the static method TaskManagerServices.calculateHeapSizeMB to estimate the local memory needed in the TM container for the network environment. As the network environment becomes a pluggable shuffle service, we should also make this request from the container to the shuffle service pluggable.
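One hypothetical shape for such an API point, purely as a sketch (the names `ShuffleServiceMemoryEstimator`, `estimateLocalMemory`, and `FractionBasedEstimator` are invented for this example, not the eventual Flink interface): the container asks the shuffle service for its local memory estimate instead of calling a static utility.

```java
public class Main {

    /** Hypothetical API point: each shuffle service implementation reports its own
     *  local memory needs inside the TaskManager container. */
    interface ShuffleServiceMemoryEstimator {
        /** Memory (in bytes) the shuffle service needs in the TaskManager container. */
        long estimateLocalMemory(long totalContainerMemoryBytes);
    }

    /** A network-style implementation: reserve a fraction of the container memory,
     *  clamped to [min, max], the same shape as today's network buffer sizing. */
    static class FractionBasedEstimator implements ShuffleServiceMemoryEstimator {
        private final double fraction;
        private final long min;
        private final long max;

        FractionBasedEstimator(double fraction, long min, long max) {
            this.fraction = fraction;
            this.min = min;
            this.max = max;
        }

        @Override
        public long estimateLocalMemory(long totalContainerMemoryBytes) {
            long byFraction = (long) (totalContainerMemoryBytes * fraction);
            return Math.min(max, Math.max(min, byFraction)); // clamp into [min, max]
        }
    }

    public static void main(String[] args) {
        // fraction/min/max chosen to mirror the network memory defaults (0.1, 64 MiB, 1 GiB)
        ShuffleServiceMemoryEstimator estimator =
            new FractionBasedEstimator(0.1, 64L << 20, 1L << 30);
        // roughly 10% of a 4 GiB container, clamped into [64 MiB, 1 GiB]
        System.out.println(estimator.estimateLocalMemory(4L << 30));
    }
}
```

With an interface like this, calculateHeapSizeMB could subtract whatever the configured shuffle service reports, rather than hard-coding the network environment's formula.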
[GitHub] [flink] hequn8128 removed a comment on issue #8112: [hotfix][table]Improve column operations doc in tableAPI.
hequn8128 removed a comment on issue #8112: [hotfix][table]Improve column operations doc in tableAPI. URL: https://github.com/apache/flink/pull/8112#issuecomment-479810010 @sunjincheng121 Thanks for improving the document. It is clearer to separate the column operations from the other ones, and easier for users to find the differences between the Table API and SQL when looking at the outline of the documents. +1 to merge. Best, Hequn
[GitHub] [flink] hequn8128 commented on issue #8112: [hotfix][table]Improve column operations doc in tableAPI.
hequn8128 commented on issue #8112: [hotfix][table]Improve column operations doc in tableAPI. URL: https://github.com/apache/flink/pull/8112#issuecomment-479810010 @sunjincheng121 Thanks for improving the document. It is clearer to separate the column operations from the other ones, and easier for users to find the differences between the Table API and SQL when looking at the outline of the documents. +1 to merge. Best, Hequn
[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment URL: https://github.com/apache/flink/pull/8090#discussion_r272077394

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java

```
@@ -105,6 +106,302 @@ public NettyConfig nettyConfig() {
 		return nettyConfig;
 	}
 
+	public boolean isCreditBased() {
+		return isCreditBased;
+	}
+
+	//
+
+	/**
+	 * Utility method to extract network related parameters from the configuration and to
+	 * sanity check them.
+	 *
+	 * @param configuration configuration object
+	 * @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+	 * @param localTaskManagerCommunication true, to skip initializing the network stack
+	 * @param taskManagerAddress identifying the IP address under which the TaskManager will be accessible
+	 * @return NetworkEnvironmentConfiguration
+	 */
+	@Deprecated
+	public static NetworkEnvironmentConfiguration fromConfiguration(
+			Configuration configuration,
+			long maxJvmHeapMemory,
+			boolean localTaskManagerCommunication,
+			InetAddress taskManagerAddress) {
+
+		// > hosts / ports for communication and data exchange
+
+		final int dataport = configuration.getInteger(TaskManagerOptions.DATA_PORT);
+		ConfigurationParserUtils.checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(),
+			"Leave config parameter empty or use 0 to let the system choose a port automatically.");
+
+		final int pageSize = ConfigurationParserUtils.getPageSize(configuration);
+
+		final int numNetworkBuffers;
+		if (!hasNewNetworkConfig(configuration)) {
+			// fallback: number of network buffers
+			numNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+
+			checkOldNetworkConfig(numNetworkBuffers);
+		} else {
+			if (configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
+				LOG.info("Ignoring old (but still present) network buffer configuration via {}.",
+					TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
+			}
+
+			final long networkMemorySize = calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
+
+			// tolerate offcuts between intended and allocated memory due to segmentation (will be available to the user-space memory)
+			long numNetworkBuffersLong = networkMemorySize / pageSize;
+			if (numNetworkBuffersLong > Integer.MAX_VALUE) {
+				throw new IllegalArgumentException("The given number of memory bytes (" + networkMemorySize +
+					") corresponds to more than MAX_INT pages.");
+			}
+			numNetworkBuffers = (int) numNetworkBuffersLong;
+		}
+
+		final NettyConfig nettyConfig;
+		if (!localTaskManagerCommunication) {
+			final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport);
+
+			nettyConfig = new NettyConfig(taskManagerInetSocketAddress.getAddress(), taskManagerInetSocketAddress.getPort(),
+				pageSize, ConfigurationParserUtils.getSlot(configuration), configuration);
+		} else {
+			nettyConfig = null;
+		}
+
+		int initialRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+		int maxRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+		int buffersPerChannel = configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+		int extraBuffersPerGate = configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
+		boolean isCreditBased = configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
```

Review comment: As I understand, `NetworkEnvironmentConfigurationBuilder.nettyConfig` is `null` by default. Then maybe `nettyConfig` could be some created default non-null object, and `NetworkEnvironmentConfigurationBuilder` could also have `setDefaultNettyConfigAndIsCreditBased(boolean isCreditBased)` instead of `setIsCreditBased`? Internally it would construct a default non-null `nettyConfig` with requested
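Taken in isolation, the buffer-count derivation in the quoted hunk is a division with an int-overflow guard. A standalone sketch of just that arithmetic (the example values assume the default 32 KiB memory segment size; the method name is ours, not Flink's):

```java
public class Main {

    // Standalone version of the arithmetic in the quoted hunk: the configured network
    // memory is split into pages, the offcut stays available to user-space memory, and
    // a guard rejects configurations whose buffer count would overflow an int.
    public static int numberOfNetworkBuffers(long networkMemorySize, int pageSize) {
        long numNetworkBuffersLong = networkMemorySize / pageSize;
        if (numNetworkBuffersLong > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("The given number of memory bytes ("
                + networkMemorySize + ") corresponds to more than MAX_INT pages.");
        }
        return (int) numNetworkBuffersLong;
    }

    public static void main(String[] args) {
        // 64 MiB of network memory at a 32 KiB page size
        System.out.println(numberOfNetworkBuffers(64L << 20, 32 << 10)); // prints 2048
    }
}
```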
[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment URL: https://github.com/apache/flink/pull/8090#discussion_r272066212

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java

```
@@ -105,6 +106,300 @@ public NettyConfig nettyConfig() {
 		return nettyConfig;
 	}
 
+	public boolean isCreditBased() {
+		return isCreditBased;
+	}
+
+	//
+
+	/**
+	 * Utility method to extract network related parameters from the configuration and to
+	 * sanity check them.
+	 *
+	 * @param configuration configuration object
+	 * @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+	 * @param localTaskManagerCommunication true, to skip initializing the network stack
+	 * @param taskManagerAddress identifying the IP address under which the TaskManager will be accessible
+	 * @return NetworkEnvironmentConfiguration
+	 */
+	@Deprecated
+	public static NetworkEnvironmentConfiguration fromConfiguration(
```

Review comment: True, then we should probably use `@SuppressWarnings("deprecation")` in this case (now it should already go to `calculateNumberOfNetworkBuffers`), because the method `fromConfiguration` itself does not look deprecated.
[GitHub] [flink] flinkbot commented on issue #8112: [hotfix][table]Improve column operations doc in tableAPI.
flinkbot commented on issue #8112: [hotfix][table]Improve column operations doc in tableAPI. URL: https://github.com/apache/flink/pull/8112#issuecomment-479800278

## Review Progress

* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.
[jira] [Created] (FLINK-12109) Flink SQL hangs in StreamTableEnvironment.sqlUpdate, finally lead to java.lang.StackOverflowError
xu created FLINK-12109:

Summary: Flink SQL hangs in StreamTableEnvironment.sqlUpdate, finally leading to java.lang.StackOverflowError
Key: FLINK-12109
URL: https://issues.apache.org/jira/browse/FLINK-12109
Project: Flink
Issue Type: Bug
Components: Table SQL / Client
Affects Versions: 1.7.2
Reporter: xu

Hi Experts, there is a Flink application (version 1.7.2) written in Flink SQL. The SQL in the application is quite long: it involves about 30 tables and is 1,500 lines in total. When executing, I found that it hangs in StreamTableEnvironment.sqlUpdate, repeatedly executing Calcite code, until a java.lang.StackOverflowError is thrown. The following is the stack trace:

```
Caused by: java.lang.StackOverflowError
	at org.apache.calcite.rel.metadata.RelMetadataQuery.getDistinctRowCount(RelMetadataQuery.java:771)
	at org.apache.calcite.rel.metadata.RelMdRowCount.getRowCount(RelMdRowCount.java:207)
	at GeneratedMetadataHandler_RowCount.getRowCount_$(Unknown Source)
	at GeneratedMetadataHandler_RowCount.getRowCount(Unknown Source)
	at org.apache.calcite.rel.metadata.RelMetadataQuery.getRowCount(RelMetadataQuery.java:225)
	at org.apache.calcite.rel.metadata.RelMdRowCount.getRowCount(RelMdRowCount.java:72)
	at GeneratedMetadataHandler_RowCount.getRowCount_$(Unknown Source)
	at GeneratedMetadataHandler_RowCount.getRowCount(Unknown Source)
	at org.apache.calcite.rel.metadata.RelMetadataQuery.getRowCount(RelMetadataQuery.java:225)
	at org.apache.calcite.rel.metadata.RelMdRowCount.getRowCount(RelMdRowCount.java:133)
	at GeneratedMetadataHandler_RowCount.getRowCount_$(Unknown Source)
	at GeneratedMetadataHandler_RowCount.getRowCount(Unknown Source)
	at org.apache.calcite.rel.metadata.RelMetadataQuery.getRowCount(RelMetadataQuery.java:225)
	at org.apache.calcite.rel.metadata.RelMdRowCount.getRowCount(RelMdRowCount.java:72)
	at GeneratedMetadataHandler_RowCount.getRowCount_$(Unknown Source)
	at GeneratedMetadataHandler_RowCount.getRowCount(Unknown Source)
	at org.apache.calcite.rel.metadata.RelMetadataQuery.getRowCount(RelMetadataQuery.java:225)
	at org.apache.calcite.rel.metadata.RelMdUtil.estimateFilteredRows(RelMdUtil.java:718)
	at org.apache.calcite.rel.metadata.RelMdRowCount.getRowCount(RelMdRowCount.java:124)
	at GeneratedMetadataHandler_RowCount.getRowCount_$(Unknown Source)
	at GeneratedMetadataHandler_RowCount.getRowCount(Unknown Source)
	at org.apache.calcite.rel.metadata.RelMetadataQuery.getRowCount(RelMetadataQuery.java:225)
	at org.apache.calcite.rel.metadata.RelMdRowCount.getRowCount(RelMdRowCount.java:72)
	at GeneratedMetadataHandler_RowCount.getRowCount_$(Unknown Source)
	at GeneratedMetadataHandler_RowCount.getRowCount(Unknown Source)
	at org.apache.calcite.rel.metadata.RelMetadataQuery.getRowCount(RelMetadataQuery.java:225)
	at org.apache.calcite.rel.metadata.RelMdRowCount.getRowCount(RelMdRowCount.java:133)
	at GeneratedMetadataHandler_RowCount.getRowCount_$(Unknown Source)
	at GeneratedMetadataHandler_RowCount.getRowCount(Unknown Source)
	at org.apache.calcite.rel.metadata.RelMetadataQuery.getRowCount(RelMetadataQuery.java:225)
	at org.apache.calcite.rel.metadata.RelMdRowCount.getRowCount(RelMdRowCount.java:72)
	at GeneratedMetadataHandler_RowCount.getRowCount_$(Unknown Source)
	at GeneratedMetadataHandler_RowCount.getRowCount(Unknown Source)
	at org.apache.calcite.rel.metadata.RelMetadataQuery.getRowCount(RelMetadataQuery.java:225)
	at org.apache.calcite.rel.metadata.RelMdUtil.getJoinRowCount(RelMdUtil.java:674)
	at org.apache.calcite.rel.metadata.RelMdRowCount.getRowCount(RelMdRowCount.java:188)
	at GeneratedMetadataHandler_RowCount.getRowCount_$(Unknown Source)
	at GeneratedMetadataHandler_RowCount.getRowCount(Unknown Source)
	at org.apache.calcite.rel.metadata.RelMetadataQuery.getRowCount(RelMetadataQuery.java:225)
	at org.apache.calcite.rel.metadata.RelMdRowCount.getRowCount(RelMdRowCount.java:72)
	at GeneratedMetadataHandler_RowCount.getRowCount_$(Unknown Source)
	at GeneratedMetadataHandler_RowCount.getRowCount(Unknown Source)
	at org.apache.calcite.rel.metadata.RelMetadataQuery.getRowCount(RelMetadataQuery.java:225)
	at org.apache.calcite.rel.metadata.RelMdRowCount.getRowCount(RelMdRowCount.java:133)
	at GeneratedMetadataHandler_RowCount.getRowCount_$(Unknown Source)
	at GeneratedMetadataHandler_RowCount.getRowCount(Unknown Source)
	at org.apache.calcite.rel.metadata.RelMetadataQuery.getRowCount(RelMetadataQuery.java:225)
	at org.apache.calcite.rel.metadata.RelMdRowCount.getRowCount(RelMdRowCount.java:72)
	at GeneratedMetadataHandler_RowCount.getRowCount_$(Unknown Source)
	at GeneratedMetadataHandler_RowCount.getRowCount(Unknown Source)
	at org.apache.calcite.rel.metadata.RelMetadataQuery.getRowCount(RelMetadataQuery.java:225)
	at org.apache.calcite.rel.metadata.RelMdUtil.getJoinRowCount(RelMdUtil.java:674)
	at
```
[GitHub] [flink] sunjincheng121 opened a new pull request #8112: [hotfix][table]Improve column operations doc in tableAPI.
sunjincheng121 opened a new pull request #8112: [hotfix][table]Improve column operations doc in tableAPI. URL: https://github.com/apache/flink/pull/8112 ## What is the purpose of the change Move the document of AddColumns/AddOrReplaceColumns/DropColumns/RenameColumns to a separate menu, similar to Set Operations. ## Brief change log - Move the document of AddColumns/AddOrReplaceColumns/DropColumns/RenameColumns to a separate menu ## Verifying this change This change is only a document improvement. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (docs) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment URL: https://github.com/apache/flink/pull/8090#discussion_r272066212 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java ## @@ -105,6 +106,300 @@ public NettyConfig nettyConfig() { return nettyConfig; } + public boolean isCreditBased() { + return isCreditBased; + } + + // + + /** +* Utility method to extract network related parameters from the configuration and to +* sanity check them. +* +* @param configuration configuration object +* @param maxJvmHeapMemory the maximum JVM heap size (in bytes) +* @param localTaskManagerCommunication true, to skip initializing the network stack +* @param taskManagerAddress identifying the IP address under which the TaskManager will be accessible +* @return NetworkEnvironmentConfiguration +*/ + @Deprecated + public static NetworkEnvironmentConfiguration fromConfiguration( Review comment: True, then we should probably use `@SuppressWarnings("deprecation")` in this case because the method `fromConfiguration` itself does not look deprecated. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
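The distinction being discussed — deprecating a method versus merely suppressing a deprecation warning inside it — can be shown with a small, self-contained sketch (the method names here are made up for illustration):

```java
public class DeprecationDemo {
    /** An older API kept for compatibility; callers get a compile-time warning. */
    @Deprecated
    static int legacyParse(String s) { return Integer.parseInt(s.trim()); }

    /**
     * A non-deprecated method that still needs the legacy call internally.
     * Suppressing the warning here keeps the method's own contract clean:
     * callers of parse() see no deprecation warning, whereas marking parse()
     * itself @Deprecated would wrongly warn every caller.
     */
    @SuppressWarnings("deprecation")
    static int parse(String s) { return legacyParse(s); }

    public static void main(String[] args) {
        System.out.println(parse(" 42 ")); // 42
    }
}
```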
[GitHub] [flink] JingsongLi commented on a change in pull request #8107: [FLINK-12094][table-runtime-blink] Introduce sort merge join operator to blink batch
JingsongLi commented on a change in pull request #8107: [FLINK-12094][table-runtime-blink] Introduce sort merge join operator to blink batch URL: https://github.com/apache/flink/pull/8107#discussion_r272061016 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/SortMergeJoinOperator.java ## @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.BinaryRow; +import org.apache.flink.table.dataformat.GenericRow; +import org.apache.flink.table.dataformat.JoinedRow; +import org.apache.flink.table.generated.GeneratedJoinCondition; +import org.apache.flink.table.generated.GeneratedNormalizedKeyComputer; +import org.apache.flink.table.generated.GeneratedProjection; +import org.apache.flink.table.generated.GeneratedRecordComparator; +import org.apache.flink.table.generated.JoinCondition; +import org.apache.flink.table.generated.Projection; +import org.apache.flink.table.generated.RecordComparator; +import org.apache.flink.table.runtime.TableStreamOperator; +import org.apache.flink.table.runtime.sort.BinaryExternalSorter; +import org.apache.flink.table.runtime.util.ResettableExternalBuffer; +import org.apache.flink.table.runtime.util.StreamRecordCollector; +import org.apache.flink.table.typeutils.AbstractRowSerializer; +import org.apache.flink.table.typeutils.BinaryRowSerializer; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; + +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * An implementation that realizes the joining through a sort-merge join strategy. + * 1.In most cases, its performance is weaker than HashJoin. 
+ * 2.It is more stable than HashJoin, and most of the data can be sorted stably. + * 3.SortMergeJoin should be the best choice if sort can be omitted in the case of multi-level join + * cascade with the same key. + * + * NOTE: SEMI and ANTI join output input1 instead of input2. (Contrary to {@link HashJoinOperator}). + */ +public class SortMergeJoinOperator extends TableStreamOperator + implements TwoInputStreamOperator { + + private final long reservedSortMemory1; + private final long reservedSortMemory2; + private final long externalBufferMemory; + private final SortMergeJoinType type; + private final boolean leftIsSmaller; + private final boolean[] filterNulls; + + // generated code to cook + private GeneratedJoinCondition condFuncCode; + private GeneratedProjection projectionCode1; + private GeneratedProjection projectionCode2; + private GeneratedNormalizedKeyComputer computer1; + private GeneratedRecordComparator comparator1; + private GeneratedNormalizedKeyComputer computer2; + private GeneratedRecordComparator comparator2; + private GeneratedRecordComparator genKeyComparator; + + private transient MemoryManager memManager; + private transient IOManager ioManager; + private transient BinaryRowSerializer serializer1; + private transient BinaryRowSerializer serializer2; + private transient BinaryExternalSorter sorter1; + private transient BinaryExternalSorter sorter2; + private transient SortMergeJoinIterator joinIterator1; Review comment: yes... I can move them to local variable and use `try final` to ensure them closed. This is an automated message from the Apache
[GitHub] [flink] JingsongLi commented on a change in pull request #8107: [FLINK-12094][table-runtime-blink] Introduce sort merge join operator to blink batch
JingsongLi commented on a change in pull request #8107: [FLINK-12094][table-runtime-blink] Introduce sort merge join operator to blink batch URL: https://github.com/apache/flink/pull/8107#discussion_r272060274 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/SortMergeFullOuterJoinIterator.java ## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join; + +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.BinaryRow; +import org.apache.flink.table.generated.Projection; +import org.apache.flink.table.generated.RecordComparator; +import org.apache.flink.table.runtime.util.ResettableExternalBuffer; +import org.apache.flink.table.typeutils.BinaryRowSerializer; +import org.apache.flink.util.MutableObjectIterator; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Gets two matched rows for full outer join. + */ +public class SortMergeFullOuterJoinIterator implements Closeable { Review comment: Yes, but because of the different concepts of names on both sides. (probeRow <=> row1), prefer keep them. This is an automated message from the Apache Git Service. 
[GitHub] [flink] sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] [table] Add support for map to table API
sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] [table] Add support for map to table API URL: https://github.com/apache/flink/pull/7167#discussion_r272042362 ## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/MapTest.scala ## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.stream.table + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.expressions.utils.{Func1, Func23, Func24} +import org.apache.flink.table.utils.TableTestBase +import org.apache.flink.table.utils.TableTestUtil.{streamTableNode, term, unaryNode} +import org.junit.Test + +class MapTest extends TableTestBase { Review comment: How about moving those test case into `CalcTest`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] [table] Add support for map to table API
sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] [table] Add support for map to table API URL: https://github.com/apache/flink/pull/7167#discussion_r272048201 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/OperationTreeBuilder.scala ## @@ -369,6 +369,14 @@ class OperationTreeBuilder(private val tableEnv: TableEnvironment) { Union(left.asInstanceOf[LogicalNode], right.asInstanceOf[LogicalNode], all).validate(tableEnv) } + def map(mapFunction: Expression, child: TableOperation): TableOperation = { +val childNode = child.asInstanceOf[LogicalNode] +val expandedFields = expandProjectList( + Seq(mapFunction).map(expressionBridge.bridge) +.map(Flattening), childNode, tableEnv) Review comment: How about changing `.map(expressionBridge.bridge).map(Flattening)` to `.map(e => Flattening(expressionBridge.bridge(e)))`? I am not quite sure which one is better; just a suggestion. :) What do you think?
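The two styles in the suggestion are behaviorally equivalent; the fused form just makes the pipeline a single step. A transliteration to Java streams, with hypothetical stand-ins for `expressionBridge.bridge` and `Flattening`:

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class MapFusionDemo {
    // Stand-ins for expressionBridge.bridge and Flattening from the reviewed code.
    static final Function<String, String> bridge = s -> s.trim();
    static final Function<String, String> flatten = s -> "flat(" + s + ")";

    /** Style 1: two chained map steps. */
    public static List<String> chained(List<String> in) {
        return in.stream().map(bridge).map(flatten).collect(Collectors.toList());
    }

    /** Style 2: the two functions fused into one map step via composition. */
    public static List<String> fused(List<String> in) {
        return in.stream().map(flatten.compose(bridge)).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> in = List.of(" f1 ", " f2 ");
        System.out.println(chained(in).equals(fused(in))); // true
    }
}
```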
[GitHub] [flink] sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] [table] Add support for map to table API
sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] [table] Add support for map to table API URL: https://github.com/apache/flink/pull/7167#discussion_r272046074 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala ## @@ -136,4 +136,21 @@ object ProjectionTranslator { case e: PlannerExpression => e } } + + + /** +* Extracts the leading non-alias expression. +* +* @param expr the expression to extract Review comment: Please remove the extra spaces in the `@param` description, i.e., normalize it to "the expression to extract".
[GitHub] [flink] sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] [table] Add support for map to table API
sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] [table] Add support for map to table API URL: https://github.com/apache/flink/pull/7167#discussion_r272042200 ## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/MapValidationTest.scala ## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.stream.table.validation + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.ValidationException +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvg +import org.apache.flink.table.utils.{TableFunc0, TableTestBase} +import org.junit.Test + +class MapValidationTest extends TableTestBase { Review comment: How about moving those test case into `CalcValidationTest`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] [table] Add support for map to table API
sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] [table] Add support for map to table API URL: https://github.com/apache/flink/pull/7167#discussion_r272038390 ## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/MapITCase.scala ## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.stream.table + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.expressions.utils.{Func1, Func23, Func24, Func25} +import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData} +import org.apache.flink.test.util.AbstractTestBase +import org.apache.flink.types.Row +import org.junit.Assert.assertEquals +import org.junit.{Ignore, Test} + +import scala.collection.mutable + +class MapITCase extends AbstractTestBase { + + @Test + def testSimpleMap(): Unit = { Review comment: I suggest reduce the ITCase count, so can merge `TestSimpleMap/testScalarResult/testMultiMap` in one case? And add the alias test as well. 
What do you think?
[GitHub] [flink] sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] [table] Add support for map to table API
sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] [table] Add support for map to table API URL: https://github.com/apache/flink/pull/7167#discussion_r272055289 ## File path: docs/dev/table/tableApi.md ## @@ -1682,6 +1682,34 @@ The `OverWindow` defines a range of rows over which aggregates are computed. `Ov {% top %} +### Map Review comment: How about adding a `Row-based operations` section that contains `map/flatmap/aggregate/flatAggregate`?
[GitHub] [flink] sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] [table] Add support for map to table API
sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] [table] Add support for map to table API URL: https://github.com/apache/flink/pull/7167#discussion_r272045373 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/tableImpl.scala ## @@ -462,6 +463,23 @@ class TableImpl( wrap(operationTreeBuilder.dropColumns(fields, operationTree)) } + override def map(mapFunction: String): Table = { +map(ExpressionParser.parseExpression(mapFunction)) + } + + override def map(mapFunction: Expression): Table = { +val resolvedMapFunction = callResolver.visit(mapFunction) +getLeadingNonAliasExpr(resolvedMapFunction) match { + case callExpr: CallExpression if callExpr.getFunctionDefinition.getType == +FunctionDefinition.Type.SCALAR_FUNCTION => + + case _ => +throw new ValidationException("Only ScalarFunction can be used in map.") Review comment: `Only ScalarFunction can be used in the map operator` ? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] [table] Add support for map to table API
sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] [table] Add support for map to table API URL: https://github.com/apache/flink/pull/7167#discussion_r272041929 ## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/MapITCase.scala ## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.stream.table + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.expressions.utils.{Func1, Func23, Func24, Func25} +import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData} +import org.apache.flink.test.util.AbstractTestBase +import org.apache.flink.types.Row +import org.junit.Assert.assertEquals +import org.junit.{Ignore, Test} + +import scala.collection.mutable + +class MapITCase extends AbstractTestBase { Review comment: Can we move this test case into `CalcITCase`? This is an automated message from the Apache Git Service. 
[GitHub] [flink] sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] [table] Add support for map to table API
sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] [table] Add support for map to table API URL: https://github.com/apache/flink/pull/7167#discussion_r272045455 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/tableImpl.scala ## @@ -462,6 +463,23 @@ class TableImpl( wrap(operationTreeBuilder.dropColumns(fields, operationTree)) } + override def map(mapFunction: String): Table = { +map(ExpressionParser.parseExpression(mapFunction)) + } + + override def map(mapFunction: Expression): Table = { +val resolvedMapFunction = callResolver.visit(mapFunction) +getLeadingNonAliasExpr(resolvedMapFunction) match { + case callExpr: CallExpression if callExpr.getFunctionDefinition.getType == +FunctionDefinition.Type.SCALAR_FUNCTION => + Review comment: It's better to remove the empty line. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] [table] Add support for map to table API
sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] [table] Add support for map to table API URL: https://github.com/apache/flink/pull/7167#discussion_r272044872 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/tableImpl.scala ## @@ -462,6 +463,23 @@ class TableImpl( wrap(operationTreeBuilder.dropColumns(fields, operationTree)) } + override def map(mapFunction: String): Table = { +map(ExpressionParser.parseExpression(mapFunction)) + } + + override def map(mapFunction: Expression): Table = { +val resolvedMapFunction = callResolver.visit(mapFunction) Review comment: How about using `mapFunction.accept(callResolver)`, just to use the callResolver in the TableImpl class is very uniform, and I am fine if you keep using `callResolver.visit(mapFunction)` in this PR. :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] [table] Add support for map to table API
sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] [table] Add support for map to table API URL: https://github.com/apache/flink/pull/7167#discussion_r272046322 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala ## @@ -136,4 +136,21 @@ object ProjectionTranslator { case e: PlannerExpression => e } } + + + /** +* Extracts the leading non-alias expression. +* +* @param expr the expression to extract +* @return the top non-alias expression +*/ + def getLeadingNonAliasExpr(expr: Expression): Expression = { +expr match { + case callExpr: CallExpression if callExpr.getFunctionDefinition.equals( + BuiltInFunctionDefinitions.AS) => +getLeadingNonAliasExpr(callExpr.getChildren.get(0)) + Review comment: It's better to remove the empty line, right? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
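The `getLeadingNonAliasExpr` helper under review peels alias (AS) wrappers until it reaches a non-alias expression. A minimal sketch of that recursion, using illustrative node classes rather than Flink's actual expression API:

```java
public class AliasUnwrapDemo {
    interface Expr {}

    /** A plain function call expression. */
    static final class Call implements Expr {
        final String name;
        Call(String name) { this.name = name; }
    }

    /** An alias (AS) node wrapping a child expression. */
    static final class Alias implements Expr {
        final Expr child;
        final String as;
        Alias(Expr child, String as) { this.child = child; this.as = as; }
    }

    /** Recursively peel alias wrappers until a non-alias expression is reached. */
    public static Expr leadingNonAlias(Expr e) {
        return (e instanceof Alias) ? leadingNonAlias(((Alias) e).child) : e;
    }

    public static void main(String[] args) {
        // func1 wrapped twice: ((func1 AS a) AS b)
        Expr e = new Alias(new Alias(new Call("func1"), "a"), "b");
        System.out.println(((Call) leadingNonAlias(e)).name); // func1
    }
}
```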
[GitHub] [flink] sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] [table] Add support for map to table API
sunjincheng121 commented on a change in pull request #7167: [FLINK-10973] [table] Add support for map to table API URL: https://github.com/apache/flink/pull/7167#discussion_r272037699 ## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/MapITCase.scala ## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.stream.table + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.expressions.utils.{Func1, Func23, Func24, Func25} +import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData} +import org.apache.flink.test.util.AbstractTestBase +import org.apache.flink.types.Row +import org.junit.Assert.assertEquals +import org.junit.{Ignore, Test} + +import scala.collection.mutable + +class MapITCase extends AbstractTestBase { + + @Test + def testSimpleMap(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) Review comment: Please using `StreamTableEnvironment.create(env)` instead of `TableEnvironment.getTableEnvironment(env)`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] lamber-ken commented on issue #7987: [FLINK-11820][serialization] SimpleStringSchema handle message record which value is null
lamber-ken commented on issue #7987: [FLINK-11820][serialization] SimpleStringSchema handle message record which value is null URL: https://github.com/apache/flink/pull/7987#issuecomment-479787896 @klion26,cc This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-3273) Remove Scala dependency from flink-streaming-java
[ https://issues.apache.org/jira/browse/FLINK-3273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16809581#comment-16809581 ] Liya Fan commented on FLINK-3273: - Hi [~dawidwys], thank you so much for your feedback. So what is the purpose of adding a Scala suffix? Does that action resolve this issue? > Remove Scala dependency from flink-streaming-java > - > > Key: FLINK-3273 > URL: https://issues.apache.org/jira/browse/FLINK-3273 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Reporter: Maximilian Michels >Priority: Major > > {{flink-streaming-java}} depends on Scala through {{flink-clients}}, > {{flink-runtime}}, and {{flink-testing-utils}}. We should get rid of the > Scala dependency just like we did for {{flink-java}}. Integration tests and > utilities which depend on Scala should be moved to {{flink-tests}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] JingsongLi commented on a change in pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch
JingsongLi commented on a change in pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch URL: https://github.com/apache/flink/pull/8102#discussion_r272034292 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/BoundComparator.java ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.generated; + +import org.apache.flink.table.dataformat.BaseRow; + +import java.io.Serializable; + +/** + * comparing boundary values. + */ +public interface BoundComparator extends Serializable { + + /** +* reset the comparator. +*/ + void reset(); + + /** +* Compares its two row. Returns a negative integer, +* zero, or a positive integer as the first argument is less than, equal +* to, or greater than the second. +* +* @param inputRow the first row to be compared. +* @param inputIndex the index for the first row. +* @param currentRow the second row to be compared. +* @param currentIndex the index for the second row. +* @return a negative integer, zero, or a positive integer as the +* first argument is less than, equal to, or greater than the +* second. 
+*/ + long compare(BaseRow inputRow, int inputIndex, BaseRow currentRow, int currentIndex); Review comment: The index is the row number within the current partition (window).
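The comparator contract above can be illustrated with a minimal sketch. This is not Flink's actual implementation: it assumes, per the review note, that the index arguments are row positions within the current partition (window), and it compares plain int positions for a ROWS-style bound instead of extracting values from BaseRow.

```java
// Hypothetical sketch of a ROWS-frame bound comparator. The real BoundComparator
// compares BaseRow values; here only the row positions within the current
// partition are used, with a fixed offset such as -2 for "2 PRECEDING".
class RowsBoundComparator {
    private final long offset;

    RowsBoundComparator(long offset) {
        this.offset = offset;
    }

    // Negative, zero, or positive as inputIndex lies before, at, or after
    // the bound derived from currentIndex, mirroring the compare() contract.
    long compare(int inputIndex, int currentIndex) {
        return inputIndex - (currentIndex + offset);
    }
}
```

With offset -2, row 3 sits exactly on the bound of current row 5, which is how a generated comparator could decide frame membership.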
[GitHub] [flink] JingsongLi commented on issue #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch
JingsongLi commented on issue #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch URL: https://github.com/apache/flink/pull/8102#issuecomment-479776062 Thanks @KurtYoung for reviewing, I changed the method name and added comments.
[GitHub] [flink] KurtYoung commented on a change in pull request #8107: [FLINK-12094][table-runtime-blink] Introduce sort merge join operator to blink batch
KurtYoung commented on a change in pull request #8107: [FLINK-12094][table-runtime-blink] Introduce sort merge join operator to blink batch URL: https://github.com/apache/flink/pull/8107#discussion_r272039981 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/SortMergeFullOuterJoinIterator.java ## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join; + +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.BinaryRow; +import org.apache.flink.table.generated.Projection; +import org.apache.flink.table.generated.RecordComparator; +import org.apache.flink.table.runtime.util.ResettableExternalBuffer; +import org.apache.flink.table.typeutils.BinaryRowSerializer; +import org.apache.flink.util.MutableObjectIterator; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Gets two matched rows for full outer join. + */ +public class SortMergeFullOuterJoinIterator implements Closeable { Review comment: Can this also extend from `SortMergeJoinIterator`?
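The suggestion to share a base class can be sketched as follows. All names here are illustrative, not Flink's API, and the example joins sorted int keys without handling duplicate keys (the real iterators buffer matching groups in a ResettableExternalBuffer).

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative base iterator owning the sorted inputs and the key comparison,
// so a full-outer variant only adds its own unmatched-row handling.
abstract class SketchSortMergeJoinIterator {
    protected final int[] left;
    protected final int[] right;
    protected int l = 0, r = 0;

    SketchSortMergeJoinIterator(int[] left, int[] right) {
        this.left = left;
        this.right = right;
    }

    protected int compareKeys() {
        return Integer.compare(left[l], right[r]);
    }
}

class SketchFullOuterJoinIterator extends SketchSortMergeJoinIterator {
    SketchFullOuterJoinIterator(int[] left, int[] right) {
        super(left, right);
    }

    /** Emits [leftKey, rightKey] pairs, using Integer.MIN_VALUE as the null marker. */
    List<int[]> joinAll() {
        List<int[]> out = new ArrayList<>();
        while (l < left.length && r < right.length) {
            int c = compareKeys();
            if (c == 0) {
                out.add(new int[]{left[l++], right[r++]});           // matched pair
            } else if (c < 0) {
                out.add(new int[]{left[l++], Integer.MIN_VALUE});    // unmatched left
            } else {
                out.add(new int[]{Integer.MIN_VALUE, right[r++]});   // unmatched right
            }
        }
        while (l < left.length) out.add(new int[]{left[l++], Integer.MIN_VALUE});
        while (r < right.length) out.add(new int[]{Integer.MIN_VALUE, right[r++]});
        return out;
    }
}
```

An inner-join variant could extend the same base and simply skip the unmatched branches, which is the kind of reuse the review comment is asking about.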
[GitHub] [flink] KurtYoung commented on a change in pull request #8107: [FLINK-12094][table-runtime-blink] Introduce sort merge join operator to blink batch
KurtYoung commented on a change in pull request #8107: [FLINK-12094][table-runtime-blink] Introduce sort merge join operator to blink batch URL: https://github.com/apache/flink/pull/8107#discussion_r272038683 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/SortMergeJoinOperator.java ## @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.BinaryRow; +import org.apache.flink.table.dataformat.GenericRow; +import org.apache.flink.table.dataformat.JoinedRow; +import org.apache.flink.table.generated.GeneratedJoinCondition; +import org.apache.flink.table.generated.GeneratedNormalizedKeyComputer; +import org.apache.flink.table.generated.GeneratedProjection; +import org.apache.flink.table.generated.GeneratedRecordComparator; +import org.apache.flink.table.generated.JoinCondition; +import org.apache.flink.table.generated.Projection; +import org.apache.flink.table.generated.RecordComparator; +import org.apache.flink.table.runtime.TableStreamOperator; +import org.apache.flink.table.runtime.sort.BinaryExternalSorter; +import org.apache.flink.table.runtime.util.ResettableExternalBuffer; +import org.apache.flink.table.runtime.util.StreamRecordCollector; +import org.apache.flink.table.typeutils.AbstractRowSerializer; +import org.apache.flink.table.typeutils.BinaryRowSerializer; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; + +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * An implementation that realizes the joining through a sort-merge join strategy. + * 1.In most cases, its performance is weaker than HashJoin. 
+ * 2.It is more stable than HashJoin, and most of the data can be sorted stably. + * 3.SortMergeJoin should be the best choice if sort can be omitted in the case of multi-level join + * cascade with the same key. + * + * NOTE: SEMI and ANTI join output input1 instead of input2. (Contrary to {@link HashJoinOperator}). + */ +public class SortMergeJoinOperator extends TableStreamOperator + implements TwoInputStreamOperator { + + private final long reservedSortMemory1; + private final long reservedSortMemory2; + private final long externalBufferMemory; + private final SortMergeJoinType type; + private final boolean leftIsSmaller; + private final boolean[] filterNulls; + + // generated code to cook + private GeneratedJoinCondition condFuncCode; + private GeneratedProjection projectionCode1; + private GeneratedProjection projectionCode2; + private GeneratedNormalizedKeyComputer computer1; + private GeneratedRecordComparator comparator1; + private GeneratedNormalizedKeyComputer computer2; + private GeneratedRecordComparator comparator2; + private GeneratedRecordComparator genKeyComparator; + + private transient MemoryManager memManager; + private transient IOManager ioManager; + private transient BinaryRowSerializer serializer1; + private transient BinaryRowSerializer serializer2; + private transient BinaryExternalSorter sorter1; + private transient BinaryExternalSorter sorter2; + private transient SortMergeJoinIterator joinIterator1; Review comment: you cannot list all the variables you *might* use and then choose which one to use in different scenarios...
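One way to read the reviewer's objection is: instead of declaring one transient field per possible iterator, hold a single field of the common base type and pick the concrete variant when the operator is opened. The sketch below is purely illustrative; the class and method names are hypothetical, not Flink's.

```java
// Hypothetical interface standing in for the common iterator base type.
interface JoinIteratorSketch {
    String describe();
}

class InnerJoinIteratorSketch implements JoinIteratorSketch {
    public String describe() { return "inner"; }
}

class FullOuterJoinIteratorSketch2 implements JoinIteratorSketch {
    public String describe() { return "fullOuter"; }
}

class JoinOperatorSketch {
    // One field of the base type instead of one field per scenario.
    private transient JoinIteratorSketch joinIterator;

    void open(String joinType) {
        // The concrete iterator is chosen once, at open time.
        joinIterator = joinType.equals("FULL")
                ? new FullOuterJoinIteratorSketch2()
                : new InnerJoinIteratorSketch();
    }

    String current() {
        return joinIterator.describe();
    }
}
```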
[GitHub] [flink] KurtYoung commented on a change in pull request #8107: [FLINK-12094][table-runtime-blink] Introduce sort merge join operator to blink batch
KurtYoung commented on a change in pull request #8107: [FLINK-12094][table-runtime-blink] Introduce sort merge join operator to blink batch URL: https://github.com/apache/flink/pull/8107#discussion_r272036764 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/SortMergeJoinHelper.java ## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join; + +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.BinaryRow; +import org.apache.flink.table.dataformat.JoinedRow; +import org.apache.flink.table.generated.JoinCondition; +import org.apache.flink.table.runtime.util.ResettableExternalBuffer; +import org.apache.flink.util.Collector; + +import java.util.BitSet; + +/** + * Helper class for sort merge join operators. Contains some join methods. + */ +public class SortMergeJoinHelper { Review comment: I think we don't need this class; it's just a mix of various join functions. You can place them directly into the SMJ operator.
[jira] [Created] (FLINK-12108) Simplify splitting expressions into projections, aggregations & windowProperties
Dawid Wysakowicz created FLINK-12108: Summary: Simplify splitting expressions into projections, aggregations & windowProperties Key: FLINK-12108 URL: https://issues.apache.org/jira/browse/FLINK-12108 Project: Flink Issue Type: Sub-task Components: Table SQL / API Reporter: Dawid Wysakowicz Assignee: Dawid Wysakowicz
[GitHub] [flink] JingsongLi commented on a change in pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch
JingsongLi commented on a change in pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch URL: https://github.com/apache/flink/pull/8102#discussion_r272038501 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/aggregate/over/frame/OverWindowFrame.java ## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.aggregate.over.frame; + +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.BinaryRow; +import org.apache.flink.table.runtime.context.ExecutionContext; +import org.apache.flink.table.runtime.util.ResettableExternalBuffer; + +import java.io.Serializable; + +/** + * A window frame calculates the results for those records belong to a window frame. + * Before use a frame must be prepared by passing it all the records in the current partition. + */ +public interface OverWindowFrame extends Serializable { + + void open(ExecutionContext ctx) throws Exception; + + void resetBuffer(ResettableExternalBuffer rows) throws Exception; + + //return the ACC of the window frame. 
+ BaseRow write(int index, BaseRow current) throws Exception; Review comment: Method `write` needs to be renamed to `process`. Over AGG means that every row has a corresponding output. OverWindowFrame is called as follows: 1. Get all the data for a partition and invoke `prepare(ResettableExternalBuffer rows)`. 2. Then each row is traversed one by one, invoking `BaseRow process(int index, BaseRow current)` to get the calculation result for the currentRow.
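The two-step calling protocol described in the comment can be sketched with simplified types: List<long[]> stands in for ResettableExternalBuffer and long[] for BaseRow. The interface below uses the proposed names and is not Flink's actual API.

```java
import java.util.List;

// Simplified frame interface following the proposed naming:
// prepare(...) once per partition, then process(index, row) once per row.
interface SimpleOverWindowFrame {
    void prepare(List<long[]> partitionRows) throws Exception;
    long[] process(int index, long[] current) throws Exception;
}

// A trivial frame computing a running sum of the first column
// (UNBOUNDED PRECEDING to CURRENT ROW), purely for illustration.
class RunningSumFrame implements SimpleOverWindowFrame {
    private long acc;

    @Override
    public void prepare(List<long[]> partitionRows) {
        acc = 0L; // reset the accumulator for the new partition
    }

    @Override
    public long[] process(int index, long[] current) {
        acc += current[0];         // every row gets a corresponding output
        return new long[]{acc};
    }
}
```

The key point of the protocol is that `prepare` sees the whole partition before any `process` call, which is what lets frames like UNBOUNDED FOLLOWING look ahead in the buffer.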
[jira] [Comment Edited] (FLINK-3273) Remove Scala dependency from flink-streaming-java
[ https://issues.apache.org/jira/browse/FLINK-3273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16809565#comment-16809565 ] Dawid Wysakowicz edited comment on FLINK-3273 at 4/4/19 6:54 AM: - [~fan_li_ya] "Adding a Scala suffix" means that the full artifactId is {{flink-streaming-java_\{scala-version-suffix\}}}. Rather than just {{flink-streaming-java}}. was (Author: dawidwys): [~fan_li_ya] "Adding a Scala suffix" means that the full artifactId is {{flink-streaming-java_{scala-version-suffix}}}. Rather than just {{flink-streaming-java}}. > Remove Scala dependency from flink-streaming-java > - > > Key: FLINK-3273 > URL: https://issues.apache.org/jira/browse/FLINK-3273 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Reporter: Maximilian Michels >Priority: Major > > {{flink-streaming-java}} depends on Scala through {{flink-clients}}, > {{flink-runtime}}, and {{flink-testing-utils}}. We should get rid of the > Scala dependency just like we did for {{flink-java}}. Integration tests and > utilities which depend on Scala should be moved to {{flink-tests}}.
[jira] [Commented] (FLINK-3273) Remove Scala dependency from flink-streaming-java
[ https://issues.apache.org/jira/browse/FLINK-3273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16809565#comment-16809565 ] Dawid Wysakowicz commented on FLINK-3273: - [~fan_li_ya] "Adding a Scala suffix" means that the full artifactId is {{flink-streaming-java_{scala-version-suffix}}}. Rather than just {{flink-streaming-java}}. > Remove Scala dependency from flink-streaming-java > - > > Key: FLINK-3273 > URL: https://issues.apache.org/jira/browse/FLINK-3273 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Reporter: Maximilian Michels >Priority: Major > > {{flink-streaming-java}} depends on Scala through {{flink-clients}}, > {{flink-runtime}}, and {{flink-testing-utils}}. We should get rid of the > Scala dependency just like we did for {{flink-java}}. Integration tests and > utilities which depend on Scala should be moved to {{flink-tests}}.
[jira] [Commented] (FLINK-12105) TUMBLE INTERVAL value errors out for 100 or more value
[ https://issues.apache.org/jira/browse/FLINK-12105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16809561#comment-16809561 ] Dawid Wysakowicz commented on FLINK-12105: -- I think the design doc for upcoming refactor of Types system in Table API might come in handy to understand different types: [https://docs.google.com/document/d/1a9HUb6OaBIoj9IRfbILcMFPrOL7ALeZ3rVI66dvA2_U] It is also true that as of now flink supports only intervals with nanosecond precision for window aggregations. > TUMBLE INTERVAL value errors out for 100 or more value > -- > > Key: FLINK-12105 > URL: https://issues.apache.org/jira/browse/FLINK-12105 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.7.2 > Environment: [https://github.com/ververica/sql-training] >Reporter: Vinod Mehra >Priority: Major > > I ran into this while experimenting with different values at Lyft eng. > However it is reproducible with [https://github.com/ververica/sql-training] > as well. I showed this issue to the training instructors during > flink-forward-19 and they asked me to file this bug. > The INTERVAL values work fine until 99. Errors after that: > *TUMBLE(rideTime, INTERVAL '100' SECOND)* > _org.apache.calcite.sql.validate.SqlValidatorException: Interval field value > 100 exceeds precision of SECOND(2) field_ > *TUMBLE(rideTime, INTERVAL '100' MINUTE)* > _org.apache.calcite.sql.validate.SqlValidatorException: Interval field value > 100 exceeds precision of MINUTE(2) field_ > *TUMBLE(rideTime, INTERVAL '100' HOUR)* > _org.apache.calcite.sql.validate.SqlValidatorException: Interval field value > 100 exceeds precision of HOUR(2) field_ > *TUMBLE(rideTime, INTERVAL '100' DAY)* > _org.apache.calcite.sql.validate.SqlValidatorException:_ Interval field value > 100 exceeds precision of DAY(2) field > (Note: MONTH AND YEAR also error out but for different reasons ("_Only > constant window intervals with millisecond resolution are supported_"). 
MONTH > and YEAR intervals are not supported at all currently. I was told that it is > hard to implement because of timezone differences. I will file that > separately.)
[GitHub] [flink] JingsongLi commented on a change in pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch
JingsongLi commented on a change in pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch URL: https://github.com/apache/flink/pull/8102#discussion_r272036049 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/aggregate/over/frame/OverWindowFrame.java ## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.aggregate.over.frame; + +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.BinaryRow; +import org.apache.flink.table.runtime.context.ExecutionContext; +import org.apache.flink.table.runtime.util.ResettableExternalBuffer; + +import java.io.Serializable; + +/** + * A window frame calculates the results for those records belong to a window frame. + * Before use a frame must be prepared by passing it all the records in the current partition. + */ +public interface OverWindowFrame extends Serializable { + + void open(ExecutionContext ctx) throws Exception; + + void resetBuffer(ResettableExternalBuffer rows) throws Exception; Review comment: Means prepare for next partition(window). 
[GitHub] [flink] JingsongLi commented on a change in pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch
JingsongLi commented on a change in pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch URL: https://github.com/apache/flink/pull/8102#discussion_r272035934 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/aggregate/over/frame/OverWindowFrame.java ## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.aggregate.over.frame; + +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.BinaryRow; +import org.apache.flink.table.runtime.context.ExecutionContext; +import org.apache.flink.table.runtime.util.ResettableExternalBuffer; + +import java.io.Serializable; + +/** + * A window frame calculates the results for those records belong to a window frame. + * Before use a frame must be prepared by passing it all the records in the current partition. 
+ */ +public interface OverWindowFrame extends Serializable { + + void open(ExecutionContext ctx) throws Exception; + + void resetBuffer(ResettableExternalBuffer rows) throws Exception; Review comment: Naming it `prepare` would be more appropriate.
[GitHub] [flink] JingsongLi commented on a change in pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch
JingsongLi commented on a change in pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch URL: https://github.com/apache/flink/pull/8102#discussion_r272033859 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/BoundComparator.java ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.generated; + +import org.apache.flink.table.dataformat.BaseRow; + +import java.io.Serializable; + +/** + * comparing boundary values. Review comment: https://docs.oracle.com/cd/E17952_01/mysql-8.0-en/window-functions-frames.html The URL above has an explanation, e.g.: CURRENT ROW: For ROWS, the bound is the current row. For RANGE, the bound is the peers of the current row. UNBOUNDED PRECEDING: The bound is the first partition row. UNBOUNDED FOLLOWING: The bound is the last partition row.
[GitHub] [flink] JingsongLi commented on a change in pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch
JingsongLi commented on a change in pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch URL: https://github.com/apache/flink/pull/8102#discussion_r272033041 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/BoundComparator.java ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.generated; + +import org.apache.flink.table.dataformat.BaseRow; + +import java.io.Serializable; + +/** + * comparing boundary values. + */ +public interface BoundComparator extends Serializable { + + /** +* reset the comparator. +*/ + void reset(); Review comment: Because the field value of `currentRow` may be cached. For example, `UnboundedFollowingOverWindowFrame`, when calculating its tail boundary, may cross multiple rows in the iterator. `BoundComparator.compare` will be invoked many times with the same `currentRow` input, so there is no need to read its field value on every call; instead, the field value is cached in `BoundComparator`. `reset` is designed to clear that cache.
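The caching contract described above can be illustrated with a toy comparator. This is an assumed sketch, not the code-generated Flink class: rows are stood in for by `long[]` arrays, and the comparator memoizes the current row's field between `compare` calls until `reset` is invoked.

```java
// Toy sketch (not Flink's generated code) of the reset/caching contract:
// the current row's field is read once and cached; compare() reuses the
// cached value until reset() is called for a new current row.
final class CachingBoundComparator {

    private long cachedField;
    private boolean cached;

    // Clears the cache; call this whenever the current row changes.
    void reset() {
        cached = false;
    }

    // Compares the (cached) current-row field against the input row's field.
    int compare(long[] currentRow, long[] inputRow) {
        if (!cached) {
            cachedField = currentRow[0]; // read the field only once per current row
            cached = true;
        }
        return Long.compare(cachedField, inputRow[0]);
    }
}
```

Note how, after the first call, a different `currentRow` array is ignored until `reset()`: that is precisely the behavior that makes an explicit `reset()` necessary in the interface.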
[jira] [Commented] (FLINK-12105) TUMBLE INTERVAL value errors out for 100 or more value
[ https://issues.apache.org/jira/browse/FLINK-12105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16809550#comment-16809550 ] Hequn Cheng commented on FLINK-12105: - [~vmehra] Hi, thanks for bringing up the discussion. As the default interval precision is 2, you need to specify the precision, e.g., {{INTERVAL '100' HOUR(3)}}. For MONTH and YEAR, there is already a discussion for it which may be helpful for you, see FLINK-9740. Best, Hequn > TUMBLE INTERVAL value errors out for 100 or more value > -- > > Key: FLINK-12105 > URL: https://issues.apache.org/jira/browse/FLINK-12105 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.7.2 > Environment: [https://github.com/ververica/sql-training] >Reporter: Vinod Mehra >Priority: Major > > I ran into this while experimenting with different values at Lyft eng. > However it is reproducible with [https://github.com/ververica/sql-training] > as well. I showed this issue to the training instructors during > flink-forward-19 and they asked me to file this bug. > The INTERVAL values work fine until 99. Errors after that: > *TUMBLE(rideTime, INTERVAL '100' SECOND)* > _org.apache.calcite.sql.validate.SqlValidatorException: Interval field value > 100 exceeds precision of SECOND(2) field_ > *TUMBLE(rideTime, INTERVAL '100' MINUTE)* > _org.apache.calcite.sql.validate.SqlValidatorException: Interval field value > 100 exceeds precision of MINUTE(2) field_ > *TUMBLE(rideTime, INTERVAL '100' HOUR)* > _org.apache.calcite.sql.validate.SqlValidatorException: Interval field value > 100 exceeds precision of HOUR(2) field_ > *TUMBLE(rideTime, INTERVAL '100' DAY)* > _org.apache.calcite.sql.validate.SqlValidatorException:_ Interval field value > 100 exceeds precision of DAY(2) field > (Note: MONTH AND YEAR also error out but for different reasons ("_Only > constant window intervals with millisecond resolution are supported_"). 
MONTH > and YEAR intervals are not supported at all currently. I was told that it is > hard to implement because of timezone differences. I will file that > separately.)
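Hequn's answer comes down to a simple digit-count rule: the leading field of an interval literal defaults to precision 2, so a three-digit value like `'100'` needs an explicit precision such as `HOUR(3)`. A minimal sketch of that rule (an assumed simplification of the SQL leading-field precision check, not Calcite's validator):

```java
// Sketch of the interval leading-field precision rule discussed above:
// INTERVAL '<v>' HOUR defaults to precision 2, so v may have at most
// 2 digits; INTERVAL '100' HOUR(3) declares precision 3 and is accepted.
final class IntervalPrecision {

    static final int DEFAULT_PRECISION = 2;

    // True when the literal's digit count fits the declared precision.
    static boolean fits(String value, int precision) {
        return value.length() <= precision;
    }

    static boolean fitsDefault(String value) {
        return fits(value, DEFAULT_PRECISION);
    }
}
```

So `INTERVAL '99' HOUR` validates, `INTERVAL '100' HOUR` fails with the "exceeds precision of HOUR(2)" error quoted in the issue, and `INTERVAL '100' HOUR(3)` validates.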
[GitHub] [flink] dawidwys commented on a change in pull request #8050: [FLINK-11067][table] Convert TableEnvironments to interfaces
dawidwys commented on a change in pull request #8050: [FLINK-11067][table] Convert TableEnvironments to interfaces URL: https://github.com/apache/flink/pull/8050#discussion_r272031691 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java ## @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.catalog.ExternalCatalog; +import org.apache.flink.table.descriptors.ConnectorDescriptor; +import org.apache.flink.table.descriptors.TableDescriptor; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.sources.TableSource; + +/** + * The base class for batch and stream TableEnvironments. + * + * The TableEnvironment is a central concept of the Table API and SQL integration. 
It is + * responsible for: + * + * + * Registering a Table in the internal catalog + * Registering an external catalog + * Executing SQL queries + * Registering a user-defined (scalar, table, or aggregation) function + * Converting a DataStream or DataSet into a Table + * Holding a reference to an ExecutionEnvironment or StreamExecutionEnvironment + * + */ +@PublicEvolving +public interface TableEnvironment { + + /** +* Creates a table from a table source. +* +* @param source table source used as table +*/ + Table fromTableSource(TableSource source); + + /** +* Registers an {@link ExternalCatalog} under a unique name in the TableEnvironment's schema. +* All tables registered in the {@link ExternalCatalog} can be accessed. +* +* @param name The name under which the externalCatalog will be registered +* @param externalCatalog The externalCatalog to register +*/ + void registerExternalCatalog(String name, ExternalCatalog externalCatalog); + + /** +* Gets a registered {@link ExternalCatalog} by name. +* +* @param name The name to look up the {@link ExternalCatalog} +* @return The {@link ExternalCatalog} +*/ + ExternalCatalog getRegisteredExternalCatalog(String name); + + /** +* Registers a {@link ScalarFunction} under a unique name. Replaces already existing +* user-defined functions under this name. +*/ + void registerFunction(String name, ScalarFunction function); + + /** +* Registers a {@link Table} under a unique name in the TableEnvironment's catalog. +* Registered tables can be referenced in SQL queries. +* +* @param name The name under which the table will be registered. +* @param table The table to register. +*/ + void registerTable(String name, Table table); + + /** +* Registers an external {@link TableSource} in this {@link TableEnvironment}'s catalog. +* Registered tables can be referenced in SQL queries. +* +* @param name The name under which the {@link TableSource} is registered. +* @param tableSource The {@link TableSource} to register.
+*/ + void registerTableSource(String name, TableSource tableSource); + + /** +* Registers an external {@link TableSink} with given field names and types in this +* {@link TableEnvironment}'s catalog. +* Registered sink tables can be referenced in SQL DML statements. +* +* @param name The name under which the {@link TableSink} is registered. +* @param fieldNames The field names to register with the {@link TableSink}. +* @param fieldTypes The field types to register with the {@link TableSink}. +* @param tableSink The {@link TableSink} to register. +*/ + void registerTableSink(String name, String[] fieldNames, TypeInformation[] fieldTypes, TableSink tableSink); Review comment: That's unfortunate as we are extracting new interface, but I
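The registration methods in the interface above share one contract worth making explicit: an entry is stored under a unique name, registering again under the same name replaces the old entry (as the `registerFunction` Javadoc states), and registered names can be looked up later. A toy sketch of that contract, with a plain `HashMap` standing in for Flink's catalog (this is an invented illustration, not the TableEnvironment implementation):

```java
import java.util.HashMap;
import java.util.Map;

// Toy sketch (not Flink's implementation) of the register/lookup contract
// the TableEnvironment interface describes: names are unique keys, and a
// second registration under the same name replaces the first.
final class ToyCatalog<T> {

    private final Map<String, T> entries = new HashMap<>();

    // Registers entry under name, replacing any existing entry.
    void register(String name, T entry) {
        entries.put(name, entry);
    }

    // Returns the registered entry, or null if the name is unknown.
    T lookup(String name) {
        return entries.get(name);
    }
}
```

The same shape underlies `registerTable`, `registerTableSource`, `registerTableSink`, and `registerFunction`; what differs per method is the value type being registered.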
[GitHub] [flink] JingsongLi commented on a change in pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch
JingsongLi commented on a change in pull request #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch URL: https://github.com/apache/flink/pull/8102#discussion_r272031198 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/BoundComparator.java ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.generated; + +import org.apache.flink.table.dataformat.BaseRow; + +import java.io.Serializable; + +/** + * comparing boundary values. + */ +public interface BoundComparator extends Serializable { Review comment: For example, `SELECT d, e, sum(e) over (partition by d order by e desc range between 5 PRECEDING and 4 PRECEDING) FROM Table` The SQL above defines a RANGE frame on the `e` field. `5 PRECEDING` means that the `e` value of the current row should be greater than or equal to the `e` value of the range header plus 5. `BoundComparator` is used to code-generate this comparison logic.
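The head-bound condition described in that comment can be written down directly. This is a hedged sketch mirroring the comment's stated condition for the `5 PRECEDING` head bound (the real comparator is code-generated by Flink, and the class and method names here are invented):

```java
// Toy sketch (not Flink's generated code) of the RANGE head-bound test the
// review comment describes for "range between 5 PRECEDING and 4 PRECEDING":
// a candidate head row is inside the frame when the current row's e value is
// greater than or equal to the candidate's e value plus 5.
final class RangeHeadComparator {

    static final long HEAD_OFFSET = 5L;

    // True when headE can serve as the frame head for currentE.
    static boolean withinHead(long currentE, long headE) {
        return currentE >= headE + HEAD_OFFSET;
    }
}
```

In the actual runtime this test is specialized per query: the ordering field, its type, and the offset are baked into generated code, which is why `BoundComparator` is an interface for generated classes rather than a single hand-written comparator.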