[jira] [Comment Edited] (FLINK-11153) Remove UdfAnalyzer

2019-03-22 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16799522#comment-16799522
 ] 

vinoyang edited comment on FLINK-11153 at 3/23/19 4:34 AM:
---

+1, I agree with [~twalthr]'s idea. I have just had a look at the classes in the 
sca package; most of those classes are not used outside of the sca package. So 
we can remove the whole sca package. cc [~Zentol] and [~gjy]

update:

Just found two usages: {{UdfOperatorUtils#analyzeSingleInputUdf}} and 
{{UdfOperatorUtils#analyzeDualInputUdf}}

Considering there are many classes involved (including dependency and test 
classes in different places), I suggest we create an umbrella issue for the 
cleanup work. I can help with it. What do you think?
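
For context, a minimal sketch of the user-facing feature that the sca package 
backs (the API names below are an assumption based on the Flink 1.x DataSet 
API, not taken from this thread):

{code:java}
// Hedged sketch: the sca package implements the static code analysis that
// users enable through the ExecutionConfig; removing the package removes
// this feature. Exact names are assumptions for illustration only.
import org.apache.flink.api.common.CodeAnalysisMode;
import org.apache.flink.api.java.ExecutionEnvironment;

public class CodeAnalysisExample {
	public static void main(String[] args) {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		// HINT makes the UdfAnalyzer print optimization hints for UDFs.
		env.getConfig().setCodeAnalysisMode(CodeAnalysisMode.HINT);
	}
}
{code}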


was (Author: yanghua):
+1, I agree with [~twalthr]'s idea. I have just had a look at the classes in the 
sca package; most of those classes are not used outside of the sca package. So 
we can remove the whole sca package. cc [~Zentol] and [~gjy]

update:

Just found two usages: {{UdfOperatorUtils#analyzeSingleInputUdf}} and 
{{UdfOperatorUtils#analyzeDualInputUdf}}

 

> Remove UdfAnalyzer
> --
>
> Key: FLINK-11153
> URL: https://issues.apache.org/jira/browse/FLINK-11153
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataStream, Tests
>Affects Versions: 1.8.0
>Reporter: Gary Yao
>Assignee: vinoyang
>Priority: Major
>
> {noformat}
> org.apache.flink.api.java.sca.CodeAnalyzerException: Exception occurred 
> during code analysis.
>   at 
> org.apache.flink.api.java.sca.UdfAnalyzer.analyze(UdfAnalyzer.java:341)
>   at 
> org.apache.flink.api.java.sca.UdfAnalyzerTest.compareAnalyzerResultWithAnnotationsSingleInputWithKeys(UdfAnalyzerTest.java:1339)
>   at 
> org.apache.flink.api.java.sca.UdfAnalyzerTest.compareAnalyzerResultWithAnnotationsSingleInput(UdfAnalyzerTest.java:1322)
>   at 
> org.apache.flink.api.java.sca.UdfAnalyzerTest.testForwardWithArrayModification(UdfAnalyzerTest.java:695)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:564)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>   at 
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> Caused by: java.lang.IllegalArgumentException
>   at 
> org.apache.flink.shaded.asm5.org.objectweb.asm.ClassReader.<init>(Unknown Source)
>   at 
> org.apache.flink.shaded.asm5.org.objectweb.asm.ClassReader.<init>(Unknown Source)
>   at 
> org.apache.flink.shaded.asm5.org.objectweb.asm.ClassReader.<init>(Unknown Source)
>   at 
> org.apache.flink.api.java.sca.UdfAnalyzerUtils.findMethodNode(UdfAnalyzerUtils.java:131)
>   at 
> org.apache.flink.api.java.sca.UdfAnalyzerUtils.findMethodNode(UdfAnalyzerUtils.java:115)
>   at 
> org.apache.flink.api.java.sca.UdfAnalyzer.analyze(UdfAnalyzer.java:290)
>   ... 25 more
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-11153) Remove UdfAnalyzer

2019-03-22 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16799522#comment-16799522
 ] 

vinoyang edited comment on FLINK-11153 at 3/23/19 4:27 AM:
---

+1, I agree with [~twalthr]'s idea. I have just had a look at the classes in the 
sca package; most of those classes are not used outside of the sca package. So 
we can remove the whole sca package. cc [~Zentol] and [~gjy]

update:

Just found two usages: {{UdfOperatorUtils#analyzeSingleInputUdf}} and 
{{UdfOperatorUtils#analyzeDualInputUdf}}

 


was (Author: yanghua):
+1, I agree with [~twalthr]'s idea. I have just had a look at the classes in the 
sca package; those classes are not used outside of the sca package. So we can 
remove the whole sca package. cc [~Zentol] and [~gjy]






[jira] [Commented] (FLINK-11153) Remove UdfAnalyzer

2019-03-22 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16799522#comment-16799522
 ] 

vinoyang commented on FLINK-11153:
--

+1, I agree with [~twalthr]'s idea. I have just had a look at the classes in the 
sca package; those classes are not used outside of the sca package. So we can 
remove the whole sca package. cc [~Zentol] and [~gjy]






[jira] [Assigned] (FLINK-11153) Remove UdfAnalyzer

2019-03-22 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-11153:


Assignee: vinoyang






[jira] [Assigned] (FLINK-11996) Case class maximum of 22 fields

2019-03-22 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11996?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-11996:


Assignee: vinoyang

> Case class maximum of 22 fields
> ---
>
> Key: FLINK-11996
> URL: https://issues.apache.org/jira/browse/FLINK-11996
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Scala, API / Type Serialization System, 
> Documentation
>Reporter: Wouter Zorgdrager
>Assignee: vinoyang
>Priority: Minor
>
> The [serialization 
> documentation|https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/types_serialization.html#flinks-typeinformation-class]
>  states that there is a limit of 22 fields in a case class. Since [Scala 
> 2.11|https://github.com/scala/bug/issues/7296] this arity limit has been 
> removed, and therefore the limit should also be removed from this documentation 
> page. 





[jira] [Commented] (FLINK-11982) BatchTableSourceFactory support Json Format File

2019-03-22 Thread frank wang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16799515#comment-16799515
 ] 

frank wang commented on FLINK-11982:


Do you want to develop this function yourself? You didn't provide that information.

> BatchTableSourceFactory support Json Format File
> 
>
> Key: FLINK-11982
> URL: https://issues.apache.org/jira/browse/FLINK-11982
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Ecosystem
>Affects Versions: 1.6.4, 1.7.2
>Reporter: pingle wang
>Assignee: frank wang
>Priority: Major
>
> Scala code:
> {code:scala}
> val connector = FileSystem().path("data/in/test.json")
> val desc = tEnv.connect(connector)
>   .withFormat(
>     new Json()
>       .schema(
>         Types.ROW(
>           Array[String]("id", "name", "age"),
>           Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.INT))
>       )
>       .failOnMissingField(true)
>   ).registerTableSource("persion")
> val sql = "select * from person"
> val result = tEnv.sqlQuery(sql)
> {code}
> Exception info:
> {code:java}
> Exception in thread "main" 
> org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a 
> suitable table factory for 
> 'org.apache.flink.table.factories.BatchTableSourceFactory' in
> the classpath.
> Reason: No context matches.
> The following properties are requested:
> connector.path=file:///Users/batch/test.json
> connector.property-version=1
> connector.type=filesystem
> format.derive-schema=true
> format.fail-on-missing-field=true
> format.property-version=1
> format.type=json
> The following factories have been considered:
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
> org.apache.flink.table.sinks.CsvAppendTableSinkFactory
> org.apache.flink.formats.avro.AvroRowFormatFactory
> org.apache.flink.formats.json.JsonRowFormatFactory
> org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceSinkFactory
> org.apache.flink.streaming.connectors.kafka.Kafka09TableSourceSinkFactory
> at 
> org.apache.flink.table.factories.TableFactoryService$.filterByContext(TableFactoryService.scala:214)
> at 
> org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:130)
> at 
> org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:81)
> at 
> org.apache.flink.table.factories.TableFactoryUtil$.findAndCreateTableSource(TableFactoryUtil.scala:44)
> at 
> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:46)
> at com.meitu.mlink.sql.batch.JsonExample.main(JsonExample.java:36){code}
>  





[GitHub] [flink] flinkbot commented on issue #8039: Release 1.8

2019-03-22 Thread GitBox
flinkbot commented on issue #8039: Release 1.8
URL: https://github.com/apache/flink/pull/8039#issuecomment-475829632
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] syijie51 opened a new pull request #8039: Release 1.8

2019-03-22 Thread GitBox
syijie51 opened a new pull request #8039: Release 1.8
URL: https://github.com/apache/flink/pull/8039
 
 
   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   




[GitHub] [flink] TisonKun commented on a change in pull request #7927: [FLINK-11603][metrics] Port the MetricQueryService to the new RpcEndpoint

2019-03-22 Thread GitBox
TisonKun commented on a change in pull request #7927: [FLINK-11603][metrics] 
Port the MetricQueryService to the new RpcEndpoint
URL: https://github.com/apache/flink/pull/7927#discussion_r268377279
 
 

 ##
 File path: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ##
 @@ -1846,7 +1846,7 @@ object TaskManager {
 val metricRegistry = new MetricRegistryImpl(
   MetricRegistryConfiguration.fromConfiguration(configuration))
 
-metricRegistry.startQueryService(taskManagerSystem, resourceID)
+//metricRegistry.startQueryService(taskManagerSystem, resourceID)
 
 Review comment:
   removed




[GitHub] [flink] TisonKun commented on a change in pull request #7927: [FLINK-11603][metrics] Port the MetricQueryService to the new RpcEndpoint

2019-03-22 Thread GitBox
TisonKun commented on a change in pull request #7927: [FLINK-11603][metrics] 
Port the MetricQueryService to the new RpcEndpoint
URL: https://github.com/apache/flink/pull/7927#discussion_r268377229
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
 ##
 @@ -217,7 +205,7 @@ private void logDumpSizeWouldExceedLimit(final String 
metricType, boolean hasExc
 * {@code space : . ,} are replaced by {@code _} 
(underscore)
 * 
 */
-   static String replaceInvalidChars(String str) {
+   private static String replaceInvalidChars(String str) {
 
 Review comment:
   reverted




[GitHub] [flink] TisonKun commented on a change in pull request #7927: [FLINK-11603][metrics] Port the MetricQueryService to the new RpcEndpoint

2019-03-22 Thread GitBox
TisonKun commented on a change in pull request #7927: [FLINK-11603][metrics] 
Port the MetricQueryService to the new RpcEndpoint
URL: https://github.com/apache/flink/pull/7927#discussion_r268377188
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
 ##
 @@ -27,31 +28,28 @@
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcService;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.actor.Status;
-import akka.actor.UntypedActor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 import static 
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.MetricDumpSerializer;
 
 /**
  * The MetricQueryService creates a key-value representation of all metrics 
currently registered with Flink when queried.
  *
  * It is realized as an actor and can be notified of
- * - an added metric by calling {@link 
MetricQueryService#notifyOfAddedMetric(ActorRef, Metric, String, 
AbstractMetricGroup)}
- * - a removed metric by calling {@link 
MetricQueryService#notifyOfRemovedMetric(ActorRef, Metric)}
- * - a metric dump request by sending the return value of {@link 
MetricQueryService#getCreateDump()}
+ * - an added metric by calling {@link #addMetric(String, Metric, 
AbstractMetricGroup)}
+ * - a removed metric by calling {@link #removeMetric(Metric)}
+ * - a metric dump request by sending the return value of {@link 
#queryMetrics(Time)}
 
 Review comment:
   updated




[GitHub] [flink] TisonKun commented on issue #7927: [FLINK-11603][metrics] Port the MetricQueryService to the new RpcEndpoint

2019-03-22 Thread GitBox
TisonKun commented on issue #7927: [FLINK-11603][metrics] Port the 
MetricQueryService to the new RpcEndpoint
URL: https://github.com/apache/flink/pull/7927#issuecomment-475829204
 
 
   Thanks for your review @zentol! I have reacted to the comments.




[GitHub] [flink] TisonKun commented on a change in pull request #7927: [FLINK-11603][metrics] Port the MetricQueryService to the new RpcEndpoint

2019-03-22 Thread GitBox
TisonKun commented on a change in pull request #7927: [FLINK-11603][metrics] 
Port the MetricQueryService to the new RpcEndpoint
URL: https://github.com/apache/flink/pull/7927#discussion_r268377110
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
 ##
 @@ -100,11 +100,11 @@ public static RpcService createRpcService(
int port,
Configuration configuration) throws Exception {
final ActorSystem actorSystem = 
BootstrapTools.startActorSystem(configuration, hostname, port, LOG);
-   return instantiateAkkaRpcService(configuration, actorSystem);
+   return createRpcService(configuration, actorSystem);
}
 
@Nonnull
-   private static RpcService instantiateAkkaRpcService(Configuration 
configuration, ActorSystem actorSystem) {
+   public static RpcService createRpcService(Configuration configuration, 
ActorSystem actorSystem) {
 
 Review comment:
   Introduce a method `#createRpcService(String, String, Configuration, String, 
BootstrapTools.ActorSystemExecutorConfiguration)` instead of making 
`#instantiateAkkaRpcService(Configuration, ActorSystem)` public.
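
   A rough sketch of the proposed overload (hypothetical: the signature is taken 
from the comment above, but the body and the `BootstrapTools.startActorSystem` 
overload it calls are assumptions, not the merged implementation):

```java
// Hypothetical shape of the proposed public factory method inside
// AkkaRpcServiceUtils: it keeps instantiateAkkaRpcService private and
// builds the ActorSystem itself from an explicit executor configuration.
public static RpcService createRpcService(
		String hostname,
		String portRange,
		Configuration configuration,
		String actorSystemName,
		BootstrapTools.ActorSystemExecutorConfiguration executorConfig) throws Exception {

	// Assumed BootstrapTools overload; treat the argument order as illustrative.
	final ActorSystem actorSystem = BootstrapTools.startActorSystem(
		configuration, actorSystemName, hostname, portRange, LOG, executorConfig);

	return instantiateAkkaRpcService(configuration, actorSystem);
}
```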




[GitHub] [flink] TisonKun commented on a change in pull request #7927: [FLINK-11603][metrics] Port the MetricQueryService to the new RpcEndpoint

2019-03-22 Thread GitBox
TisonKun commented on a change in pull request #7927: [FLINK-11603][metrics] 
Port the MetricQueryService to the new RpcEndpoint
URL: https://github.com/apache/flink/pull/7927#discussion_r268375981
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
 ##
 @@ -73,60 +71,50 @@ public String filterCharacters(String input) {
 
private final long messageSizeLimit;
 
-   public MetricQueryService(long messageSizeLimit) {
+   public MetricQueryService(RpcService rpcService, String endpointId, 
long messageSizeLimit) {
+   super(rpcService, endpointId);
this.messageSizeLimit = messageSizeLimit;
}
 
@Override
-   public void postStop() {
+   public CompletableFuture<Void> onStop() {
serializer.close();
+   return CompletableFuture.completedFuture(null);
}
 
-   @Override
-   public void onReceive(Object message) {
-   try {
 
 Review comment:
   None of `addMetric`, `removeMetric`, and `queryMetrics` throws an exception. 
If a fatal error occurs, the RpcEndpoint will go down. Any exception thrown from 
these methods is handled the same way as an exception thrown from 
`AkkaRpcActor#handleRpcInvocation`.




[GitHub] [flink] TisonKun commented on a change in pull request #7927: [FLINK-11603][metrics] Port the MetricQueryService to the new RpcEndpoint

2019-03-22 Thread GitBox
TisonKun commented on a change in pull request #7927: [FLINK-11603][metrics] 
Port the MetricQueryService to the new RpcEndpoint
URL: https://github.com/apache/flink/pull/7927#discussion_r268375149
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
 ##
 @@ -73,60 +71,50 @@ public String filterCharacters(String input) {
 
private final long messageSizeLimit;
 
-   public MetricQueryService(long messageSizeLimit) {
+   public MetricQueryService(RpcService rpcService, String endpointId, 
long messageSizeLimit) {
+   super(rpcService, endpointId);
this.messageSizeLimit = messageSizeLimit;
}
 
@Override
-   public void postStop() {
+   public CompletableFuture<Void> onStop() {
serializer.close();
+   return CompletableFuture.completedFuture(null);
}
 
-   @Override
-   public void onReceive(Object message) {
-   try {
-   if (message instanceof AddMetric) {
-   AddMetric added = (AddMetric) message;
-
-   String metricName = added.metricName;
-   Metric metric = added.metric;
-   AbstractMetricGroup group = added.group;
-
-   QueryScopeInfo info = 
group.getQueryServiceMetricInfo(FILTER);
-
-   if (metric instanceof Counter) {
-   counters.put((Counter) metric, new 
Tuple2<>(info, FILTER.filterCharacters(metricName)));
-   } else if (metric instanceof Gauge) {
-   gauges.put((Gauge) metric, new 
Tuple2<>(info, FILTER.filterCharacters(metricName)));
-   } else if (metric instanceof Histogram) {
-   histograms.put((Histogram) metric, new 
Tuple2<>(info, FILTER.filterCharacters(metricName)));
-   } else if (metric instanceof Meter) {
-   meters.put((Meter) metric, new 
Tuple2<>(info, FILTER.filterCharacters(metricName)));
-   }
-   } else if (message instanceof RemoveMetric) {
-   Metric metric = (((RemoveMetric) 
message).metric);
-   if (metric instanceof Counter) {
-   this.counters.remove(metric);
-   } else if (metric instanceof Gauge) {
-   this.gauges.remove(metric);
-   } else if (metric instanceof Histogram) {
-   this.histograms.remove(metric);
-   } else if (metric instanceof Meter) {
-   this.meters.remove(metric);
-   }
-   } else if (message instanceof CreateDump) {
-   
MetricDumpSerialization.MetricSerializationResult dump = 
serializer.serialize(counters, gauges, histograms, meters);
-
-   dump = enforceSizeLimit(dump);
-
-   getSender().tell(dump, getSelf());
-   } else {
-   LOG.warn("MetricQueryServiceActor received an 
invalid message. " + message.toString());
-   getSender().tell(new Status.Failure(new 
IOException("MetricQueryServiceActor received an invalid message. " + 
message.toString())), getSelf());
+   public void addMetric(String metricName, Metric metric, 
AbstractMetricGroup group) {
+   runAsync(() -> {
 
 Review comment:
   You're right: since the actual calls to `addMetric` are `RpcInvocation`s, 
this method should already always be running in the main thread executor.
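
   For illustration, a self-contained plain-Java sketch of the threading model 
being discussed (an analogy, not Flink's actual implementation): the framework 
hops onto a single main thread for every invocation, so handler bodies need no 
extra `runAsync` and can mutate endpoint state without locks.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Analogy for an RpcEndpoint's main thread executor: all "RPCs" run on
// one thread, so handler bodies don't schedule themselves asynchronously.
public class MainThreadModelSketch {
	private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
	private long count; // only ever touched from mainThread

	// The caller-facing method does the thread hop, like an RpcInvocation;
	// the body itself then runs directly on the main thread.
	public CompletableFuture<Long> increment() {
		return CompletableFuture.supplyAsync(() -> ++count, mainThread);
	}

	public void shutdown() {
		mainThread.shutdown();
	}

	public static void main(String[] args) throws Exception {
		MainThreadModelSketch endpoint = new MainThreadModelSketch();
		System.out.println(endpoint.increment().get()); // prints 1
		endpoint.shutdown();
	}
}
```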




[GitHub] [flink] TisonKun commented on a change in pull request #7927: [FLINK-11603][metrics] Port the MetricQueryService to the new RpcEndpoint

2019-03-22 Thread GitBox
TisonKun commented on a change in pull request #7927: [FLINK-11603][metrics] 
Port the MetricQueryService to the new RpcEndpoint
URL: https://github.com/apache/flink/pull/7927#discussion_r268373838
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
 ##
 @@ -269,13 +270,13 @@ public Builder 
setRequestClusterOverviewSupplier(Supplier>>
 requestMetricQueryServicePathsSupplier) {
-   this.requestMetricQueryServicePathsSupplier = 
requestMetricQueryServicePathsSupplier;
+   public Builder 
setRequestMetricQueryServiceGatewaysSupplier(Supplier>>
 requestMetricQueryServiceGatewaysSupplier) {
+   this.requestMetricQueryServiceGatewaysSupplier = 
requestMetricQueryServiceGatewaysSupplier;
return this;
}
 
-   public Builder 
setRequestTaskManagerMetricQueryServicePathsSupplier(Supplier>>> requestTaskManagerMetricQueryServicePathsSupplier) {
-   this.requestTaskManagerMetricQueryServicePathsSupplier 
= requestTaskManagerMetricQueryServicePathsSupplier;
+   public Builder 
setRequestTaskManagerMetricQueryServiceGatewaysSupplier(Supplier>>> 
requestTaskManagerMetricQueryServiceGatewaysSupplier) {
 
 Review comment:
   Given that the return value is `Collection>` and it returns the gateways of the 
TaskManagers' MQS, I'd like to keep the "s". However, the outdated documentation 
of `requestTaskManagerMetricQueryServiceGateways` should be updated.




[GitHub] [flink] TisonKun commented on a change in pull request #7927: [FLINK-11603][metrics] Port the MetricQueryService to the new RpcEndpoint

2019-03-22 Thread GitBox
TisonKun commented on a change in pull request #7927: [FLINK-11603][metrics] 
Port the MetricQueryService to the new RpcEndpoint
URL: https://github.com/apache/flink/pull/7927#discussion_r268373575
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
 ##
 @@ -369,21 +367,17 @@ public void testQueryActorShutdown() throws Exception {
 
MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 
-   final ActorSystem actorSystem = 
AkkaUtils.createDefaultActorSystem();
+   final RpcService rpcService = new TestingRpcService();
 
-   registry.startQueryService(actorSystem, null);
+   registry.startQueryService(rpcService, null);
 
-   ActorRef queryServiceActor = registry.getQueryService();
+   MetricQueryService queryService = 
checkNotNull(registry.getQueryService());
 
registry.shutdown().get();
 
-   try {
-   
Await.result(actorSystem.actorSelection(queryServiceActor.path()).resolveOne(timeout),
 timeout);
+   queryService.getTerminationFuture().get(timeout.toMillis(), 
TimeUnit.MILLISECONDS);
 
-   fail("The query actor should be terminated resulting in 
a ActorNotFound exception.");
-   } catch (ActorNotFound e) {
-   // we expect the query actor to be shut down
-   }
+   rpcService.stopService();
 
 Review comment:
   You're right that the registry is supposed to shut it down. Removed this line.




[jira] [Commented] (FLINK-11996) Case class maximum of 22 fields

2019-03-22 Thread william hesch (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16799378#comment-16799378
 ] 

william hesch commented on FLINK-11996:
---

+1






[jira] [Commented] (FLINK-10897) Support POJO state schema evolution

2019-03-22 Thread william hesch (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16799348#comment-16799348
 ] 

william hesch commented on FLINK-10897:
---

[~tzulitai]

> Support POJO state schema evolution
> ---
>
> Key: FLINK-10897
> URL: https://issues.apache.org/jira/browse/FLINK-10897
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
> Fix For: 1.8.0
>
>
> Main action point for this is to implement a separate POJO serializer that is 
> specifically used as the restore serializer.
> This restore POJO serializer should be able to read and dump values of fields 
> that no longer exist in the updated POJO schema, and assign default values 
> to newly added fields. Snapshot of the {{PojoSerializer}} should contain 
> sufficient information so that on restore, the information can be compared 
> with the adapted POJO class to figure out which fields have been removed / 
> added.
> Changing field types is out of scope and should not be supported.
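
A hypothetical illustration of the schema change described above (class and 
field names are made up; in reality both versions would share one class name):

{code:java}
// V1 is the schema at snapshot time, V2 the schema at restore time.
public class PojoEvolutionSketch {

	public static class UserEventV1 {
		public long userId;
		public String region;   // removed in V2: restore must read and drop it
		public UserEventV1() {}
	}

	public static class UserEventV2 {
		public long userId;
		public int score;       // added in V2: restore assigns a default value
		public UserEventV2() {}
	}
}
{code}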





[jira] [Assigned] (FLINK-11889) Remove "stop" signal along with Stoppable interfaces

2019-03-22 Thread Kostas Kloudas (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas reassigned FLINK-11889:
--

Assignee: Kostas Kloudas

> Remove "stop" signal along with Stoppable interfaces
> 
>
> Key: FLINK-11889
> URL: https://issues.apache.org/jira/browse/FLINK-11889
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Aljoscha Krettek
>Assignee: Kostas Kloudas
>Priority: Major
> Fix For: 1.9.0
>
>
> During the [ML 
> discussion|https://lists.apache.org/thread.html/b8d2f3209e7ca7467af6037383ade6c14c35276f7acb2bbbc9a50c0f@%3Cdev.flink.apache.org%3E]
>  of 
> [FLIP-34|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103090212]
>  we realised that it would be beneficial for this new feature to replace the 
> existing "stop" functionality. The current "stop" functionality cannot be 
> used because no real-world sources support it. Therefore, I 
> think it is safe to remove, because removing it should not break existing workflows.
> The issue proposes completely removing the old stop feature, introduced via 
> FLINK-2111, as preparation for 
> [FLIP-34|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103090212].
> We have to be careful when doing this because it touches quite a few things. 
> Basically, we have to do a manual revert of this commit: 
> https://github.com/apache/flink/commit/bdd4024e20fdfb0accb6121a68780ce3a0c218c0





[jira] [Comment Edited] (FLINK-10897) Support POJO state schema evolution

2019-03-22 Thread william hesch (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16798569#comment-16798569
 ] 

william hesch edited comment on FLINK-10897 at 3/22/19 6:36 PM:


Does this also cover Scala case classes?

Edit: It looks like no; they use the default Kryo serializer. Is there any 
timeline for Scala case class support?


was (Author: whesch):
Does this also cover Scala case classes?






[jira] [Commented] (FLINK-8902) Re-scaling job sporadically fails with KeeperException

2019-03-22 Thread David Anderson (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16799210#comment-16799210
 ] 

David Anderson commented on FLINK-8902:
---

I'm also seeing this in a completely reproducible way. In my case a checkpoint 
completes while the state is being restored from the savepoint, which may 
explain why I'm seeing 

ConcurrentModificationException: ZooKeeper unexpectedly modified

 

> Re-scaling job sporadically fails with KeeperException
> --
>
> Key: FLINK-8902
> URL: https://issues.apache.org/jira/browse/FLINK-8902
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.5.0, 1.6.0
> Environment: Commit: 80020cb
> Hadoop: 2.8.3
> YARN
>  
>Reporter: Gary Yao
>Priority: Critical
>  Labels: flip6
> Fix For: 1.7.3, 1.6.5
>
>
> *Description*
>  Re-scaling a job with {{bin/flink modify -p }} sporadically 
> fails with a {{KeeperException}}
> *Steps to reproduce*
>  # Submit job to Flink cluster with flip6 enabled running on YARN (session 
> mode).
>  # Re-scale job (5-20 times)
> *Stacktrace (client)*
> {noformat}
> org.apache.flink.util.FlinkException: Could not rescale job 
> 61e2e99db2e959ebd94e40f9c5e816bc.
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$modify$8(CliFrontend.java:766)
>   at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:954)
>   at org.apache.flink.client.cli.CliFrontend.modify(CliFrontend.java:757)
>   at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1037)
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1095)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1095)
> Caused by: java.util.concurrent.CompletionException: 
> org.apache.flink.runtime.jobmaster.exceptions.JobModificationException: Could 
> not restore from temporary rescaling savepoint. This might indicate that the 
> savepoint 
> hdfs://172.31.33.72:9000/flinkha/savepoints/savepoint-61e2e9-fdb3d05a0035 got 
> corrupted. Deleting this savepoint as a precaution.
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$3(JobMaster.java:525)
>   at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>   at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>   at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:295)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:150)
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
>   at 
> akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: 
> org.apache.flink.runtime.jobmaster.exceptions.JobModificationException: Could 
> not restore from temporary rescaling savepoint. This might indicate that the 
> savepoint 
> hdfs://172.31.33.72:9000/flinkha/savepoints/savepoint-61e2e9-fdb3d05a0035 got 
> corrupted. Deleting this savepoint as a precaution.
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$restoreExecutionGraphFromRescalingSavepoint$17(JobMaster.java:1317)
>   at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>   at 
> 

[jira] [Resolved] (FLINK-11997) ConcurrentModificationException: ZooKeeper unexpectedly modified

2019-03-22 Thread David Anderson (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Anderson resolved FLINK-11997.

Resolution: Duplicate

> ConcurrentModificationException: ZooKeeper unexpectedly modified
> 
>
> Key: FLINK-11997
> URL: https://issues.apache.org/jira/browse/FLINK-11997
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.8.0
> Environment: Flink 1.8.0-rc4, running in a k8s job cluster with 
> checkpointing and savepointing in minio. Zookeeper enabled, also saving to 
> minio.
> jobmanager.rpc.address: localhost
> jobmanager.rpc.port: 6123
> jobmanager.heap.size: 1024m
> taskmanager.heap.size: 1024m
> taskmanager.numberOfTaskSlots: 4
> parallelism.default: 4
> high-availability: zookeeper
> high-availability.jobmanager.port: 6123
> high-availability.storageDir: s3://highavailability/storage
> high-availability.zookeeper.quorum: zoo1:2181
> state.backend: filesystem
> state.checkpoints.dir: s3://state/checkpoints
> state.savepoints.dir: s3://state/savepoints
> rest.port: 8081
> zookeeper.sasl.disable: true
> s3.access-key: minio
> s3.secret-key: minio123
> s3.path-style-access: true
> s3.endpoint: http://minio-service:9000
>  
>Reporter: David Anderson
>Priority: Major
> Attachments: FAILURE
>
>
> Trying to rescale a job running in a k8s job cluster via
> {{flink modify  -p 2 -m localhost:30081}}
> Rescaling works fine if HA is off. Taking a savepoint and restarting from one 
> also works fine, even with HA turned on. But rescaling by modifying the job 
> with HA on always fails as shown below:
> Caused by: org.apache.flink.util.FlinkException: Failed to rescale the job 
> .
>         ... 21 more
> Caused by: java.util.concurrent.CompletionException: 
> org.apache.flink.runtime.jobmaster.exceptions.JobModificationException: Could 
> not restore from temporary rescaling savepoint. This might indicate that the 
> savepoint s3://state/savepoints/savepoint-00-2fa7fd5dabb2 got corrupted. 
> Deleting this savepoint as a precaution.
>         at 
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$4(JobMaster.java:470)
>         at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>         at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>         ... 18 more
> Caused by: 
> org.apache.flink.runtime.jobmaster.exceptions.JobModificationException: Could 
> not restore from temporary rescaling savepoint. This might indicate that the 
> savepoint s3://state/savepoints/savepoint-00-2fa7fd5dabb2 got corrupted. 
> Deleting this savepoint as a precaution.
>         at 
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$restoreExecutionGraphFromRescalingSavepoint$18(JobMaster.java:1433)
>         at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>         at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>         at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.ConcurrentModificationException: ZooKeeper unexpectedly 
> modified
>         at 
> org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.addAndLock(ZooKeeperStateHandleStore.java:159)
>         at 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.addCheckpoint(ZooKeeperCompletedCheckpointStore.java:216)
>         at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1106)
>         at 
> org.apache.flink.runtime.jobmaster.JobMaster.tryRestoreExecutionGraphFromSavepoint(JobMaster.java:1251)
>         at 
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$restoreExecutionGraphFromRescalingSavepoint$18(JobMaster.java:1413)
>         ... 10 more
> Caused by: 
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.KeeperException$NodeExistsException:
>  KeeperErrorCode = NodeExists
>         at 
> 

[jira] [Commented] (FLINK-9007) End-to-end test: Kinesis connector

2019-03-22 Thread Thomas Weise (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16799206#comment-16799206
 ] 

Thomas Weise commented on FLINK-9007:
-

1.8.x: 11af4523801164539e186d836462f5884b561941

 

> End-to-end test: Kinesis connector
> --
>
> Key: FLINK-9007
> URL: https://issues.apache.org/jira/browse/FLINK-9007
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kinesis, Tests
>Reporter: Till Rohrmann
>Assignee: Thomas Weise
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.9.0, 1.8.1
>
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> Add an end-to-end test which uses Flink's Kinesis connector to read and write 
> to Kinesis. We could use a simple pipe job with simple state for 
> checkpointing purposes. The checkpoints should then be written to S3 using 
> {{flink-s3-fs-hadoop}} and {{flink-s3-fs-presto}}.





[jira] [Updated] (FLINK-9007) End-to-end test: Kinesis connector

2019-03-22 Thread Thomas Weise (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise updated FLINK-9007:

Fix Version/s: 1.8.1

> End-to-end test: Kinesis connector
> --
>
> Key: FLINK-9007
> URL: https://issues.apache.org/jira/browse/FLINK-9007
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kinesis, Tests
>Reporter: Till Rohrmann
>Assignee: Thomas Weise
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.9.0, 1.8.1
>
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> Add an end-to-end test which uses Flink's Kinesis connector to read and write 
> to Kinesis. We could use a simple pipe job with simple state for 
> checkpointing purposes. The checkpoints should then be written to S3 using 
> {{flink-s3-fs-hadoop}} and {{flink-s3-fs-presto}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] tweise merged pull request #8031: [FLINK-9007] [kinesis] [e2e] Add Kinesis end-to-end test

2019-03-22 Thread GitBox
tweise merged pull request #8031: [FLINK-9007] [kinesis] [e2e] Add Kinesis 
end-to-end test
URL: https://github.com/apache/flink/pull/8031
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11997) ConcurrentModificationException: ZooKeeper unexpectedly modified

2019-03-22 Thread David Anderson (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Anderson updated FLINK-11997:
---
Description: 
Trying to rescale a job running in a k8s job cluster via

{{flink modify  -p 2 -m localhost:30081}}

Rescaling works fine if HA is off. Taking a savepoint and restarting from one 
also works fine, even with HA turned on. But rescaling by modifying the job 
with HA on always fails as shown below:

Caused by: org.apache.flink.util.FlinkException: Failed to rescale the job 
.

        ... 21 more

Caused by: java.util.concurrent.CompletionException: 
org.apache.flink.runtime.jobmaster.exceptions.JobModificationException: Could 
not restore from temporary rescaling savepoint. This might indicate that the 
savepoint s3://state/savepoints/savepoint-00-2fa7fd5dabb2 got corrupted. 
Deleting this savepoint as a precaution.

        at 
org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$4(JobMaster.java:470)

        at 
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)

        at 
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)

        ... 18 more

Caused by: 
org.apache.flink.runtime.jobmaster.exceptions.JobModificationException: Could 
not restore from temporary rescaling savepoint. This might indicate that the 
savepoint s3://state/savepoints/savepoint-00-2fa7fd5dabb2 got corrupted. 
Deleting this savepoint as a precaution.

        at 
org.apache.flink.runtime.jobmaster.JobMaster.lambda$restoreExecutionGraphFromRescalingSavepoint$18(JobMaster.java:1433)

        at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)

        at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)

        at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)

        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

        at java.util.concurrent.FutureTask.run(FutureTask.java:266)

        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)

        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)

        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

        at java.lang.Thread.run(Thread.java:748)

Caused by: java.util.ConcurrentModificationException: ZooKeeper unexpectedly 
modified

        at 
org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.addAndLock(ZooKeeperStateHandleStore.java:159)

        at 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.addCheckpoint(ZooKeeperCompletedCheckpointStore.java:216)

        at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1106)

        at 
org.apache.flink.runtime.jobmaster.JobMaster.tryRestoreExecutionGraphFromSavepoint(JobMaster.java:1251)

        at 
org.apache.flink.runtime.jobmaster.JobMaster.lambda$restoreExecutionGraphFromRescalingSavepoint$18(JobMaster.java:1413)

        ... 10 more

Caused by: 
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.KeeperException$NodeExistsException:
 KeeperErrorCode = NodeExists

        at 
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.KeeperException.create(KeeperException.java:119)

        at 
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper.multiInternal(ZooKeeper.java:1006)

        at 
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper.multi(ZooKeeper.java:910)

        at 
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorTransactionImpl.doOperation(CuratorTransactionImpl.java:159)

        at 
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorTransactionImpl.access$200(CuratorTransactionImpl.java:44)

        at 
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorTransactionImpl$2.call(CuratorTransactionImpl.java:129)

        at 
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorTransactionImpl$2.call(CuratorTransactionImpl.java:125)

        at 
org.apache.flink.shaded.curator.org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:109)

        at 
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorTransactionImpl.commit(CuratorTransactionImpl.java:122)

        at 
org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.addAndLock(ZooKeeperStateHandleStore.java:153)

        ... 14 more

  was:
Trying to rescale a job running in a k8s job cluster via

flink modify  -p 2 -m localhost:30081

Rescaling 

[jira] [Commented] (FLINK-11997) ConcurrentModificationException: ZooKeeper unexpectedly modified

2019-03-22 Thread David Anderson (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16799203#comment-16799203
 ] 

David Anderson commented on FLINK-11997:


Looking at the logs more carefully (see attached file), I see that a checkpoint 
completed after the job had begun restoring from the savepoint:

 

{{2019-03-22 15:40:57,924 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Starting job 
 from savepoint 
s3://state/savepoints/savepoint-00-2fa7fd5dabb2 ()}}

{{...}}

{{2019-03-22 15:40:58,119 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed 
checkpoint 29 for job  (2812 bytes in 85 ms).}}

{{...}}

{{2019-03-22 15:40:58,277 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
              - Could not restore from temporary rescaling savepoint. This 
might indicate that the savepoint 
s3://state/savepoints/savepoint-00-2fa7fd5dabb2 got corrupted. Deleting 
this savepoint as a precaution.}}

{{2019-03-22 15:40:58,302 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job  
() switched from state CREATED to FAILING.}}
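
To make the failure mode concrete: it reduces to a second create() of an
already existing znode. A toy sketch against the plain ZooKeeper client
(connection string and paths are made up and do not reflect Flink's actual
znode layout):

{code:java}
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

// Toy reproduction of the race: the concurrently completed checkpoint has
// already created the znode, so the restore path's create() of the same node
// fails with NodeExists, surfaced as "ZooKeeper unexpectedly modified".
public class NodeExistsRace {
	public static void main(String[] args) throws Exception {
		ZooKeeper zk = new ZooKeeper("zoo1:2181", 30_000, event -> { });
		zk.create("/flink", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
		zk.create("/flink/checkpoints", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

		String path = "/flink/checkpoints/0000000000000000029"; // made-up layout
		byte[] handle = {1};

		// First writer (the completed checkpoint) wins:
		zk.create(path, handle, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

		// Second writer (the savepoint restore) hits the existing node:
		try {
			zk.create(path, handle, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
		} catch (KeeperException.NodeExistsException e) {
			System.out.println("KeeperErrorCode = NodeExists for " + e.getPath());
		}
		zk.close();
	}
}
{code}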

 

> ConcurrentModificationException: ZooKeeper unexpectedly modified
> 
>
> Key: FLINK-11997
> URL: https://issues.apache.org/jira/browse/FLINK-11997
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.8.0
> Environment: Flink 1.8.0-rc4, running in a k8s job cluster with 
> checkpointing and savepointing in minio. Zookeeper enabled, also saving to 
> minio.
> jobmanager.rpc.address: localhost
> jobmanager.rpc.port: 6123
> jobmanager.heap.size: 1024m
> taskmanager.heap.size: 1024m
> taskmanager.numberOfTaskSlots: 4
> parallelism.default: 4
> high-availability: zookeeper
> high-availability.jobmanager.port: 6123
> high-availability.storageDir: s3://highavailability/storage
> high-availability.zookeeper.quorum: zoo1:2181
> state.backend: filesystem
> state.checkpoints.dir: s3://state/checkpoints
> state.savepoints.dir: s3://state/savepoints
> rest.port: 8081
> zookeeper.sasl.disable: true
> s3.access-key: minio
> s3.secret-key: minio123
> s3.path-style-access: true
> s3.endpoint: http://minio-service:9000
>  
>Reporter: David Anderson
>Priority: Major
> Attachments: FAILURE
>
>
> Trying to rescale a job running in a k8s job cluster via
> flink modify  -p 2 -m localhost:30081
> Rescaling works fine if HA is off. Taking a savepoint and restarting from one 
> also works fine, even with HA turned on. But rescaling by modifying the job 
> with HA on always fails as shown below:
> Caused by: org.apache.flink.util.FlinkException: Failed to rescale the job 
> .
>         ... 21 more
> Caused by: java.util.concurrent.CompletionException: 
> org.apache.flink.runtime.jobmaster.exceptions.JobModificationException: Could 
> not restore from temporary rescaling savepoint. This might indicate that the 
> savepoint s3://state/savepoints/savepoint-00-2fa7fd5dabb2 got corrupted. 
> Deleting this savepoint as a precaution.
>         at 
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$4(JobMaster.java:470)
>         at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>         at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>         ... 18 more
> Caused by: 
> org.apache.flink.runtime.jobmaster.exceptions.JobModificationException: Could 
> not restore from temporary rescaling savepoint. This might indicate that the 
> savepoint s3://state/savepoints/savepoint-00-2fa7fd5dabb2 got corrupted. 
> Deleting this savepoint as a precaution.
>         at 
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$restoreExecutionGraphFromRescalingSavepoint$18(JobMaster.java:1433)
>         at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>         at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>         at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at 
> 

[GitHub] [flink] jgrier commented on issue #8031: [FLINK-9007] [kinesis] [e2e] Add Kinesis end-to-end test

2019-03-22 Thread GitBox
jgrier commented on issue #8031: [FLINK-9007] [kinesis] [e2e] Add Kinesis 
end-to-end test
URL: https://github.com/apache/flink/pull/8031#issuecomment-475706723
 
 
   lgtm


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #8031: [FLINK-9007] [kinesis] [e2e] Add Kinesis end-to-end test

2019-03-22 Thread GitBox
flinkbot edited a comment on issue #8031: [FLINK-9007] [kinesis] [e2e] Add 
Kinesis end-to-end test
URL: https://github.com/apache/flink/pull/8031#issuecomment-475346824
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @jgrier
   * ✅ 2. There is [consensus] that the contribution should go into Flink.
   - Approved by @jgrier
   * ❓ 3. Needs [attention] from.
   * ✅ 4. The change fits into the overall [architecture].
   - Approved by @jgrier
   * ✅ 5. Overall code [quality] is good.
   - Approved by @jgrier
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] jgrier commented on issue #8031: [FLINK-9007] [kinesis] [e2e] Add Kinesis end-to-end test

2019-03-22 Thread GitBox
jgrier commented on issue #8031: [FLINK-9007] [kinesis] [e2e] Add Kinesis 
end-to-end test
URL: https://github.com/apache/flink/pull/8031#issuecomment-475706272
 
 
   @flinkbot approve all


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11997) ConcurrentModificationException: ZooKeeper unexpectedly modified

2019-03-22 Thread David Anderson (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Anderson updated FLINK-11997:
---
Attachment: FAILURE

> ConcurrentModificationException: ZooKeeper unexpectedly modified
> 
>
> Key: FLINK-11997
> URL: https://issues.apache.org/jira/browse/FLINK-11997
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.8.0
> Environment: Flink 1.8.0-rc4, running in a k8s job cluster with 
> checkpointing and savepointing in minio. Zookeeper enabled, also saving to 
> minio.
> jobmanager.rpc.address: localhost
> jobmanager.rpc.port: 6123
> jobmanager.heap.size: 1024m
> taskmanager.heap.size: 1024m
> taskmanager.numberOfTaskSlots: 4
> parallelism.default: 4
> high-availability: zookeeper
> high-availability.jobmanager.port: 6123
> high-availability.storageDir: s3://highavailability/storage
> high-availability.zookeeper.quorum: zoo1:2181
> state.backend: filesystem
> state.checkpoints.dir: s3://state/checkpoints
> state.savepoints.dir: s3://state/savepoints
> rest.port: 8081
> zookeeper.sasl.disable: true
> s3.access-key: minio
> s3.secret-key: minio123
> s3.path-style-access: true
> s3.endpoint: http://minio-service:9000
>  
>Reporter: David Anderson
>Priority: Major
> Attachments: FAILURE
>
>
> Trying to rescale a job running in a k8s job cluster via
> flink modify  -p 2 -m localhost:30081
> Rescaling works fine if HA is off. Taking a savepoint and restarting from one 
> also works fine, even with HA turned on. But rescaling by modifying the job 
> with HA on always fails as shown below:
> Caused by: org.apache.flink.util.FlinkException: Failed to rescale the job 
> .
>         ... 21 more
> Caused by: java.util.concurrent.CompletionException: 
> org.apache.flink.runtime.jobmaster.exceptions.JobModificationException: Could 
> not restore from temporary rescaling savepoint. This might indicate that the 
> savepoint s3://state/savepoints/savepoint-00-2fa7fd5dabb2 got corrupted. 
> Deleting this savepoint as a precaution.
>         at 
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$4(JobMaster.java:470)
>         at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>         at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>         ... 18 more
> Caused by: 
> org.apache.flink.runtime.jobmaster.exceptions.JobModificationException: Could 
> not restore from temporary rescaling savepoint. This might indicate that the 
> savepoint s3://state/savepoints/savepoint-00-2fa7fd5dabb2 got corrupted. 
> Deleting this savepoint as a precaution.
>         at 
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$restoreExecutionGraphFromRescalingSavepoint$18(JobMaster.java:1433)
>         at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>         at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>         at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.ConcurrentModificationException: ZooKeeper unexpectedly 
> modified
>         at 
> org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.addAndLock(ZooKeeperStateHandleStore.java:159)
>         at 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.addCheckpoint(ZooKeeperCompletedCheckpointStore.java:216)
>         at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1106)
>         at 
> org.apache.flink.runtime.jobmaster.JobMaster.tryRestoreExecutionGraphFromSavepoint(JobMaster.java:1251)
>         at 
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$restoreExecutionGraphFromRescalingSavepoint$18(JobMaster.java:1413)
>         ... 10 more
> Caused by: 
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.KeeperException$NodeExistsException:
>  KeeperErrorCode = NodeExists
>         at 
> 

[jira] [Updated] (FLINK-5601) Window operator does not checkpoint watermarks

2019-03-22 Thread bupt_ljy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-5601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bupt_ljy updated FLINK-5601:

Affects Version/s: 1.9.0

> Window operator does not checkpoint watermarks
> --
>
> Key: FLINK-5601
> URL: https://issues.apache.org/jira/browse/FLINK-5601
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.5.0, 1.6.0, 1.7.0, 1.8.0, 1.9.0
>Reporter: Ufuk Celebi
>Assignee: bupt_ljy
>Priority: Critical
>  Labels: pull-request-available
>
> During release testing [~stefanrichte...@gmail.com] and I noticed that 
> watermarks are not checkpointed in the window operator.
> This can lead to non-determinism when restoring checkpoints. I was running an 
> adjusted {{SessionWindowITCase}} via Kafka for testing migration and 
> rescaling and ran into failures, because the data generator required 
> deterministic behaviour.
> What happened was that, on restore, late elements were sometimes not dropped, 
> because the watermarks first needed to be re-established after the restore.
> [~aljoscha] Do you know whether there is a special reason for explicitly not 
> checkpointing watermarks?
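
Until the operator checkpoints watermarks itself, a user-level workaround can
be sketched against the public {{CheckpointedFunction}} API (an illustration
only, not the operator-internal fix discussed here): remember the highest
watermark in operator state and filter late elements against it after restore.

{code:java}
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Sketch: keep the highest watermark seen in operator state, so a restored
// instance can drop late elements immediately instead of letting them through
// until new watermarks arrive. Elements are their own event timestamps (Long)
// to keep the example small.
public class WatermarkRememberingFilter extends ProcessFunction<Long, Long>
		implements CheckpointedFunction {

	private transient ListState<Long> checkpointedWatermark;
	private long maxWatermark = Long.MIN_VALUE;

	@Override
	public void processElement(Long timestamp, Context ctx, Collector<Long> out) {
		maxWatermark = Math.max(maxWatermark, ctx.timerService().currentWatermark());
		if (timestamp >= maxWatermark) {
			out.collect(timestamp); // on time relative to the remembered watermark
		}
	}

	@Override
	public void snapshotState(FunctionSnapshotContext context) throws Exception {
		checkpointedWatermark.clear();
		checkpointedWatermark.add(maxWatermark);
	}

	@Override
	public void initializeState(FunctionInitializationContext context) throws Exception {
		checkpointedWatermark = context.getOperatorStateStore().getListState(
			new ListStateDescriptor<>("max-watermark", Long.class));
		for (Long wm : checkpointedWatermark.get()) {
			maxWatermark = Math.max(maxWatermark, wm); // re-establish after restore
		}
	}
}
{code}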



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-5601) Window operator does not checkpoint watermarks

2019-03-22 Thread bupt_ljy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-5601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bupt_ljy updated FLINK-5601:

Affects Version/s: 1.8.0

> Window operator does not checkpoint watermarks
> --
>
> Key: FLINK-5601
> URL: https://issues.apache.org/jira/browse/FLINK-5601
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.5.0, 1.6.0, 1.7.0, 1.8.0
>Reporter: Ufuk Celebi
>Assignee: bupt_ljy
>Priority: Critical
>  Labels: pull-request-available
>
> During release testing [~stefanrichte...@gmail.com] and I noticed that 
> watermarks are not checkpointed in the window operator.
> This can lead to non-determinism when restoring checkpoints. I was running an 
> adjusted {{SessionWindowITCase}} via Kafka for testing migration and 
> rescaling and ran into failures, because the data generator required 
> deterministic behaviour.
> What happened was that, on restore, late elements were sometimes not dropped, 
> because the watermarks first needed to be re-established after the restore.
> [~aljoscha] Do you know whether there is a special reason for explicitly not 
> checkpointing watermarks?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11997) ConcurrentModificationException: ZooKeeper unexpectedly modified

2019-03-22 Thread David Anderson (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Anderson updated FLINK-11997:
---
Description: 
Trying to rescale a job running in a k8s job cluster via

flink modify  -p 2 -m localhost:30081

Rescaling works fine if HA is off. Taking a savepoint and restarting from one 
also works fine, even with HA turned on. But rescaling by modifying the job 
with HA on always fails as shown below:

Caused by: org.apache.flink.util.FlinkException: Failed to rescale the job 
.

        ... 21 more

Caused by: java.util.concurrent.CompletionException: 
org.apache.flink.runtime.jobmaster.exceptions.JobModificationException: Could 
not restore from temporary rescaling savepoint. This might indicate that the 
savepoint s3://state/savepoints/savepoint-00-2fa7fd5dabb2 got corrupted. 
Deleting this savepoint as a precaution.

        at 
org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$4(JobMaster.java:470)

        at 
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)

        at 
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)

        ... 18 more

Caused by: 
org.apache.flink.runtime.jobmaster.exceptions.JobModificationException: Could 
not restore from temporary rescaling savepoint. This might indicate that the 
savepoint s3://state/savepoints/savepoint-00-2fa7fd5dabb2 got corrupted. 
Deleting this savepoint as a precaution.

        at 
org.apache.flink.runtime.jobmaster.JobMaster.lambda$restoreExecutionGraphFromRescalingSavepoint$18(JobMaster.java:1433)

        at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)

        at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)

        at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)

        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

        at java.util.concurrent.FutureTask.run(FutureTask.java:266)

        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)

        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)

        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

        at java.lang.Thread.run(Thread.java:748)

Caused by: java.util.ConcurrentModificationException: ZooKeeper unexpectedly 
modified

        at 
org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.addAndLock(ZooKeeperStateHandleStore.java:159)

        at 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.addCheckpoint(ZooKeeperCompletedCheckpointStore.java:216)

        at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1106)

        at 
org.apache.flink.runtime.jobmaster.JobMaster.tryRestoreExecutionGraphFromSavepoint(JobMaster.java:1251)

        at 
org.apache.flink.runtime.jobmaster.JobMaster.lambda$restoreExecutionGraphFromRescalingSavepoint$18(JobMaster.java:1413)

        ... 10 more

Caused by: 
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.KeeperException$NodeExistsException:
 KeeperErrorCode = NodeExists

        at 
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.KeeperException.create(KeeperException.java:119)

        at 
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper.multiInternal(ZooKeeper.java:1006)

        at 
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper.multi(ZooKeeper.java:910)

        at 
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorTransactionImpl.doOperation(CuratorTransactionImpl.java:159)

        at 
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorTransactionImpl.access$200(CuratorTransactionImpl.java:44)

        at 
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorTransactionImpl$2.call(CuratorTransactionImpl.java:129)

        at 
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorTransactionImpl$2.call(CuratorTransactionImpl.java:125)

        at 
org.apache.flink.shaded.curator.org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:109)

        at 
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorTransactionImpl.commit(CuratorTransactionImpl.java:122)

        at 
org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.addAndLock(ZooKeeperStateHandleStore.java:153)

        ... 14 more

  was:
Trying to rescale a job running in a k8s job cluster via

flink modify  -p 2 -m localhost:30081

Rescaling 

[jira] [Created] (FLINK-11997) ConcurrentModificationException: ZooKeeper unexpectedly modified

2019-03-22 Thread David Anderson (JIRA)
David Anderson created FLINK-11997:
--

 Summary: ConcurrentModificationException: ZooKeeper unexpectedly 
modified
 Key: FLINK-11997
 URL: https://issues.apache.org/jira/browse/FLINK-11997
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.8.0
 Environment: Flink 1.8.0-rc4, running in a k8s job cluster with 
checkpointing and savepointing in minio. Zookeeper enabled, also saving to 
minio.

jobmanager.rpc.address: localhost

jobmanager.rpc.port: 6123

jobmanager.heap.size: 1024m

taskmanager.heap.size: 1024m

taskmanager.numberOfTaskSlots: 4

parallelism.default: 4

high-availability: zookeeper

high-availability.jobmanager.port: 6123

high-availability.storageDir: s3://highavailability/storage

high-availability.zookeeper.quorum: zoo1:2181

state.backend: filesystem

state.checkpoints.dir: s3://state/checkpoints

state.savepoints.dir: s3://state/savepoints

rest.port: 8081

zookeeper.sasl.disable: true

s3.access-key: minio

s3.secret-key: minio123

s3.path-style-access: true

s3.endpoint: http://minio-service:9000

 
Reporter: David Anderson


Trying to rescale a job running in a k8s job cluster via

flink modify  -p 2 -m localhost:30081

Rescaling works fine if HA is off. Taking a savepoint and restarting from one 
also works fine, even with HA turned on. But rescaling by modifying the job 
with HA on always fails as shown below:

Caused by: org.apache.flink.util.FlinkException: Failed to rescale the job 
.

        ... 21 more

Caused by: java.util.concurrent.CompletionException: 
org.apache.flink.runtime.jobmaster.exceptions.JobModificationException: Could 
not restore from temporary rescaling savepoint. This might indicate that the 
savepoint s3://state/savepoints/savepoint-00-2fa7fd5dabb2 got corrupted. 
Deleting this savepoint as a precaution.

        at 
org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$4(JobMaster.java:470)

        at 
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)

        at 
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)

        ... 18 more

Caused by: 
org.apache.flink.runtime.jobmaster.exceptions.JobModificationException: Could 
not restore from temporary rescaling savepoint. This might indicate that the 
savepoint s3://state/savepoints/savepoint-00-2fa7fd5dabb2 got corrupted. 
Deleting this savepoint as a precaution.

        at 
org.apache.flink.runtime.jobmaster.JobMaster.lambda$restoreExecutionGraphFromRescalingSavepoint$18(JobMaster.java:1433)

        at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)

        at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)

        at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)

        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

        at java.util.concurrent.FutureTask.run(FutureTask.java:266)

        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)

        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)

        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

        at java.lang.Thread.run(Thread.java:748)

Caused by: java.util.ConcurrentModificationException: ZooKeeper unexpectedly 
modified

        at 
org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.addAndLock(ZooKeeperStateHandleStore.java:159)

        at 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.addCheckpoint(ZooKeeperCompletedCheckpointStore.java:216)

        at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1106)

        at 
org.apache.flink.runtime.jobmaster.JobMaster.tryRestoreExecutionGraphFromSavepoint(JobMaster.java:1251)

        at 
org.apache.flink.runtime.jobmaster.JobMaster.lambda$restoreExecutionGraphFromRescalingSavepoint$18(JobMaster.java:1413)

        ... 10 more

Caused by: 
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.KeeperException$NodeExistsException:
 KeeperErrorCode = NodeExists

        at 
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.KeeperException.create(KeeperException.java:119)

        at 
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper.multiInternal(ZooKeeper.java:1006)

        at 
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper.multi(ZooKeeper.java:910)

        at 

[jira] [Updated] (FLINK-11276) Sliding Window Optimization

2019-03-22 Thread Rong Rong (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Rong updated FLINK-11276:
--
Description: 
This umbrella JIRA focuses on improving the existing window operator 
*WITHOUT* changing the public-facing API. Please find the initial design plan 
in: 
[https://docs.google.com/document/d/1CvjPJl1Fm1PCpsuuZ4Qc-p_iUzUosBePX_rWNUt8lRw/edit?usp=sharing]
 
 and the execution plan discussion in:
 
[http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improvement-to-Flink-Window-Operator-with-Slicing-tt25750.html]

Per the discussion on the dev mailing list, we would like to focus only on 
improvements to the following aspects of the {{WindowOperator}}:
 1. State optimization
 2. Internal Window Function 
 3. Backward-compatibility

 

Reference: Initial improvement discussion can be found in:
 
[https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?usp=sharing]

  was:
This is the umbrella ticket for the improvement discussion in:
 
[https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?usp=sharing]
 and the execution plan discussion in:
 
[http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improvement-to-Flink-Window-Operator-with-Slicing-tt25750.html]

This umbrella JIRA focuses on improving the existing window operator 
*WITHOUT* changing the public-facing API. Please find the initial design plan 
in: 
[https://docs.google.com/document/d/1CvjPJl1Fm1PCpsuuZ4Qc-p_iUzUosBePX_rWNUt8lRw/edit?usp=sharing]
 

Per the discussion on the dev mailing list, we would like to focus only on 
improvements to the following aspects of the {{WindowOperator}}:
 1. State optimization
 2. Internal Window Function 
 3. Backward-compatibility


> Sliding Window Optimization
> ---
>
> Key: FLINK-11276
> URL: https://issues.apache.org/jira/browse/FLINK-11276
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataStream
>Affects Versions: 1.7.0
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> This umbrella JIRA focuses on improving the existing window operator 
> *WITHOUT* changing the public-facing API. Please find the initial design plan 
> in: 
> [https://docs.google.com/document/d/1CvjPJl1Fm1PCpsuuZ4Qc-p_iUzUosBePX_rWNUt8lRw/edit?usp=sharing]
>  
>  and the execution plan discussion in:
>  
> [http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improvement-to-Flink-Window-Operator-with-Slicing-tt25750.html]
> Per the discussion on the dev mailing list, we would like to focus only on 
> improvements to the following aspects of the {{WindowOperator}}:
>  1. State optimization
>  2. Internal Window Function 
>  3. Backward-compatibility
>  
> Reference: Initial improvement discussion can be found in:
>  
> [https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?usp=sharing]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs

2019-03-22 Thread JIRA


[ 
https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16799086#comment-16799086
 ] 

Jürgen Kreileder commented on FLINK-11654:
--

[~pnowojski] It's a rather simple job running with a parallelism of 3 on Yarn. 
The restart delay is 2 minutes, so old parts should have been gone. But I don't 
really remember if a restart was involved at all. I'll do more testing with 
EXACTLY_ONCE next week.
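
For illustration, the clash described in the ticket reduces to two producers
(from different jobs) computing the same {{transactional.id}}: the second
{{initTransactions()}} bumps the producer epoch and fences the first. A toy
sketch with the plain Kafka producer API (broker address and id are made up):

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

// Two producers ending up with the same transactional.id, as two Flink jobs
// with identically named sinks would. The second initTransactions() bumps the
// producer epoch on the broker and fences the first producer, which then
// fails with ProducerFencedException on its next transactional operation.
public class TransactionalIdClash {
	public static void main(String[] args) {
		Properties props = new Properties();
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092"); // made up
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
		props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "MySink-<operatorId>-0"); // made up

		KafkaProducer<String, String> jobA = new KafkaProducer<>(props);
		jobA.initTransactions(); // epoch N

		KafkaProducer<String, String> jobB = new KafkaProducer<>(props);
		jobB.initTransactions(); // epoch N+1 -- jobA is now fenced

		jobA.beginTransaction(); // throws ProducerFencedException
	}
}
{code}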

> Multiple transactional KafkaProducers writing to same cluster have clashing 
> transaction IDs
> ---
>
> Key: FLINK-11654
> URL: https://issues.apache.org/jira/browse/FLINK-11654
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.7.1
>Reporter: Jürgen Kreileder
>Priority: Major
> Fix For: 1.9.0
>
>
> We run multiple jobs on a cluster which write a lot to the same Kafka topic 
> from identically named sinks. When EXACTLY_ONCE semantic is enabled for the 
> KafkaProducers we run into a lot of ProducerFencedExceptions and all jobs go 
> into a restart cycle.
> Example exception from the Kafka log:
>  
> {code:java}
> [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition finding-commands-dev-1-0 
> (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is 
> no longer valid. There is probably another producer with a newer epoch. 483 
> (request epoch), 484 (server epoch)
> {code}
> The reason for this is the way FlinkKafkaProducer initializes the 
> TransactionalIdsGenerator:
> The IDs are only guaranteed to be unique for a single Job. But they can clash 
> between different Jobs (and Clusters).
>  
>  
> {code:java}
> --- 
> a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> +++ 
> b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> @@ -819,6 +819,7 @@ public class FlinkKafkaProducer
>                 nextTransactionalIdHintState = 
> context.getOperatorStateStore().getUnionListState(
>                         NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
>                 transactionalIdsGenerator = new TransactionalIdsGenerator(
> + // the prefix probably should include job id and maybe cluster id
>                         getRuntimeContext().getTaskName() + "-" + 
> ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
>                         getRuntimeContext().getIndexOfThisSubtask(),
>                         
> getRuntimeContext().getNumberOfParallelSubtasks(),{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11276) Slicing Window Optimization

2019-03-22 Thread Rong Rong (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Rong updated FLINK-11276:
--
Description: 
This is the umbrella ticket for the improvement discussion in:
 
[https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?usp=sharing]
 and the execution plan discussion in:
 
[http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improvement-to-Flink-Window-Operator-with-Slicing-tt25750.html]

This umbrella JIRA focuses on improving the existing window operator 
*WITHOUT* changing the public-facing API. Please find the initial design plan 
in: 
[https://docs.google.com/document/d/1CvjPJl1Fm1PCpsuuZ4Qc-p_iUzUosBePX_rWNUt8lRw/edit?usp=sharing]
 

Per the discussion on the dev mailing list, we would like to focus only on 
improvements to the following aspects of the {{WindowOperator}}:
 1. State optimization
 2. Internal Window Function 
 3. Backward-compatibility

  was:
This is the umbrella ticket for the improvement discussion in:
 
[https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?usp=sharing]
 and the execution plan discussion in:
 
[http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improvement-to-Flink-Window-Operator-with-Slicing-tt25750.html]

This umbrella JIRA focuses on improving the existing window operator 
*WITHOUT* changing the public-facing API.

Per the discussion on the dev mailing list, we would like to focus only on 
improvements to the following aspects of the {{WindowOperator}}:
1. State optimization
2. Internal Window Function 
3. Backward-compatibility


> Slicing Window Optimization
> ---
>
> Key: FLINK-11276
> URL: https://issues.apache.org/jira/browse/FLINK-11276
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataStream
>Affects Versions: 1.7.0
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> This is the umbrella ticket for the improvement discussion in:
>  
> [https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?usp=sharing]
>  and the execution plan discussion in:
>  
> [http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improvement-to-Flink-Window-Operator-with-Slicing-tt25750.html]
> This umbrella JIRA focus on the improvement of the existing window operator 
> *WITHOUT* changing the public facing API. Please find the initial design plan 
> in: 
> [https://docs.google.com/document/d/1CvjPJl1Fm1PCpsuuZ4Qc-p_iUzUosBePX_rWNUt8lRw/edit?usp=sharing]
>  
> Per the discussion in the dev mailing list. We would like to only focus on 
> improvement in the following perspective of the {{WindowOperator:(}}
>  1. State optimization
>  2. Internal Window Function 
>  3. Backward-compatibility



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11276) Sliding Window Optimization

2019-03-22 Thread Rong Rong (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Rong updated FLINK-11276:
--
Summary: Sliding Window Optimization  (was: Slicing Window Optimization)

> Sliding Window Optimization
> ---
>
> Key: FLINK-11276
> URL: https://issues.apache.org/jira/browse/FLINK-11276
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataStream
>Affects Versions: 1.7.0
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> This is the umbrella ticket for the improvement discussion in:
>  
> [https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit?usp=sharing]
>  and the execution plan discussion in:
>  
> [http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improvement-to-Flink-Window-Operator-with-Slicing-tt25750.html]
> This umbrella JIRA focuses on improving the existing window operator 
> *WITHOUT* changing the public-facing API. Please find the initial design plan 
> in: 
> [https://docs.google.com/document/d/1CvjPJl1Fm1PCpsuuZ4Qc-p_iUzUosBePX_rWNUt8lRw/edit?usp=sharing]
>  
> Per the discussion on the dev mailing list, we would like to focus only on 
> improvements to the following aspects of the {{WindowOperator}}:
>  1. State optimization
>  2. Internal Window Function 
>  3. Backward-compatibility



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] HuangZhenQiu commented on issue #7978: [FLINK-11910] [Yarn] add customizable yarn application type

2019-03-22 Thread GitBox
HuangZhenQiu commented on issue #7978: [FLINK-11910] [Yarn] add customizable 
yarn application type
URL: https://github.com/apache/flink/pull/7978#issuecomment-475655770
 
 
   @suez1224 @rmetzger 
   Would you please have one more round of review?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-7244) Add ParquetTableSource Implementation based on ParquetInputFormat

2019-03-22 Thread Zhenqiu Huang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16799083#comment-16799083
 ] 

Zhenqiu Huang commented on FLINK-7244:
--

[~yanghua]

I am adding more test cases. Will have an initial PR this weekend. 

> Add ParquetTableSource Implementation based on ParquetInputFormat
> -
>
> Key: FLINK-7244
> URL: https://issues.apache.org/jira/browse/FLINK-7244
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Ecosystem
>Reporter: godfrey he
>Assignee: Zhenqiu Huang
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot edited a comment on issue #7856: [FLINK-11776][coordination] Refactor to simplify the process of scheduleOrUpdateConsumers

2019-03-22 Thread GitBox
flinkbot edited a comment on issue #7856: [FLINK-11776][coordination] Refactor 
to simplify the process of scheduleOrUpdateConsumers
URL: https://github.com/apache/flink/pull/7856#issuecomment-468170966
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @azagrebin
   * ✅ 2. There is [consensus] that the contribution should go into Flink.
   - Approved by @azagrebin
   * ❗ 3. Needs [attention] from.
   - Needs attention by @tillrohrmann [PMC]
   * ✅ 4. The change fits into the overall [architecture].
   - Approved by @azagrebin
   * ✅ 5. Overall code [quality] is good.
   - Approved by @azagrebin
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on issue #7856: [FLINK-11776][coordination] Refactor to simplify the process of scheduleOrUpdateConsumers

2019-03-22 Thread GitBox
azagrebin commented on issue #7856: [FLINK-11776][coordination] Refactor to 
simplify the process of scheduleOrUpdateConsumers
URL: https://github.com/apache/flink/pull/7856#issuecomment-475641676
 
 
   @flinkbot approve all
   @flinkbot attention @tillrohrmann 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8038: [FLINK-11953] Introduce Plugin/Loading system and integrate it with FileSystem

2019-03-22 Thread GitBox
flinkbot commented on issue #8038: [FLINK-11953] Introduce Plugin/Loading 
system and integrate it with FileSystem
URL: https://github.com/apache/flink/pull/8038#issuecomment-475639958
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11953) Introduce Plugin/Loading system and integrate it with FileSystem

2019-03-22 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11953:
---
Labels: pull-request-available  (was: )

> Introduce Plugin/Loading system and integrate it with FileSystem
> 
>
> Key: FLINK-11953
> URL: https://issues.apache.org/jira/browse/FLINK-11953
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / FileSystem
>Affects Versions: 1.9.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] StefanRRichter opened a new pull request #8038: [FLINK-11953] Introduce Plugin/Loading system and integrate it with FileSystem

2019-03-22 Thread GitBox
StefanRRichter opened a new pull request #8038: [FLINK-11953] Introduce 
Plugin/Loading system and integrate it with FileSystem
URL: https://github.com/apache/flink/pull/8038
 
 
   ## What is the purpose of the change
   
   We want to change the general architecture for loading FileSystems in Flink 
to a plugin architecture. The advantage of this change is that it inverts the 
classloading from parent-first to child-first and therefore enables us to 
move away from shading to avoid class/version conflicts.
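   
   A rough sketch of the child-first lookup (simplified and illustrative, with 
a made-up always-parent-first whitelist; not the actual implementation in this 
PR):
   
```java
import java.net.URL;
import java.net.URLClassLoader;

// Simplified child-first delegation: try the plugin's own jars before the
// parent, except for core classes that must stay shared. Just the idea
// behind inverting parent-first loading, not the real PluginLoader.
class ChildFirstPluginClassLoader extends URLClassLoader {

	ChildFirstPluginClassLoader(URL[] pluginJars, ClassLoader parent) {
		super(pluginJars, parent);
	}

	@Override
	protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
		synchronized (getClassLoadingLock(name)) {
			Class<?> c = findLoadedClass(name);
			if (c == null) {
				if (name.startsWith("java.") || name.startsWith("org.apache.flink.core.")) {
					// always-parent-first whitelist (illustrative)
					c = super.loadClass(name, false);
				} else {
					try {
						c = findClass(name); // child first: plugin jars
					} catch (ClassNotFoundException e) {
						c = super.loadClass(name, false); // fall back to parent
					}
				}
			}
			if (resolve) {
				resolveClass(c);
			}
			return c;
		}
	}
}
```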
   
   Note that this general approach could also be used in other places in Flink 
in the future, but this task is targeting only the file systems for now. 
Furthermore, this is the first PR, introducing the general mechanism. We still 
need follow-up work with changes to the build and to shipping/providing the 
plugin folder.
   
   ## Brief change log
   - Made {{ChildFirstClassLoader}} a top-level class and moved it to flink-core.
   - Introduce the {{Plugin}} interface, {{PluginLoader}}, and {{PluginManager}} 
for the basic plugin mechanism.
   - Initialize the plugin manager singleton in the process entry points.
   - Integrate plugin mechanism with {{FileSystem}} class.
   
   ## Verifying this change
   
   Added unit tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
  - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
 - The S3 file system connector: (yes)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (feature not yet completed)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #7856: [FLINK-11776][coordination] Refactor to simplify the process of scheduleOrUpdateConsumers

2019-03-22 Thread GitBox
azagrebin commented on a change in pull request #7856: 
[FLINK-11776][coordination] Refactor to simplify the process of 
scheduleOrUpdateConsumers
URL: https://github.com/apache/flink/pull/7856#discussion_r268189476
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -1119,27 +1076,15 @@ private void finishCancellation() {
}
}
 
-   void cachePartitionInfo(PartialInputChannelDeploymentDescriptor 
partitionInfo) {
-   partialInputChannelDeploymentDescriptors.add(partitionInfo);
+   void cachePartitionInfo(PartitionInfo partitionInfo) {
+   partitionInfos.add(partitionInfo);
}
 
-   void sendPartitionInfos() {
-   // check if the ExecutionVertex has already been archived and 
thus cleared the
-   // partial partition infos queue
-   if (partialInputChannelDeploymentDescriptors != null && 
!partialInputChannelDeploymentDescriptors.isEmpty()) {
-
-   PartialInputChannelDeploymentDescriptor 
partialInputChannelDeploymentDescriptor;
-
-   List partitionInfos = new 
ArrayList<>(partialInputChannelDeploymentDescriptors.size());
-
-   while ((partialInputChannelDeploymentDescriptor = 
partialInputChannelDeploymentDescriptors.poll()) != null) {
-   partitionInfos.add(
-   new PartitionInfo(
-   
partialInputChannelDeploymentDescriptor.getResultId(),
-   
partialInputChannelDeploymentDescriptor.createInputChannelDeploymentDescriptor(this)));
-   }
+   private void sendPartitionInfos() {
+   if (!partitionInfos.isEmpty()) {
+   
sendUpdatePartitionInfoRpcCall(Lists.newArrayList(partitionInfos));
 
 Review comment:
   Could we avoid using 3rd-party dependencies like Guava? It could be just 
`new ArrayList<>(partitionInfos)`. Is this defensive copy of `partitionInfos` 
actually needed?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #7856: [FLINK-11776][coordination] Refactor to simplify the process of scheduleOrUpdateConsumers

2019-03-22 Thread GitBox
azagrebin commented on a change in pull request #7856: 
[FLINK-11776][coordination] Refactor to simplify the process of 
scheduleOrUpdateConsumers
URL: https://github.com/apache/flink/pull/7856#discussion_r268188124
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -818,71 +800,46 @@ else if (numConsumers == 0) {
 					consumerVertex.checkInputDependencyConstraints()) {
 					scheduleConsumer(consumerVertex);
 				}
-
-				// double check to resolve race conditions
-				if (consumerVertex.getExecutionState() == RUNNING) {
-					consumerVertex.sendPartitionInfos();
-				}
 			}
 			// ----------------------------------------------------------------
 			// Consumer is running => send update message now
+			// Consumer is deploying => cache the partition info which would be
+			// sent after switching to running
 			// ----------------------------------------------------------------
-			else {
-				if (consumerState == RUNNING) {
-					final LogicalSlot consumerSlot = consumer.getAssignedResource();
-
-					if (consumerSlot == null) {
-						// The consumer has been reset concurrently
-						continue;
-					}
-
-					final TaskManagerLocation partitionTaskManagerLocation = partition.getProducer()
-						.getCurrentAssignedResource().getTaskManagerLocation();
-					final ResourceID partitionTaskManager = partitionTaskManagerLocation.getResourceID();
-
-					final ResourceID consumerTaskManager = consumerSlot.getTaskManagerLocation().getResourceID();
-
-					final ResultPartitionID partitionId = new ResultPartitionID(partition.getPartitionId(), attemptId);
-
-					final ResultPartitionLocation partitionLocation;
+			else if (consumerState == DEPLOYING || consumerState == RUNNING) {
+				final LogicalSlot consumerSlot = consumer.getAssignedResource();
+				if (consumerSlot == null) {
+					// The consumer has been reset concurrently
+					continue;
+				}
 
-					if (consumerTaskManager.equals(partitionTaskManager)) {
-						// Consuming task is deployed to the same instance as the partition => local
-						partitionLocation = ResultPartitionLocation.createLocal();
-					}
-					else {
-						// Different instances => remote
-						final ConnectionID connectionId = new ConnectionID(
-							partitionTaskManagerLocation,
-							partition.getIntermediateResult().getConnectionIndex());
+				final TaskManagerLocation partitionTaskManagerLocation = partition.getProducer()
+					.getCurrentAssignedResource().getTaskManagerLocation();
+				final ResourceID partitionTaskManager = partitionTaskManagerLocation.getResourceID();
+				final ResourceID consumerTaskManager = consumerSlot.getTaskManagerLocation().getResourceID();
 
-						partitionLocation = ResultPartitionLocation.createRemote(connectionId);
-					}
+				final ResultPartitionLocation partitionLocation;
+				if (consumerTaskManager.equals(partitionTaskManager)) {
+					// Consuming task is deployed to the same instance as the partition => local
+					partitionLocation = ResultPartitionLocation.createLocal();
+				} else {
+					// Different instances => remote
+					final ConnectionID connectionId = new ConnectionID(
+


+  

[GitHub] [flink] flinkbot edited a comment on issue #7856: [FLINK-11776][coordination] Refactor to simplify the process of scheduleOrUpdateConsumers

2019-03-22 Thread GitBox
flinkbot edited a comment on issue #7856: [FLINK-11776][coordination] Refactor 
to simplify the process of scheduleOrUpdateConsumers
URL: https://github.com/apache/flink/pull/7856#issuecomment-468170966
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] azagrebin commented on a change in pull request #7856: [FLINK-11776][coordination] Refactor to simplify the process of scheduleOrUpdateConsumers

2019-03-22 Thread GitBox
azagrebin commented on a change in pull request #7856: 
[FLINK-11776][coordination] Refactor to simplify the process of 
scheduleOrUpdateConsumers
URL: https://github.com/apache/flink/pull/7856#discussion_r268172289
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -818,71 +800,46 @@ else if (numConsumers == 0) {
 					consumerVertex.checkInputDependencyConstraints()) {
 					scheduleConsumer(consumerVertex);
 				}
-
-				// double check to resolve race conditions
-				if (consumerVertex.getExecutionState() == RUNNING) {
-					consumerVertex.sendPartitionInfos();
-				}
 			}
 			// ----------------------------------------------------------------
 			// Consumer is running => send update message now
+			// Consumer is deploying => cache the partition info which would be
+			// sent after switching to running
 			// ----------------------------------------------------------------
-			else {
-				if (consumerState == RUNNING) {
-					final LogicalSlot consumerSlot = consumer.getAssignedResource();
-
-					if (consumerSlot == null) {
-						// The consumer has been reset concurrently
-						continue;
-					}
-
-					final TaskManagerLocation partitionTaskManagerLocation = partition.getProducer()
-						.getCurrentAssignedResource().getTaskManagerLocation();
-					final ResourceID partitionTaskManager = partitionTaskManagerLocation.getResourceID();
-
-					final ResourceID consumerTaskManager = consumerSlot.getTaskManagerLocation().getResourceID();
-
-					final ResultPartitionID partitionId = new ResultPartitionID(partition.getPartitionId(), attemptId);
-
-					final ResultPartitionLocation partitionLocation;
+			else if (consumerState == DEPLOYING || consumerState == RUNNING) {
+				final LogicalSlot consumerSlot = consumer.getAssignedResource();
+				if (consumerSlot == null) {
 
 Review comment:
   The `consumerSlot == null` concurrent change should not be possible anymore 
either: `ASSIGNED_SLOT_UPDATER` in `Execution.tryAssignResource` is called only 
in the main thread, same as `scheduleOrUpdateConsumers`.
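
   A sketch of what that would allow (hypothetical, not part of this PR): under 
main-thread confinement the defensive null check could become an invariant 
assertion, e.g. with a static import of 
`org.apache.flink.util.Preconditions.checkState`:

{code:java}
// Hypothetical fragment, assuming slot assignment and this method both
// run in the JobMaster main thread:
final LogicalSlot consumerSlot = consumer.getAssignedResource();
checkState(consumerSlot != null, "Consumer slot was reset outside the main thread.");
{code}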




[jira] [Updated] (FLINK-11907) GenericTypeInfoTest fails on Java 9

2019-03-22 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11907:
---
Labels: pull-request-available  (was: )

> GenericTypeInfoTest fails on Java 9
> ---
>
> Key: FLINK-11907
> URL: https://issues.apache.org/jira/browse/FLINK-11907
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Type Serialization System, Tests
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
>
> Output difference:
> {code:java}
>     pojos:java.util.List
>     key:int
>     sqlDate:java.sql.Date
>     bigInt:java.math.BigInteger
>     signum:int
>     mag:[I
> -   bitCount:int
> -   bitLength:int
> -   lowestSetBit:int
> -   firstNonzeroIntNum:int
> +   bitCountPlusOne:int
> +   bitLengthPlusOne:int
> +   lowestSetBitPlusTwo:int
> +   firstNonzeroIntNumPlusTwo:int
>     bigDecimalKeepItNull:java.math.BigDecimal
>     intVal:java.math.BigInteger
>     signum:int
>     mag:[I
> -   bitCount:int
> -   bitLength:int
> -   lowestSetBit:int
> -   firstNonzeroIntNum:int
> +   bitCountPlusOne:int
> +   bitLengthPlusOne:int
> +   lowestSetBitPlusTwo:int
> +   firstNonzeroIntNumPlusTwo:int
>     scale:int
>     scalaBigInt:scala.math.BigInt
>     bigInteger:java.math.BigInteger
>     signum:int
>     mag:[I
> -   bitCount:int
> -   bitLength:int
> -   lowestSetBit:int
> -   firstNonzeroIntNum:int
> +   bitCountPlusOne:int
> +   bitLengthPlusOne:int
> +   lowestSetBitPlusTwo:int
> +   firstNonzeroIntNumPlusTwo:int
>     mixed:java.util.List
>     
> makeMeGeneric:org.apache.flink.test.operators.util.CollectionDataSets$PojoWithDateAndEnum
>     group:java.lang.String
> +   value:[B
> +   coder:byte
> +   hash:int
> +   date:java.util.Date
> +   cat:org.apache.flink.test.operators.util.CollectionDataSets$Category 
> (is enum)
>      
> {code}





[GitHub] [flink] flinkbot commented on issue #8037: [FLINK-11907][types] Normalize String/BigInteger in GenericTypeInfoTest

2019-03-22 Thread GitBox
flinkbot commented on issue #8037: [FLINK-11907][types] Normalize 
String/BigInteger in GenericTypeInfoTest
URL: https://github.com/apache/flink/pull/8037#issuecomment-475624722
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] zentol opened a new pull request #8037: [FLINK-11907][types] Normalize String/BigInteger in GenericTypeInfoTest

2019-03-22 Thread GitBox
zentol opened a new pull request #8037: [FLINK-11907][types] Normalize 
String/BigInteger in GenericTypeInfoTest
URL: https://github.com/apache/flink/pull/8037
 
 
   ## What is the purpose of the change
   
   This PR modifies the `GenericTypeInfoTest` to run on Java 9. The test 
struggles with Java-version-specific representations of Strings and 
BigIntegers, so we now normalize these before doing the assertion.
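
   A rough sketch of what such normalization could look like (the helper and 
the exact replacements are illustrative, not necessarily this PR's 
implementation):

{code:java}
// Illustrative only: map the JDK-9+ internal field names seen in the dumps
// back to their JDK-8 counterparts before comparing.
private static String normalize(String typeDump) {
	return typeDump
		.replace("bitCountPlusOne", "bitCount")
		.replace("bitLengthPlusOne", "bitLength")
		.replace("lowestSetBitPlusTwo", "lowestSetBit")
		.replace("firstNonzeroIntNumPlusTwo", "firstNonzeroIntNum");
}
{code}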
   
   
   




[jira] [Assigned] (FLINK-11907) GenericTypeInfoTest fails on Java 9

2019-03-22 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-11907:


Assignee: Chesnay Schepler

> GenericTypeInfoTest fails on Java 9
> ---
>
> Key: FLINK-11907
> URL: https://issues.apache.org/jira/browse/FLINK-11907
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Type Serialization System, Tests
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>
> Output difference:
> {code:java}
>     pojos:java.util.List
>     key:int
>     sqlDate:java.sql.Date
>     bigInt:java.math.BigInteger
>     signum:int
>     mag:[I
> -   bitCount:int
> -   bitLength:int
> -   lowestSetBit:int
> -   firstNonzeroIntNum:int
> +   bitCountPlusOne:int
> +   bitLengthPlusOne:int
> +   lowestSetBitPlusTwo:int
> +   firstNonzeroIntNumPlusTwo:int
>     bigDecimalKeepItNull:java.math.BigDecimal
>     intVal:java.math.BigInteger
>     signum:int
>     mag:[I
> -   bitCount:int
> -   bitLength:int
> -   lowestSetBit:int
> -   firstNonzeroIntNum:int
> +   bitCountPlusOne:int
> +   bitLengthPlusOne:int
> +   lowestSetBitPlusTwo:int
> +   firstNonzeroIntNumPlusTwo:int
>     scale:int
>     scalaBigInt:scala.math.BigInt
>     bigInteger:java.math.BigInteger
>     signum:int
>     mag:[I
> -   bitCount:int
> -   bitLength:int
> -   lowestSetBit:int
> -   firstNonzeroIntNum:int
> +   bitCountPlusOne:int
> +   bitLengthPlusOne:int
> +   lowestSetBitPlusTwo:int
> +   firstNonzeroIntNumPlusTwo:int
>     mixed:java.util.List
>     
> makeMeGeneric:org.apache.flink.test.operators.util.CollectionDataSets$PojoWithDateAndEnum
>     group:java.lang.String
> +   value:[B
> +   coder:byte
> +   hash:int
> +   date:java.util.Date
> +   cat:org.apache.flink.test.operators.util.CollectionDataSets$Category 
> (is enum)
>      
> {code}





[GitHub] [flink] aljoscha commented on a change in pull request #7605: [FLINK-11463][PROTOTYPE] Port Kafka E2E test to Java

2019-03-22 Thread GitBox
aljoscha commented on a change in pull request #7605: [FLINK-11463][PROTOTYPE] 
Port Kafka E2E test to Java
URL: https://github.com/apache/flink/pull/7605#discussion_r268147612
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/JobSubmission.java
 ##
 @@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.flink;
+
+import org.apache.flink.util.Preconditions;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Programmatic definition of a job-submission.
+ */
+public class JobSubmission {
+
+   private final Path jar;
+   private final int parallelism;
+   private final boolean detached;
+   private final List<String> arguments;
+
+   JobSubmission(final Path jar, final int parallelism, final boolean detached, final List<String> arguments) {
+   this.jar = jar;
+   this.parallelism = parallelism;
+   this.detached = detached;
+   this.arguments = Collections.unmodifiableList(arguments);
+   }
+
+   public List<String> getArguments() {
+   return arguments;
+   }
+
+   public boolean isDetached() {
+   return detached;
+   }
+
+   public int getParallelism() {
+   return parallelism;
+   }
+
+   public Path getJar() {
+   return jar;
+   }
+
+   /**
+* Builder for the {@link JobSubmission}.
+*/
+   public static class JobSubmissionStandInBuilder {
 
 Review comment:
   Why is this called `JobSubmissionStandIn*`?




[GitHub] [flink] aljoscha commented on a change in pull request #7605: [FLINK-11463][PROTOTYPE] Port Kafka E2E test to Java

2019-03-22 Thread GitBox
aljoscha commented on a change in pull request #7605: [FLINK-11463][PROTOTYPE] 
Port Kafka E2E test to Java
URL: https://github.com/apache/flink/pull/7605#discussion_r268150850
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/JobController.java
 ##
 @@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.flink;
+
+/**
+ * Controller for interacting with a job.
+ */
+public interface JobController {
 
 Review comment:
   樂 




[GitHub] [flink] aljoscha commented on a change in pull request #7605: [FLINK-11463][PROTOTYPE] Port Kafka E2E test to Java

2019-03-22 Thread GitBox
aljoscha commented on a change in pull request #7605: [FLINK-11463][PROTOTYPE] 
Port Kafka E2E test to Java
URL: https://github.com/apache/flink/pull/7605#discussion_r268149545
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/TestUtils.java
 ##
 @@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * General test utilities.
+ */
+public enum TestUtils {
+   ;
 
 Review comment:
   樂




[GitHub] [flink] aljoscha commented on a change in pull request #7605: [FLINK-11463][PROTOTYPE] Port Kafka E2E test to Java

2019-03-22 Thread GitBox
aljoscha commented on a change in pull request #7605: [FLINK-11463][PROTOTYPE] 
Port Kafka E2E test to Java
URL: https://github.com/apache/flink/pull/7605#discussion_r268154828
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
 ##
 @@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.kafka;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.tests.util.AutoClosableProcess;
+import org.apache.flink.tests.util.CommandLineWrapper;
+import org.apache.flink.tests.util.activation.OperatingSystemRestriction;
+import org.apache.flink.tests.util.cache.DownloadCache;
+import org.apache.flink.util.OperatingSystem;
+
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.tests.util.AutoClosableProcess.runBlocking;
+import static org.apache.flink.tests.util.AutoClosableProcess.runNonBlocking;
+
+/**
+ * {@link KafkaResource} that downloads kafka and sets up a local kafka 
cluster with the bundled zookeeper.
+ */
+public class LocalStandaloneKafkaResource implements KafkaResource {
+
+	private static final Logger LOG = LoggerFactory.getLogger(LocalStandaloneKafkaResource.class);
+	private static final Pattern ZK_DATA_DIR_PATTERN = Pattern.compile(".*(dataDir=).*");
+	private static final Pattern KAFKA_LOG_DIR_PATTERN = Pattern.compile(".*(log\\.dirs=).*");
+
+	private static final String ZOOKEEPER_HOST = "localhost";
+	private static final int ZOOKEEPER_PORT = 2181;
+	private static final String ZOOKEEPER_ADDRESS = ZOOKEEPER_HOST + ':' + ZOOKEEPER_PORT;
+	private static final String KAFKA_HOST = "localhost";
+	private static final int KAFKA_PORT = 9092;
+	private static final String KAFKA_ADDRESS = KAFKA_HOST + ':' + KAFKA_PORT;
+
+	private final TemporaryFolder tmp = new TemporaryFolder();
+
+	private final DownloadCache downloadCache = DownloadCache.get();
+	private final String kafkaVersion;
+	private Path kafkaDir;
+
+	LocalStandaloneKafkaResource(final String kafkaVersion) {
+		OperatingSystemRestriction.forbid(
+			String.format("The %s relies on UNIX utils and shell scripts.", getClass().getSimpleName()),
+			OperatingSystem.WINDOWS);
+		this.kafkaVersion = kafkaVersion;
+	}
+
+	private static String getKafkaDownloadUrl(final String kafkaVersion) {
+		return String.format("https://archive.apache.org/dist/kafka/%s/kafka_2.11-%s.tgz", kafkaVersion, kafkaVersion);
+	}
+
+	@Override
+	public void before() throws Exception {
+		tmp.create();
+		downloadCache.before();
+
+		this.kafkaDir = tmp.newFolder("kafka").toPath().toAbsolutePath();
+		setupKafkaDist();
+		setKafkaCluster();
+	}
+
+	private void setupKafkaDist() throws IOException {
+		final Path downloadDirectory = tmp.newFolder("getOrDownload").toPath();
+		final Path kafkaArchive = downloadCache.getOrDownload(getKafkaDownloadUrl(kafkaVersion), downloadDirectory);
+
+		LOG.info("Kafka location: {}", kafkaDir.toAbsolutePath());
+		runBlocking(CommandLineWrapper
+			.tar(kafkaArchive)
+

[GitHub] [flink] maqingxiang commented on a change in pull request #8008: [FLINK-11963][History Server]Add time-based cleanup mechanism in history server

2019-03-22 Thread GitBox
maqingxiang commented on a change in pull request #8008: [FLINK-11963][History 
Server]Add time-based cleanup mechanism in history server
URL: https://github.com/apache/flink/pull/8008#discussion_r268164156
 
 

 ##
 File path: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
 ##
 @@ -123,7 +125,9 @@ void stop() {
 
 	private static final String JSON_FILE_ENDING = ".json";
 
-	JobArchiveFetcherTask(List refreshDirs, File webDir, CountDownLatch numFinishedPolls) {
+	JobArchiveFetcherTask(long retainedApplicationsMillis, List refreshDirs, File webDir,
+			CountDownLatch numFinishedPolls) {
+		this.retainedApplicationsMillis = retainedApplicationsMillis;
 
 Review comment:
   The test logic has been added by introducing the `isCleanupEnabled` 
variable. Thanks a lot for reviewing it again!




[GitHub] [flink] aljoscha commented on a change in pull request #7605: [FLINK-11463][PROTOTYPE] Port Kafka E2E test to Java

2019-03-22 Thread GitBox
aljoscha commented on a change in pull request #7605: [FLINK-11463][PROTOTYPE] 
Port Kafka E2E test to Java
URL: https://github.com/apache/flink/pull/7605#discussion_r268147030
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -68,6 +71,8 @@
 
private final List filesToDelete = new ArrayList<>(4);
 
+   private final Optional backupDir;
 
 Review comment:
   At first I was confused why we need backups of the distribution. Maybe this 
could be called `logUploadDir`.




[GitHub] [flink] aljoscha commented on a change in pull request #7605: [FLINK-11463][PROTOTYPE] Port Kafka E2E test to Java

2019-03-22 Thread GitBox
aljoscha commented on a change in pull request #7605: [FLINK-11463][PROTOTYPE] 
Port Kafka E2E test to Java
URL: https://github.com/apache/flink/pull/7605#discussion_r268149503
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/util/FactoryUtils.java
 ##
 @@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.util;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.ServiceLoader;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * Utilities for factories.
+ */
+public enum FactoryUtils {
+   ;
 
 Review comment:
   樂 




[GitHub] [flink] maqingxiang commented on a change in pull request #8008: [FLINK-11963][History Server]Add time-based cleanup mechanism in history server

2019-03-22 Thread GitBox
maqingxiang commented on a change in pull request #8008: [FLINK-11963][History 
Server]Add time-based cleanup mechanism in history server
URL: https://github.com/apache/flink/pull/8008#discussion_r268163453
 
 

 ##
 File path: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
 ##
 @@ -153,9 +157,20 @@ public void run() {
 						continue;
 					}
 					boolean updateOverview = false;
+					long now = System.currentTimeMillis();
 					for (FileStatus jobArchive : jobArchives) {
 						Path jobArchivePath = jobArchive.getPath();
 						String jobID = jobArchivePath.getName();
+						if (retainedApplicationsMillis > 0L && now - jobArchive.getModificationTime() > retainedApplicationsMillis) {
+							if (LOG.isDebugEnabled()) {
+								LOG.debug("delete the old archived job for path {}." + jobArchivePath.toString());
+							}
+							jobArchivePath.getFileSystem().delete(jobArchivePath, false);
+							continue;
+						} else {
+							LOG.warn("Negative or zero values {} of the historyserver.archive.fs.retained-application-millis " +
 Review comment:
   I am very sorry that my carelessness caused this mistake. Thanks a lot for 
reviewing it again!
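
   For reference, one possible shape of the fix (illustrative only; the actual 
change may differ): validate the configured retention once, outside the 
per-archive loop, so the warning does not fire for every non-expired archive.

{code:java}
// Illustrative: warn once up front instead of inside the fetch loop.
if (retainedApplicationsMillis <= 0L) {
	LOG.warn("Non-positive value {} for historyserver.archive.fs.retained-application-millis; " +
		"time-based cleanup is disabled.", retainedApplicationsMillis);
}
{code}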




[jira] [Updated] (FLINK-11153) Remove UdfAnalyzer

2019-03-22 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-11153:
-
Summary: Remove UdfAnalyzer  (was: UdfAnalyzerTest fails with 
CodeAnalyzerException)

> Remove UdfAnalyzer
> --
>
> Key: FLINK-11153
> URL: https://issues.apache.org/jira/browse/FLINK-11153
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataStream, Tests
>Affects Versions: 1.8.0
>Reporter: Gary Yao
>Priority: Major
>
> {noformat}
> org.apache.flink.api.java.sca.CodeAnalyzerException: Exception occurred 
> during code analysis.
>   at 
> org.apache.flink.api.java.sca.UdfAnalyzer.analyze(UdfAnalyzer.java:341)
>   at 
> org.apache.flink.api.java.sca.UdfAnalyzerTest.compareAnalyzerResultWithAnnotationsSingleInputWithKeys(UdfAnalyzerTest.java:1339)
>   at 
> org.apache.flink.api.java.sca.UdfAnalyzerTest.compareAnalyzerResultWithAnnotationsSingleInput(UdfAnalyzerTest.java:1322)
>   at 
> org.apache.flink.api.java.sca.UdfAnalyzerTest.testForwardWithArrayModification(UdfAnalyzerTest.java:695)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:564)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>   at 
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> Caused by: java.lang.IllegalArgumentException
>   at 
> org.apache.flink.shaded.asm5.org.objectweb.asm.ClassReader.(Unknown 
> Source)
>   at 
> org.apache.flink.shaded.asm5.org.objectweb.asm.ClassReader.(Unknown 
> Source)
>   at 
> org.apache.flink.shaded.asm5.org.objectweb.asm.ClassReader.(Unknown 
> Source)
>   at 
> org.apache.flink.api.java.sca.UdfAnalyzerUtils.findMethodNode(UdfAnalyzerUtils.java:131)
>   at 
> org.apache.flink.api.java.sca.UdfAnalyzerUtils.findMethodNode(UdfAnalyzerUtils.java:115)
>   at 
> org.apache.flink.api.java.sca.UdfAnalyzer.analyze(UdfAnalyzer.java:290)
>   ... 25 more
> {noformat}





[GitHub] [flink] gustavo-momente edited a comment on issue #7980: [FLINK-11913] Shadding cassandra driver dependencies in cassandra conector

2019-03-22 Thread GitBox
gustavo-momente edited a comment on issue #7980: [FLINK-11913] Shadding 
cassandra driver dependencies in cassandra conector
URL: https://github.com/apache/flink/pull/7980#issuecomment-475612671
 
 
   I understand your point, but what would be the best way for us to support 
multiple Cassandra connector versions? We currently use other libs that depend 
on different driver versions, and that has created problems on our end. 
Moreover, I don't understand why both the driver and the mapper are bundled 
with flink-connector-cassandra; could we specify them as transitive 
dependencies?
   
   If needed, I can provide a simple application that shows the kind of problem 
we're experiencing when having multiple driver versions.




[jira] [Closed] (FLINK-11988) Remove legacy MockNetworkEnvironment

2019-03-22 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-11988.
--
   Resolution: Fixed
Fix Version/s: 1.9.0

[~zjwang] thanks for the clean-up. One remark: others might not agree, but I 
would be perfectly fine with merging such a fix as a {{hotfix}} (either as part 
of a larger PR or as a separate PR), without the overhead of creating a JIRA 
ticket for it :) 

merged commit a83e896 into apache:master  

> Remove legacy MockNetworkEnvironment
> 
>
> Key: FLINK-11988
> URL: https://issues.apache.org/jira/browse/FLINK-11988
> Project: Flink
>  Issue Type: Task
>  Components: Runtime / Network, Tests
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Remove legacy {{MockNetworkEnvironment}} class.





[GitHub] [flink] gustavo-momente commented on issue #7980: [FLINK-11913] Shadding cassandra driver dependencies in cassandra conector

2019-03-22 Thread GitBox
gustavo-momente commented on issue #7980: [FLINK-11913] Shadding cassandra 
driver dependencies in cassandra conector
URL: https://github.com/apache/flink/pull/7980#issuecomment-475612671
 
 
   I understand your point, but what would be the best way for us to support 
multiple Cassandra connector versions? We currently use other libs that depend 
on different connector versions, and that has created problems on our end. 
Moreover, I don't understand why both the driver and the mapper are bundled 
with flink-connector-cassandra; could we specify them as transitive 
dependencies?




[GitHub] [flink] pnowojski merged pull request #8028: [FLINK-11988][network] Remove legacy MockNetworkEnvironment

2019-03-22 Thread GitBox
pnowojski merged pull request #8028: [FLINK-11988][network] Remove legacy 
MockNetworkEnvironment
URL: https://github.com/apache/flink/pull/8028
 
 
   




[jira] [Commented] (FLINK-11974) Introduce StreamOperatorSubstitutor to help table perform the whole Operator CodeGen

2019-03-22 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16798987#comment-16798987
 ] 

Piotr Nowojski commented on FLINK-11974:


[~lzljs3620320] could you elaborate a little bit more on what the problem is 
and how you are trying to fix it?

As I understand it, the problem is the virtual calls for the operator 
instances? If that's the case, I think I don't fully understand your proposed 
solution.

I have never researched this, but as far as I recall we could just load each 
operator in a separate class loader and let the JIT take care of devirtualizing 
all of the classes, not only the operator. The idea here is that as long as 
there are only one or two implementations of a given class present, the JIT can 
devirtualize those calls. By loading each instance in a separate class loader 
we might be able to leverage this optimisation.
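
A minimal, self-contained sketch of that idea (class names and the jar location 
are hypothetical; it assumes the operator classes are reachable only through 
the child loaders, not through the parent classpath):

{code:java}
import java.net.URL;
import java.net.URLClassLoader;

public class IsolatedLoadingSketch {
	public static void main(String[] args) throws Exception {
		URL operatorJar = new URL("file:///tmp/operator.jar"); // hypothetical jar
		// One loader per operator instance: each loader then sees a single
		// concrete subtype, which is what lets the JIT devirtualize call sites.
		try (URLClassLoader loaderA = new URLClassLoader(new URL[]{operatorJar});
				URLClassLoader loaderB = new URLClassLoader(new URL[]{operatorJar})) {
			Class<?> a = Class.forName("com.example.MyOperator", true, loaderA);
			Class<?> b = Class.forName("com.example.MyOperator", true, loaderB);
			System.out.println(a == b); // false: a distinct class per loader
		}
	}
}
{code}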

> Introduce StreamOperatorSubstitutor to help table perform the whole Operator 
> CodeGen
> 
>
> Key: FLINK-11974
> URL: https://issues.apache.org/jira/browse/FLINK-11974
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Operators
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> If we need to CodeGen an entire Operator, one possible solution is to 
> introduce an OperatorWrapper, then generate a CodeGen sub-Operator in 
> OperatorWrapper's open, and then proxy all methods to the sub-Operator.
> Doing so results in multiple virtual function calls, so we introduce a 
> StreamOperatorSubstitutor:
> {code:java}
> /**
>  * Basic interface for stream operator substitutes. It is transferred to the 
> streamTask by
>  * serialization, and produce an actual stream operator to the streamTask, 
> who uses the actual
>  * stream operator to run.
>  *
>  * @param  output type of the actual stream operator
>  */
> public interface StreamOperatorSubstitutor {
>/**
> * Produces the actual stream operator.
> *
> * @param userCodeClassLoader the user code class loader to use.
> * @return the actual stream operator created on {@code StreamTask}.
> */
>StreamOperator getActualStreamOperator(ClassLoader 
> userCodeClassLoader);
> }
> {code}
> In StreamConfig.getStreamOperator, we need:
> {code:java}
> if (operator != null && operator instanceof StreamOperatorSubstitutor) {
>return (T) ((StreamOperatorSubstitutor) 
> operator).getActualStreamOperator(cl);
> } else {
>return (T) operator;
> }
> {code}
> to get the real operator.





[jira] [Commented] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs

2019-03-22 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16798970#comment-16798970
 ] 

Piotr Nowojski commented on FLINK-11654:


I don't think so, unless there is a stupid bug or we missed something crucial.

[~jkreileder] are you sure that there is no error on your part somewhere, e.g. 
that the job was submitted twice, or that the previous job instance was still 
running when you submitted the next one? Or maybe something prevented the 
previous instance of the Job/TaskManagers/Yarn containers from closing up and 
cleaning up? Maybe there is a race condition between shutting down remnants of 
the old job and starting up the new one?

> Multiple transactional KafkaProducers writing to same cluster have clashing 
> transaction IDs
> ---
>
> Key: FLINK-11654
> URL: https://issues.apache.org/jira/browse/FLINK-11654
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.7.1
>Reporter: Jürgen Kreileder
>Priority: Major
> Fix For: 1.9.0
>
>
> We run multiple jobs on a cluster which write a lot to the same Kafka topic 
> from identically named sinks. When EXACTLY_ONCE semantic is enabled for the 
> KafkaProducers we run into a lot of ProducerFencedExceptions and all jobs go 
> into a restart cycle.
> Example exception from the Kafka log:
>  
> {code:java}
> [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition finding-commands-dev-1-0 
> (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is 
> no longer valid. There is probably another producer with a newer epoch. 483 
> (request epoch), 484 (server epoch)
> {code}
> The reason for this is the way FlinkKafkaProducer initializes the 
> TransactionalIdsGenerator:
> The IDs are only guaranteed to be unique for a single Job. But they can clash 
> between different Jobs (and Clusters).
>  
>  
> {code:java}
> --- 
> a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> +++ 
> b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> @@ -819,6 +819,7 @@ public class FlinkKafkaProducer
>                 nextTransactionalIdHintState = 
> context.getOperatorStateStore().getUnionListState(
>                         NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
>                 transactionalIdsGenerator = new TransactionalIdsGenerator(
> + // the prefix probably should include job id and maybe cluster id
>                         getRuntimeContext().getTaskName() + "-" + 
> ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
>                         getRuntimeContext().getIndexOfThisSubtask(),
>                         
> getRuntimeContext().getNumberOfParallelSubtasks(),{code}
>  
>  
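
A minimal sketch of the prefix change hinted at by the comment in the diff 
above (illustrative; {{jobId}} is assumed to be available to the sink, e.g. 
passed in at construction time):

{code:java}
// Illustrative fragment for the TransactionalIdsGenerator prefix:
// prepending the job ID makes the prefix unique per job, so identically
// named sinks in different jobs no longer clash.
String transactionalIdPrefix = jobId + "-"
	+ getRuntimeContext().getTaskName() + "-"
	+ ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID();
{code}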



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] pnowojski commented on a change in pull request #7911: [FLINK-11082][network] Fix the logic of getting backlog in sub partition

2019-03-22 Thread GitBox
pnowojski commented on a change in pull request #7911: [FLINK-11082][network] 
Fix the logic of getting backlog in sub partition
URL: https://github.com/apache/flink/pull/7911#discussion_r268145222
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
 ##
 @@ -115,52 +115,62 @@ protected Throwable getFailureCause() {
 
public abstract boolean isReleased();
 
+   public abstract boolean isFinished();
+
+   public abstract boolean isFlushRequested();
 
 Review comment:
   The same question as above: can it be non-public, or should it be marked 
unsafe?




[GitHub] [flink] pnowojski commented on a change in pull request #7911: [FLINK-11082][network] Fix the logic of getting backlog in sub partition

2019-03-22 Thread GitBox
pnowojski commented on a change in pull request #7911: [FLINK-11082][network] 
Fix the logic of getting backlog in sub partition
URL: https://github.com/apache/flink/pull/7911#discussion_r268145093
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
 ##
 @@ -115,52 +115,62 @@ protected Throwable getFailureCause() {
 
public abstract boolean isReleased();
 
+   public abstract boolean isFinished();
 
 Review comment:
   Does it have to be public? If yes, then I think it should be named 
`isFinishedUnsafe()`. More or less, we are creating a convention here as we go, 
but I think for private/protected methods we can skip the unsafe suffix as long 
as only one version of such a method exists. Here, the existence of the 
thread-safe `isReleased()` adds to the confusion and suggests that 
`isFinished()` should be thread-safe as well.
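
   A small sketch of the convention being discussed (class and lock names are 
hypothetical):

{code:java}
// The plain accessor is thread-safe; the *Unsafe variant documents that
// the caller must already hold the lock.
class SubpartitionSketch {
	private final Object buffers = new Object();
	private boolean finished;

	boolean isFinished() {           // thread-safe: takes the lock itself
		synchronized (buffers) {
			return isFinishedUnsafe();
		}
	}

	boolean isFinishedUnsafe() {     // caller must hold the buffers lock
		return finished;
	}
}
{code}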




[jira] [Updated] (FLINK-11996) Case class maximum of 22 fields

2019-03-22 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11996?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-11996:
-
Component/s: API / Type Serialization System
 API / Scala

> Case class maximum of 22 fields
> ---
>
> Key: FLINK-11996
> URL: https://issues.apache.org/jira/browse/FLINK-11996
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Scala, API / Type Serialization System, 
> Documentation
>Reporter: Wouter Zorgdrager
>Priority: Minor
>
> The [serialization 
> documentation|https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/types_serialization.html#flinks-typeinformation-class]
>  states that there is a limit of 22 fields in a case class. Since [Scala 
> 2.11|https://github.com/scala/bug/issues/7296] this arity limit has been 
> removed and therefore this limit should also be removed on this documentation 
> page. 





[jira] [Created] (FLINK-11996) Case class maximum of 22 fields

2019-03-22 Thread Wouter Zorgdrager (JIRA)
Wouter Zorgdrager created FLINK-11996:
-

 Summary: Case class maximum of 22 fields
 Key: FLINK-11996
 URL: https://issues.apache.org/jira/browse/FLINK-11996
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Wouter Zorgdrager


The [serialization 
documentation|https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/types_serialization.html#flinks-typeinformation-class]
 states that there is a limit of 22 fields in a case class. Since [Scala 
2.11|https://github.com/scala/bug/issues/7296] this arity limit has been 
removed and therefore this limit should also be removed on this documentation 
page. 





[jira] [Updated] (FLINK-11431) Upgrade Akka to 2.5

2019-03-22 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-11431:
-
Fix Version/s: 1.9.0
  Component/s: Runtime / Coordination
  Summary: Upgrade Akka to 2.5  (was: Akka dependency not compatible 
with java 9 or above)

> Upgrade Akka to 2.5
> ---
>
> Key: FLINK-11431
> URL: https://issues.apache.org/jira/browse/FLINK-11431
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System, Runtime / Coordination
>Affects Versions: 1.7.1
>Reporter: Matthieu Bonneviot
>Assignee: Chesnay Schepler
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> {noformat}
> 2019-01-24 14:43:52,059 ERROR akka.remote.Remoting
>   - class [B cannot be cast to class [C ([B and [C are in module 
> java.base of loader 'bootstrap')
> java.lang.ClassCastException: class [B cannot be cast to class [C ([B and [C 
> are in module java.base of loader 'bootstrap')
>     at akka.remote.artery.FastHash$.ofString(LruBoundedCache.scala:18)
>     at 
> akka.remote.serialization.ActorRefResolveCache.hash(ActorRefResolveCache.scala:61)
>     at 
> akka.remote.serialization.ActorRefResolveCache.hash(ActorRefResolveCache.scala:55)
>     at 
> akka.remote.artery.LruBoundedCache.getOrCompute(LruBoundedCache.scala:110)
>     at 
> akka.remote.RemoteActorRefProvider.resolveActorRef(RemoteActorRefProvider.scala:403)
>     at akka.actor.SerializedActorRef.readResolve(ActorRef.scala:433)
>     at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>     at 
> java.base/java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1250)
>     at 
> java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2096)
>     at 
> java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1594)
> {noformat}
> Running a jobmanager with Java 11 fails with the call stack above.
> Flink master is using akka 2.4.20.
> After some investigation, the error in akka comes from the following line:
> {code}
> def ofString(s: String): Int = {
> val chars = Unsafe.instance.getObject(s, 
> EnvelopeBuffer.StringValueFieldOffset).asInstanceOf[Array[Char]]
> {code}
> From Java 9 on, it is an array of bytes. The Akka code in the newer version is:
> {code}
> public static int fastHash(String str) {
>   ...
> if (isJavaVersion9Plus) {
> final byte[] chars = (byte[]) instance.getObject(str, 
> stringValueFieldOffset);
> ...
> } else {
> final char[] chars = (char[]) instance.getObject(str, 
> stringValueFieldOffset);
>  {code}





[jira] [Assigned] (FLINK-5429) Code generate types between operators in Table API

2019-03-22 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-5429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-5429:
---

Assignee: vinoyang

> Code generate types between operators in Table API
> --
>
> Key: FLINK-5429
> URL: https://issues.apache.org/jira/browse/FLINK-5429
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Legacy Planner
>Reporter: Timo Walther
>Assignee: vinoyang
>Priority: Major
>
> Currently, the Table API uses the generic Row type for shipping records 
> between operators in the underlying DataSet and DataStream APIs. For 
> efficiency reasons, we should code-generate those records. The final design 
> is up for discussion, but here are some ideas:
> A row like {{(a: INT NULL, b: INT NOT NULL, c: STRING)}} could look like
> {code}
> final class GeneratedRow$123 {
>   public boolean a_isNull;
>   public int a;
>   public int b;
>   public String c;
> }
> {code}
> Types could be generated using Janino in the pre-flight phase. The generated 
> types should use primitive types wherever possible.
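
As a hedged sketch of the Janino approach mentioned above (not actual Flink 
code; only the class name GeneratedRow$123 is taken from the example), 
generating, compiling, and loading such a type could look like:

{code:java}
import org.codehaus.janino.SimpleCompiler;

public class GeneratedRowDemo {

    public static void main(String[] args) throws Exception {
        // Source text assembled in the pre-flight phase: primitive fields where
        // possible, plus an explicit null flag for the nullable INT column.
        String source =
            "public final class GeneratedRow$123 {\n"
            + "  public boolean a_isNull;\n"
            + "  public int a;\n"
            + "  public int b;\n"
            + "  public String c;\n"
            + "}\n";

        SimpleCompiler compiler = new SimpleCompiler();
        compiler.cook(source);

        Class<?> rowClass = compiler.getClassLoader().loadClass("GeneratedRow$123");
        Object row = rowClass.getDeclaredConstructor().newInstance();
        rowClass.getField("b").setInt(row, 42);              // primitive access, no boxing
        rowClass.getField("a_isNull").setBoolean(row, true); // NULL is an explicit flag
        System.out.println(rowClass.getField("b").getInt(row));
    }
}
{code}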



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #8036: [FLINK-11431][runtime] Upgrade akka to 2.5

2019-03-22 Thread GitBox
flinkbot commented on issue #8036: [FLINK-11431][runtime] Upgrade akka to 2.5
URL: https://github.com/apache/flink/pull/8036#issuecomment-475595316
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

## Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9607) Support ParquetTableSink

2019-03-22 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16798942#comment-16798942
 ] 

vinoyang commented on FLINK-9607:
-

[~guibopan] What's the status of this issue?

> Support ParquetTableSink
> 
>
> Key: FLINK-9607
> URL: https://issues.apache.org/jira/browse/FLINK-9607
> Project: Flink
>  Issue Type: New Feature
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: zhangminglei
>Assignee: Guibo Pan
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7244) Add ParquetTableSource Implementation based on ParquetInputFormat

2019-03-22 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16798941#comment-16798941
 ] 

vinoyang commented on FLINK-7244:
-

[~ZhenqiuHuang] What's the status of this issue?

> Add ParquetTableSource Implementation based on ParquetInputFormat
> -
>
> Key: FLINK-7244
> URL: https://issues.apache.org/jira/browse/FLINK-7244
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Ecosystem
>Reporter: godfrey he
>Assignee: Zhenqiu Huang
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-3476) Support hash-based partial aggregate

2019-03-22 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-3476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-3476:
---

Assignee: vinoyang

> Support hash-based partial aggregate
> 
>
> Key: FLINK-3476
> URL: https://issues.apache.org/jira/browse/FLINK-3476
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Legacy Planner
>Reporter: Chengxiang Li
>Assignee: vinoyang
>Priority: Major
>
> As described in the design doc, we should be able to enable the hash-based 
> partial aggregate after the hash-based combiner (#1517) is supported.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11431) Akka dependency not compatible with java 9 or above

2019-03-22 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11431:
---
Labels: pull-request-available  (was: )

> Akka dependency not compatible with java 9 or above
> ---
>
> Key: FLINK-11431
> URL: https://issues.apache.org/jira/browse/FLINK-11431
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Affects Versions: 1.7.1
>Reporter: Matthieu Bonneviot
>Assignee: Chesnay Schepler
>Priority: Minor
>  Labels: pull-request-available
>
> {noformat}
> 2019-01-24 14:43:52,059 ERROR akka.remote.Remoting
>   - class [B cannot be cast to class [C ([B and [C are in module 
> java.base of loader 'bootstrap')
> java.lang.ClassCastException: class [B cannot be cast to class [C ([B and [C 
> are in module java.base of loader 'bootstrap')
>     at akka.remote.artery.FastHash$.ofString(LruBoundedCache.scala:18)
>     at 
> akka.remote.serialization.ActorRefResolveCache.hash(ActorRefResolveCache.scala:61)
>     at 
> akka.remote.serialization.ActorRefResolveCache.hash(ActorRefResolveCache.scala:55)
>     at 
> akka.remote.artery.LruBoundedCache.getOrCompute(LruBoundedCache.scala:110)
>     at 
> akka.remote.RemoteActorRefProvider.resolveActorRef(RemoteActorRefProvider.scala:403)
>     at akka.actor.SerializedActorRef.readResolve(ActorRef.scala:433)
>     at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>     at 
> java.base/java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1250)
>     at 
> java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2096)
>     at 
> java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1594)
> {noformat}
> Running a jobmanager with Java 11 fails with the call stack above.
> Flink master is using Akka 2.4.20.
> After some investigation, the error in Akka comes from the following line:
> {code}
> def ofString(s: String): Int = {
>   val chars = Unsafe.instance.getObject(s,
>     EnvelopeBuffer.StringValueFieldOffset).asInstanceOf[Array[Char]]
> {code}
> From Java 9 onward, the String is backed by an array of bytes. The Akka code 
> in the newer version is:
> {code}
> public static int fastHash(String str) {
>   ...
>   if (isJavaVersion9Plus) {
>     final byte[] chars = (byte[]) instance.getObject(str, stringValueFieldOffset);
>     ...
>   } else {
>     final char[] chars = (char[]) instance.getObject(str, stringValueFieldOffset);
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zentol opened a new pull request #8036: [FLINK-11431][runtime] Upgrade akka to 2.5

2019-03-22 Thread GitBox
zentol opened a new pull request #8036: [FLINK-11431][runtime] Upgrade akka to 
2.5
URL: https://github.com/apache/flink/pull/8036
 
 
   ## What is the purpose of the change
   
   This PR upgrades Akka to 2.5.21, which is a hard requirement for JDK 9+ 
support.
   
   
   ## Brief change log
   
   Removed APIs:
   * migrate from `ActorSystem#shutdown` to `ActorSystem#terminate`
   * migrate from `ActorSystem#isTerminated` to `ActorSystem#whenTerminated`
   
   Deprecated APIs:
   * migrate from `UntypedActor` to `AbstractActor`
   
   Misc:
   * pin `scala-parser-combinators` version to 1.1.1
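
   A hedged sketch of the API migrations listed above (illustrative only, not 
the actual PR diff; `Greeter` is an invented example actor):

```java
import akka.actor.AbstractActor;
import akka.actor.ActorSystem;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;

public class AkkaMigrationSketch {

    // Akka 2.4: system.shutdown(); system.awaitTermination();
    // Akka 2.5: terminate() returns a future that completes once shutdown is done.
    static void stop(ActorSystem system) throws Exception {
        Await.result(system.terminate(), Duration.create(10, "seconds"));
    }

    // Akka 2.4 style was: extend UntypedActor and override onReceive(Object).
    // Akka 2.5 style: extend AbstractActor and declare behavior in createReceive().
    static class Greeter extends AbstractActor {
        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .match(String.class, name -> System.out.println("hello " + name))
                    .build();
        }
    }
}
```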
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-3480) Add hash-based strategy for ReduceFunction

2019-03-22 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-3480?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-3480:
---

Assignee: vinoyang

> Add hash-based strategy for ReduceFunction
> --
>
> Key: FLINK-3480
> URL: https://issues.apache.org/jira/browse/FLINK-3480
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Operators
>Reporter: Fabian Hueske
>Assignee: vinoyang
>Priority: Major
>
> This issue is related to FLINK-3477. 
> While FLINK-3477 proposes to add a hash-based combine strategy for 
> ReduceFunction, this issue aims to add a hash-based strategy for the final 
> aggregation.
> This will again need a special hash-table aggregation that allows in-place 
> updates and append updates. However, it also needs to support spilling to 
> disk in case of tight memory budgets.
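
A minimal, Flink-independent sketch of the hash-based idea (ignoring managed 
memory and the spilling requirement noted above): keep one running value per 
key and update it in place, instead of sorting the input by key first.

{code:java}
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class HashReduceSketch {

    // Reduce (key, value) records down to one value per key via a hash table.
    static <K, V> Map<K, V> hashReduce(List<Map.Entry<K, V>> records, BinaryOperator<V> reduce) {
        Map<K, V> table = new HashMap<>();
        for (Map.Entry<K, V> record : records) {
            // In-place update: no sort of the input by key is needed.
            table.merge(record.getKey(), record.getValue(), reduce);
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> records = List.of(
                Map.entry("a", 1), Map.entry("b", 2), Map.entry("a", 3));
        // Prints {a=4, b=2} (map iteration order may vary).
        System.out.println(hashReduce(records, Integer::sum));
    }
}
{code}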



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #8035: [FLINK-11975][table-planner-blink] Support to run a sql query : 'SELECT * FROM (VALUES (1, 2, 3)) T(a, b, c)'

2019-03-22 Thread GitBox
flinkbot commented on issue #8035: [FLINK-11975][table-planner-blink] Support 
to run a sql query : 'SELECT * FROM (VALUES (1, 2, 3)) T(a, b, c)'
URL: https://github.com/apache/flink/pull/8035#issuecomment-475591092
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

## Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11975) Support to run a sql query : 'SELECT * FROM (VALUES (1, 2, 3)) T(a, b, c)'

2019-03-22 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11975:
---
Labels: pull-request-available  (was: )

> Support to run a sql query :  'SELECT * FROM (VALUES (1, 2, 3)) T(a, b, c)'
> ---
>
> Key: FLINK-11975
> URL: https://issues.apache.org/jira/browse/FLINK-11975
> Project: Flink
>  Issue Type: Task
>  Components: Table SQL / Planner
>Reporter: Jing Zhang
>Priority: Major
>  Labels: pull-request-available
>
> Support to run a sql query : 'SELECT * FROM (VALUES (1, 2, 3)) T(a, b, c)', 
> including:
> 1. add writeToSink, translateNodeDag in batch and stream tableEnv
> 2. introduce SinkRules for batch and stream
> 3. Introduce subclasses of TableSink, including BaseUpsertStreamTableSink, 
> BatchTableSink, CollectTableSink, DataStreamTableSink
> 4. StreamExecSink/BatchExecSink implement the ExecNode interface
> 5. StreamExecValues/BatchExecValues implement the ExecNode interface, add 
> CodeGen for Values.
> 6. add ITCase test infrastructure, add ITCase to run 'SELECT * FROM (VALUES 
> (1, 2, 3)) T(a, b, c)' for batch and stream



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] beyond1920 opened a new pull request #8035: [FLINK-11975][table-planner-blink] Support to run a sql query : 'SELECT * FROM (VALUES (1, 2, 3)) T(a, b, c)'

2019-03-22 Thread GitBox
beyond1920 opened a new pull request #8035: [FLINK-11975][table-planner-blink] 
Support to run a sql query : 'SELECT * FROM (VALUES (1, 2, 3)) T(a, b, c)'
URL: https://github.com/apache/flink/pull/8035
 
 
   ## What is the purpose of the change
   
   Support to run a sql query : 'SELECT * FROM (VALUES (1, 2, 3)) T(a, b, c)'
   
   ## Brief change log
 - add writeToSink, translateNodeDag in batch and stream tableEnv
 - introduce SinkRules for batch and stream
 - Introduce subclasses of TableSink, including BaseUpsertStreamTableSink, 
BatchTableSink, CollectTableSink, DataStreamTableSink
 - StreamExecSink/BatchExecSink implement the ExecNode interface
 - StreamExecValues/BatchExecValues implement the ExecNode interface, add 
CodeGen for Values.
 - add ITCase test infrastructure, add ITCase to run 'SELECT * FROM (VALUES 
(1, 2, 3)) T(a, b, c)' for batch and stream
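
   A hedged usage sketch of the query this PR enables (assuming the unified 
`TableEnvironment`/`EnvironmentSettings` entry point that the Blink planner 
work converged on in Flink 1.9; this is not code from the PR):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class ValuesQuery {

    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // VALUES produces a one-row inline table; T(a, b, c) names its columns,
        // so the query is equivalent to: SELECT 1 AS a, 2 AS b, 3 AS c
        Table result = tEnv.sqlQuery("SELECT * FROM (VALUES (1, 2, 3)) T(a, b, c)");
        result.printSchema();
    }
}
```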
   
   
   ## Verifying this change
 ITCase
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (JavaDocs)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9170) HCatolog integration with Table/SQL API

2019-03-22 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16798937#comment-16798937
 ] 

vinoyang commented on FLINK-9170:
-

Hi [~hpeter], what's the status of this issue?

> HCatolog integration with Table/SQL API
> ---
>
> Key: FLINK-9170
> URL: https://issues.apache.org/jira/browse/FLINK-9170
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Ecosystem
>Reporter: Shuyi Chen
>Assignee: Zhenqiu Huang
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-8474) Add documentation for HBaseTableSource

2019-03-22 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-8474:
---

Assignee: TANG Wen-hui

> Add documentation for HBaseTableSource
> --
>
> Key: FLINK-8474
> URL: https://issues.apache.org/jira/browse/FLINK-8474
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Table SQL / Ecosystem
>Affects Versions: 1.3.0, 1.4.0, 1.5.0
>Reporter: Fabian Hueske
>Assignee: TANG Wen-hui
>Priority: Major
>
> The {{HBaseTableSource}} is not documented in the [Table Sources and Sinks 
> documentation|https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/sourceSinks.html].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9278) Allow restore savepoint with some SQL queries added/removed

2019-03-22 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-9278:
---

Assignee: vinoyang

> Allow restore savepoint with some SQL queries added/removed
> ---
>
> Key: FLINK-9278
> URL: https://issues.apache.org/jira/browse/FLINK-9278
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.4.2
>Reporter: Adrian Hains
>Assignee: vinoyang
>Priority: Major
>
> We are running a Flink job that contains multiple SQL queries. This is 
> configured by calling sqlQuery(String) one time for each SQL query, on a 
> single instance of StreamTableEnvironment. The queries are simple 
> aggregations with a tumble window.
> Currently I can configure my environment with queries Q1, Q2, and Q3, create 
> a savepoint, and restart the job from that savepoint if the same set of SQL 
> queries is used.
> If I remove some queries and add some others, Q2, Q4, and Q3, I am unable to 
> restart the job from the same savepoint. This behavior is expected, as the 
> documentation clearly describes that the operator IDs are generated if they 
> are not explicitly defined, and they cannot be explicitly defined when using 
> Flink SQL.
> I would like to be able to specify a scoping operator id prefix when 
> registering a SQL query to a StreamTableEnvironment. This can then be used to 
> programmatically generate unique IDs for each of the operators created to 
> execute the SQL queries. For example, if I specify a prefix of "ID:Q2:" for 
> my Q2 query, and I restart the job with an identical SQL query for this 
> prefix, then I would be able to restore the state for this query even in the 
> presence of other queries being added to or removed from the job graph.
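
For context, a minimal DataStream sketch (this is not a Table API feature) of 
the explicit-uid mechanism that the proposal above wants to expose for SQL 
queries, using the suggested "ID:Q2:" prefix style:

{code:java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExplicitUids {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> words = env.fromElements("a", "b", "a");
        words.keyBy(w -> w)
             .map(String::toUpperCase)
             // A stable, explicit operator ID: state for this operator can be
             // matched on restore even if other operators are added or removed.
             .uid("ID:Q2:map")
             .print();

        env.execute("explicit-uids");
    }
}
{code}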



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9673) Improve State efficiency of bounded OVER window operators

2019-03-22 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-9673:
---

Assignee: vinoyang

> Improve State efficiency of bounded OVER window operators
> -
>
> Key: FLINK-9673
> URL: https://issues.apache.org/jira/browse/FLINK-9673
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Legacy Planner
>Reporter: Fabian Hueske
>Assignee: vinoyang
>Priority: Major
>
> Currently, the implementations of bounded OVER window aggregations store the 
> complete input for the bound interval. For example for the query:
> {code:java}
> SELECT user_id, count(action) OVER (PARTITION BY user_id ORDER BY rowtime 
> RANGE INTERVAL '14' DAY PRECEDING) action_count, rowtime
> FROM (
>   SELECT rowtime, user_id, action, val1, val2, val3, val4 FROM user)
> {code}
> The whole records with schema {{(rowtime, user_id, action, val1, val2, val3, 
> val4)}} are stored for 14 days in order to retract them from the accumulators 
> after 14 days.
> However, it would be sufficient to store only those fields that are required 
> for the aggregations, i.e., {{action}} in the example above. All other fields 
> could be set to {{null}}, which would significantly reduce the amount of data 
> that needs to be stored in state.
> This improvement can be applied to all four combinations of bounded 
> [rowtime|proctime] [range|rows] OVER windows.
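
A hedged, Flink-independent sketch of the proposed improvement: before a row 
enters the 14-day retraction state, blank out every field the aggregates never 
read.

{code:java}
import java.util.Arrays;

public class ProjectBeforeState {

    // neededByAggregates[i] is true iff some aggregate function reads field i.
    static Object[] projectForState(Object[] row, boolean[] neededByAggregates) {
        Object[] slim = new Object[row.length];
        for (int i = 0; i < row.length; i++) {
            slim[i] = neededByAggregates[i] ? row[i] : null; // drop payload-only fields
        }
        return slim;
    }

    public static void main(String[] args) {
        // (rowtime, user_id, action, val1, val2, val3, val4)
        Object[] row = {1000L, "user-1", "click", "v1", "v2", "v3", "v4"};
        // Only "action" feeds count(action); the key and timestamp live in the
        // keyed state's key and timers, not in the stored row.
        boolean[] needed = {false, false, true, false, false, false, false};
        System.out.println(Arrays.toString(projectForState(row, needed)));
        // -> [null, null, click, null, null, null, null]
    }
}
{code}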



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9605) Support KafkaProtoBufTableSink

2019-03-22 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-9605:
---

Assignee: vinoyang  (was: zhangminglei)

> Support KafkaProtoBufTableSink
> --
>
> Key: FLINK-9605
> URL: https://issues.apache.org/jira/browse/FLINK-9605
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Kafka
>Reporter: zhangminglei
>Assignee: vinoyang
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9651) Add a Kafka table source factory with Protobuf format support

2019-03-22 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-9651:
---

Assignee: vinoyang  (was: zhangminglei)

> Add a Kafka table source factory with Protobuf format support
> -
>
> Key: FLINK-9651
> URL: https://issues.apache.org/jira/browse/FLINK-9651
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Ecosystem
>Reporter: zhangminglei
>Assignee: vinoyang
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9604) Support KafkaProtoBufTableSource

2019-03-22 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-9604:
---

Assignee: vinoyang  (was: zhangminglei)

> Support KafkaProtoBufTableSource
> 
>
> Key: FLINK-9604
> URL: https://issues.apache.org/jira/browse/FLINK-9604
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Ecosystem
>Reporter: zhangminglei
>Assignee: vinoyang
>Priority: Major
>
> Protocol Buffers is a language-neutral, platform-neutral, extensible 
> mechanism for serializing structured data, and it is often used for 
> serialization and deserialization in production applications. So I would 
> suggest adding this commonly used functionality.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11775) Introduce MemorySegmentWritable to let DataOutputView direct copy to internal bytes

2019-03-22 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16798922#comment-16798922
 ] 

Piotr Nowojski commented on FLINK-11775:


Sorry for asking a possibly stupid question, I'm not very familiar with this 
code. Shouldn't we, in that case, try to optimize 
{{HybridMemorySegment#put(java.io.DataInput, int, int)}} for off-heap cases? 
For example, for cases when {{DataInput}} is backed by an array, or by 
something that can easily be wrapped as a ByteBuffer, or something else that 
is efficient? Like:

{code:java}
@Override
public final void put(DataInput in, int offset, int length) throws IOException {
    if (address <= addressLimit) {
        if (heapMemory != null) {
            in.readFully(heapMemory, offset, length);
        }
        else {
            ByteBuffer src = in.wrapAsByteBuffer();
            offHeapBuffer.put(src);
        }
    }
    else {
        throw new IllegalStateException("segment has been freed");
    }
}
{code}

and provide an efficient implementation of {{wrapAsByteBuffer()}} for 
{{DataInputView}}s that wrap a {{MemorySegment}}?

> Introduce MemorySegmentWritable to let DataOutputView direct copy to internal 
> bytes
> ---
>
> Key: FLINK-11775
> URL: https://issues.apache.org/jira/browse/FLINK-11775
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Operators
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>
> Blink's new binary format is based on MemorySegment.
> Introduce MemorySegmentWritable to let DataOutputView copy directly to 
> internal bytes:
> {code:java}
> /**
>  * Provides the interface for write(Segment).
>  */
> public interface MemorySegmentWritable {
> 
>   /**
>    * Writes {@code len} bytes from memory segment {@code segment}, starting 
>    * at offset {@code off}, in order, to the output.
>    *
>    * @param segment memory segment to copy the bytes from.
>    * @param off the start offset in the memory segment.
>    * @param len the number of bytes to copy.
>    * @throws IOException if an I/O error occurs.
>    */
>   void write(MemorySegment segment, int off, int len) throws IOException;
> }
> {code}
>  
> If we want to write a MemorySegment to a DataOutputView, we need to copy its 
> bytes to a byte[] and then write that in, which is less efficient.
> If we let AbstractPagedOutputView offer a write(MemorySegment) method, we 
> can copy it directly.
> We need to ensure this in network serialization, batch operator calculation 
> serialization, and streaming state serialization to avoid the extra byte[] 
> allocation and copy.
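
A hedged sketch contrasting the two paths (the method bodies are illustrative, 
not Flink's actual implementation; MemorySegment#get(int, byte[], int, int) 
and MemorySegment#copyTo(int, MemorySegment, int, int) are existing flink-core 
methods):

{code:java}
import java.io.IOException;
import java.io.OutputStream;

import org.apache.flink.core.memory.MemorySegment;

public class SegmentWriteSketch {

    // Today: segment -> temporary byte[] -> output. One allocation, two copies.
    static void writeViaByteArray(MemorySegment segment, int off, int len, OutputStream out)
            throws IOException {
        byte[] tmp = new byte[len];
        segment.get(off, tmp, 0, len);
        out.write(tmp, 0, len);
    }

    // Proposed: an output view whose pages are MemorySegments can take the
    // bytes directly, segment to segment, with no intermediate array.
    static void writeDirect(MemorySegment segment, int off, int len,
                            MemorySegment currentPage, int pagePos) {
        segment.copyTo(off, currentPage, pagePos, len);
    }
}
{code}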



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11975) Support to run a sql query : 'SELECT * FROM (VALUES (1, 2, 3)) T(a, b, c)'

2019-03-22 Thread Jing Zhang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jing Zhang updated FLINK-11975:
---
Description: 
Support to run a sql query : 'SELECT * FROM (VALUES (1, 2, 3)) T(a, b, c)', 
including:
1. add writeToSink, translateNodeDag in batch and stream tableEnv
2. introduce SinkRules for batch and stream
3. Introduce subclasses of TableSink, including BaseUpsertStreamTableSink, 
BatchTableSink, CollectTableSink, DataStreamTableSink
4. StreamExecSink/BatchExecSink implement the ExecNode interface
5. StreamExecValues/BatchExecValues implement the ExecNode interface, add CodeGen 
for Values.
6. add ITCase test infrastructure, add ITCase to run 'SELECT * FROM (VALUES (1, 
2, 3)) T(a, b, c)' for batch and stream

  was:
Support to run a sql query : 'SELECT * FROM (VALUES (1, 2, 3)) T(a, b, c)', 
including:
1. add writeToSink, translateNodeDag in batch and stream tableEnv
2. introduce SinkRules for batch and stream
3. Introduce subclasses of TableSink, including BaseUpsertStreamTableSink, 
BatchTableSink, CollectTableSink, DataStreamTableSink
4. StreamExecSink/BatchExecSink implement the ExecNode interface
5. StreamExecValues/BatchExecValues implement the ExecNode interface, add CodeGen 
for Values.
6. add ITCase test infrastructure



> Support to run a sql query :  'SELECT * FROM (VALUES (1, 2, 3)) T(a, b, c)'
> ---
>
> Key: FLINK-11975
> URL: https://issues.apache.org/jira/browse/FLINK-11975
> Project: Flink
>  Issue Type: Task
>  Components: Table SQL / Planner
>Reporter: Jing Zhang
>Priority: Major
>
> Support to run a sql query : 'SELECT * FROM (VALUES (1, 2, 3)) T(a, b, c)', 
> including:
> 1. add writeToSink, translateNodeDag in batch and stream tableEnv
> 2. introduce SinkRules for batch and stream
> 3. Introduce subclasses of TableSink, including BaseUpsertStreamTableSink, 
> BatchTableSink, CollectTableSink, DataStreamTableSink
> 4. StreamExecSink/BatchExecSink implement the ExecNode interface
> 5. StreamExecValues/BatchExecValues implement the ExecNode interface, add 
> CodeGen for Values.
> 6. add ITCase test infrastructure, add ITCase to run 'SELECT * FROM (VALUES 
> (1, 2, 3)) T(a, b, c)' for batch and stream



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] asfgit closed pull request #7882: [typo] Inaccurate info on Avro splitting support

2019-03-22 Thread GitBox
asfgit closed pull request #7882: [typo] Inaccurate info on Avro splitting 
support
URL: https://github.com/apache/flink/pull/7882
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11975) Support to run a sql query : 'SELECT * FROM (VALUES (1, 2, 3)) T(a, b, c)'

2019-03-22 Thread Jing Zhang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jing Zhang updated FLINK-11975:
---
Description: 
Support to run a sql query : 'SELECT * FROM (VALUES (1, 2, 3)) T(a, b, c)', 
including:
1. add writeToSink, translateNodeDag in batch and stream tableEnv
2. introduce SinkRules for batch and stream
3. Introduce subclasses of TableSink, including BaseUpsertStreamTableSink, 
BatchTableSink, CollectTableSink, DataStreamTableSink
4. StreamExecSink/BatchExecSink implement the ExecNode interface
5. StreamExecValues/BatchExecValues implement the ExecNode interface, add CodeGen 
for Values.
6. add ITCase test infrastructure


  was:Support to run a sql query : 'SELECT * FROM (VALUES (1, 2, 3)) T(a, b, c)'


> Support to run a sql query :  'SELECT * FROM (VALUES (1, 2, 3)) T(a, b, c)'
> ---
>
> Key: FLINK-11975
> URL: https://issues.apache.org/jira/browse/FLINK-11975
> Project: Flink
>  Issue Type: Task
>  Components: Table SQL / Planner
>Reporter: Jing Zhang
>Priority: Major
>
> Support to run a sql query : 'SELECT * FROM (VALUES (1, 2, 3)) T(a, b, c)', 
> including:
> 1. add writeToSink, translateNodeDag in batch and stream tableEnv
> 2. introduce SinkRules for batch and stream
> 3. Introduce subclasses of TableSink, including BaseUpsertStreamTableSink, 
> BatchTableSink, CollectTableSink, DataStreamTableSink
> 4. StreamExecSink/BatchExecSink implement the ExecNode interface
> 5. StreamExecValues/BatchExecValues implement the ExecNode interface, add 
> CodeGen for Values.
> 6. add ITCase test infrastructure



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot edited a comment on issue #7882: [typo] Inaccurate info on Avro splitting support

2019-03-22 Thread GitBox
flinkbot edited a comment on issue #7882: [typo] Inaccurate info on Avro 
splitting support
URL: https://github.com/apache/flink/pull/7882#issuecomment-468973315
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @rmetzger [PMC]
   * ✅ 2. There is [consensus] that the contribution should go into Flink.
   - Approved by @rmetzger [PMC]
   * ❓ 3. Needs [attention] from.
   * ✅ 4. The change fits into the overall [architecture].
   - Approved by @rmetzger [PMC]
   * ✅ 5. Overall code [quality] is good.
   - Approved by @rmetzger [PMC]
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

## Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


  1   2   3   >