[jira] [Commented] (FLINK-6094) Implement stream-stream proctime non-window inner join

2017-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16275369#comment-16275369
 ] 

ASF GitHub Bot commented on FLINK-6094:
---

Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/4471
  
Hi @twalthr, sorry for the late reply. I mistakenly overlooked the GitHub 
notification. I will post an update ASAP. Thanks very much.


> Implement stream-stream proctime non-window inner join
> ---
>
> Key: FLINK-6094
> URL: https://issues.apache.org/jira/browse/FLINK-6094
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Hequn Cheng
>
> This includes:
> 1. Implement stream-stream proctime non-window inner join
> 2. Implement the retract process logic for join
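To make the two items above concrete, here is a minimal in-memory sketch of an unbounded (non-window) inner join over two retract streams. It is a hypothetical illustration, not the implementation from PR #4471: the class, its methods, the string row encoding, and the use of plain `HashMap`s instead of Flink keyed state are all assumptions. Each input keeps every row it has seen per join key (with a multiplicity count); a new row probes the other side's state, and a retraction emits retractions for exactly the joined rows it previously produced.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical in-memory sketch of an unbounded (non-window) inner join over two
 * retract streams. Not the implementation from PR #4471.
 */
public class NonWindowInnerJoinSketch {

    // Per input: join key -> (row -> multiplicity). Nothing is evicted, so state grows unboundedly.
    private final Map<String, Map<String, Integer>> leftState = new HashMap<>();
    private final Map<String, Map<String, Integer>> rightState = new HashMap<>();

    /** Applies an insert (isInsert = true) or a retraction of a left row; returns joined changes. */
    public List<String> onLeft(String key, String row, boolean isInsert) {
        return process(key, row, isInsert, leftState, rightState, true);
    }

    /** Same as onLeft, for the right input. */
    public List<String> onRight(String key, String row, boolean isInsert) {
        return process(key, row, isInsert, rightState, leftState, false);
    }

    private static List<String> process(String key, String row, boolean isInsert,
                                        Map<String, Map<String, Integer>> own,
                                        Map<String, Map<String, Integer>> other,
                                        boolean ownIsLeft) {
        // 1. Update this input's state: +1 for an insert, -1 for a retraction.
        Map<String, Integer> rows = own.computeIfAbsent(key, k -> new HashMap<>());
        int count = rows.getOrDefault(row, 0) + (isInsert ? 1 : -1);
        if (count > 0) {
            rows.put(row, count);
        } else {
            rows.remove(row);
            if (rows.isEmpty()) {
                own.remove(key);
            }
        }
        // 2. Probe the other input's state; every match yields one output change
        //    carrying the same sign as the input ("+" insert, "-" retraction).
        List<String> out = new ArrayList<>();
        Map<String, Integer> matches = other.getOrDefault(key, Collections.emptyMap());
        for (Map.Entry<String, Integer> match : matches.entrySet()) {
            String joined = ownIsLeft ? row + "," + match.getKey() : match.getKey() + "," + row;
            for (int i = 0; i < match.getValue(); i++) {
                out.add((isInsert ? "+" : "-") + "(" + joined + ")");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        NonWindowInnerJoinSketch join = new NonWindowInnerJoinSketch();
        System.out.println(join.onLeft("k1", "a", true));   // []
        System.out.println(join.onRight("k1", "x", true));  // [+(a,x)]
        System.out.println(join.onLeft("k1", "a", false));  // [-(a,x)]
    }
}
```

Because there is no window, nothing in this sketch ever removes state except retractions; a production operator would presumably need some form of state cleanup, which is out of scope here.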



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4471: [FLINK-6094] [table] Implement stream-stream proctime non...

2017-12-01 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/4471
  
Hi @twalthr, sorry for the late reply. I mistakenly overlooked the GitHub 
notification. I will post an update ASAP. Thanks very much.


---


[jira] [Commented] (FLINK-8141) Add AppendStreamTableSink for bucketed ORC files

2017-12-01 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16275158#comment-16275158
 ] 

Till Rohrmann commented on FLINK-8141:
--

Sorry, I mixed up the JIRA numbers. The merged PR fixed FLINK-8184.

> Add AppendStreamTableSink for bucketed ORC files
> 
>
> Key: FLINK-8141
> URL: https://issues.apache.org/jira/browse/FLINK-8141
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Priority: Minor
>
> It would be good to have an {{AppendStreamTableSink}} that writes to bucketed 
> ORC files.





[jira] [Commented] (FLINK-8141) Add AppendStreamTableSink for bucketed ORC files

2017-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16275155#comment-16275155
 ] 

ASF GitHub Bot commented on FLINK-8141:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5109




[jira] [Closed] (FLINK-8184) Return raw JsonPlan instead of escaped string value in JobDetailsInfo

2017-12-01 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-8184.

Resolution: Fixed

Fixed via ee9027e491ff50bf72f51c869a5095aef7092396

> Return raw JsonPlan instead of escaped string value in JobDetailsInfo
> -
>
> Key: FLINK-8184
> URL: https://issues.apache.org/jira/browse/FLINK-8184
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{JobDetailsInfo}} should pass the JsonPlan as a raw value because 
> otherwise the string value will be escaped.
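A small stdlib-only sketch of the difference the description refers to: embedding an already-serialized JSON plan as a string value escapes every quote, while embedding it as a raw value keeps it as a nested JSON object. The field name `plan`, the sample plan text, and the hand-rolled escaping are assumptions for illustration; in Jackson this kind of raw embedding is commonly done with the `@JsonRawValue` annotation, but this sketch does not claim that is how the fix was implemented.

```java
/** Hypothetical illustration of escaped-string vs. raw embedding of a JSON plan. */
public class RawVsEscapedJson {

    // Minimal JSON string escaping (backslashes and quotes only; enough for this demo).
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // The plan becomes a JSON *string*: clients must parse it a second time.
    static String asEscapedString(String planJson) {
        return "{\"plan\":" + quote(planJson) + "}";
    }

    // The plan is inserted verbatim and stays a nested JSON *object*.
    static String asRawValue(String planJson) {
        return "{\"plan\":" + planJson + "}";
    }

    public static void main(String[] args) {
        String plan = "{\"jid\":\"abc\",\"nodes\":[]}";
        System.out.println(asEscapedString(plan)); // {"plan":"{\"jid\":\"abc\",\"nodes\":[]}"}
        System.out.println(asRawValue(plan));      // {"plan":{"jid":"abc","nodes":[]}}
    }
}
```

Raw embedding is only safe when the inserted text is already valid JSON, which is why it suits a pre-serialized plan.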





[GitHub] flink pull request #5109: [FLINK-8141] [flip6] Fix JsonPlan serialization in...

2017-12-01 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5109


---


[jira] [Commented] (FLINK-8141) Add AppendStreamTableSink for bucketed ORC files

2017-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16275149#comment-16275149
 ] 

ASF GitHub Bot commented on FLINK-8141:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5109
  
Merging this PR.




[GitHub] flink issue #5109: [FLINK-8141] [flip6] Fix JsonPlan serialization in JobDet...

2017-12-01 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5109
  
Merging this PR.


---


[jira] [Commented] (FLINK-8173) InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.

2017-12-01 Thread Tao Xia (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16275136#comment-16275136
 ] 

Tao Xia commented on FLINK-8173:


Thanks for the fix. It is working now with 1.4.

> InvalidProgramException: Table program cannot be compiled. This is a bug. 
> Please file an issue.
> ---
>
> Key: FLINK-8173
> URL: https://issues.apache.org/jira/browse/FLINK-8173
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Tao Xia
>Assignee: Timo Walther
> Fix For: 1.4.0, 1.5.0
>
>
> It is a stream of Avro objects; I simply select a String field and try to 
> print it out:
> val query = "SELECT nd_key FROM table1"
> val result = tableEnv.sql(query)
> tableEnv.toAppendStream[org.apache.avro.util.Utf8](result).print()
> 11/29/2017 16:07:36   Source: Custom Source -> from: (accepted_cohort_id, 
> admin_id, after_submission, amount_paid, anonymous_id, application_id, 
> atom_key, bd_group_key, biz_geo, braavos_purchase_id, category, cohort_id, 
> concept_key, concept_rank, context, context_campaign, context_experiment, 
> coupon_code, course_key, course_rank, cta_destination, cta_location, 
> cta_message, cta_type, currency, decision_group_id, device_browser, 
> device_os, device_os_version, device_type, duration, evaluation_id, 
> event_type, fin_geo, in_collaboration_with, lab_id, lab_rank, label, 
> lesson_key, lesson_rank, locale, max_pause_duration, message, message_id, 
> module_key, module_rank, nd_key, nd_unit_id, nd_unit_rank, new_cohort_id, 
> notification_id, num_concepts_completed, num_interactions, 
> num_lessons_completed, old_cohort_id, part_key, part_rank, pause_duration, 
> pause_reason, payment_plan, payment_provider, points_earned, points_possible, 
> price, price_sheet, product_key, product_type, provider_charge_id, 
> provider_refund_id, quiz_type, referrer, refund_amount, requested_cohort_id, 
> results, scholarship_group_key, search_term, skill_level, subscription_id, 
> suspension_length, suspension_reason, technology, timestamp, total_concepts, 
> total_lessons, total_time_sec, type, unenroll_reason, user_id, user_locale, 
> user_response, variant, version, workspace_id, workspace_session, 
> workspace_type) -> select: (nd_key) -> to: Utf8 -> Sink: Unnamed(5/8) 
> switched to FAILED 
> org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
>   at 
> org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
>   at 
> org.apache.flink.table.runtime.CRowOutputMapRunner.compile(CRowOutputMapRunner.scala:33)
>   at 
> org.apache.flink.table.runtime.CRowOutputMapRunner.open(CRowOutputMapRunner.scala:48)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.codehaus.commons.compiler.CompileException: Line 790, Column 
> 15: Assignment conversion not possible from type "java.lang.CharSequence" to 
> type "org.apache.avro.util.Utf8"
>   at 
> org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672)
>   at 
> org.codehaus.janino.UnitCompiler.assignmentConversion(UnitCompiler.java:10528)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2534)
>   at org.codehaus.janino.UnitCompiler.access$2600(UnitCompiler.java:212)
>   at 
> org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1459)
>   at 
> org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1443)
>   at 
> org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3348)
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1443)
>   at 
> org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1523)
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3052)
>   at 
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1313)
>   at 
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1286)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:785)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:436)
>   a

[jira] [Commented] (FLINK-8030) Start JobMasterRestEndpoint in JobClusterEntrypoint

2017-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16275117#comment-16275117
 ] 

ASF GitHub Bot commented on FLINK-8030:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4988
  
Actually, we cannot change it to `0.0.0.0`, because the address is also used 
by the client to connect to the REST server. Therefore, it must be a valid 
target address.
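A minimal sketch of the distinction drawn above: `0.0.0.0` is the wildcard address, which a server can bind to in order to listen on all interfaces, but it is not a meaningful destination for a client. The helper name `isValidConnectTarget` is hypothetical and is not part of Flink's configuration code; it only uses the standard `java.net.InetAddress` API.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class RestAddressSketch {

    /**
     * Hypothetical check: a wildcard address such as 0.0.0.0 is fine to bind a
     * server socket to, but it is not a valid target for a client to connect to.
     */
    static boolean isValidConnectTarget(String host) {
        try {
            // For IP literals, getByName parses the address without a DNS lookup.
            return !InetAddress.getByName(host).isAnyLocalAddress();
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isValidConnectTarget("0.0.0.0"));   // false
        System.out.println(isValidConnectTarget("127.0.0.1")); // true
    }
}
```

As long as a single address setting serves both as the bind address and as the address clients connect to, it has to pass a check like this one.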


> Start JobMasterRestEndpoint in JobClusterEntrypoint
> ---
>
> Key: FLINK-8030
> URL: https://issues.apache.org/jira/browse/FLINK-8030
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> We should launch the {{JobMasterRestEndpoint}} in the 
> {{JobClusterEntrypoint}} in order to run the web frontend for per-job 
> clusters.





[GitHub] flink issue #4988: [FLINK-8030] Instantiate JobMasterRestEndpoint in JobClus...

2017-12-01 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4988
  
Actually, we cannot change it to `0.0.0.0`, because the address is also used 
by the client to connect to the REST server. Therefore, it must be a valid 
target address.


---


[jira] [Commented] (FLINK-8030) Start JobMasterRestEndpoint in JobClusterEntrypoint

2017-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16275113#comment-16275113
 ] 

ASF GitHub Bot commented on FLINK-8030:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4988
  
Thanks for the review @shuai-xu. I'll change the default value of the REST 
address to `0.0.0.0`.




[GitHub] flink issue #4988: [FLINK-8030] Instantiate JobMasterRestEndpoint in JobClus...

2017-12-01 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4988
  
Thanks for the review @shuai-xu. I'll change the default value of the REST 
address to `0.0.0.0`.


---


[jira] [Commented] (FLINK-8186) AvroInputFormat regression: fails to deserialize GenericRecords on standalone cluster with hadoop27 compat

2017-12-01 Thread Sebastian Klemke (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274913#comment-16274913
 ] 

Sebastian Klemke commented on FLINK-8186:
-

[~twalthr] Thanks for looking into it. To summarize: the provided test program and 
pom fail with the above-mentioned exception if all of the following conditions are 
true:

- the test program is built using the "build-jar" profile
- the Flink 1.4.0 RC2 hadoop27 runtime is used
- the job runs on a standalone cluster
- the input dataset is non-empty

In this case, the class on the right-hand side of the comparison at 
https://github.com/apache/flink/blob/release-1.4/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroInputFormat.java#L119
is loaded by 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader, 
so the comparison fails and the else branch is evaluated, which leads to ReflectDatumReader 
being used instead of GenericDatumReader.
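A minimal, self-contained sketch of why such a comparison can fail: the JVM identifies a class by its name *and* its defining class loader, so the same class loaded twice yields two distinct `Class` objects. `Payload` is a hypothetical stand-in for `GenericRecord`, and a `URLClassLoader` with a `null` parent plays the role of a loader that does not delegate to the application loader. This illustrates the effect only; it is neither Flink's `ChildFirstClassLoader` nor the actual `AvroInputFormat` code, and it assumes the demo runs from compiled class files or a jar (so `getCodeSource()` is non-null).

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;

public class ClassIdentityDemo {

    // Hypothetical stand-in for org.apache.avro.generic.GenericRecord.
    public static class Payload {}

    // Loads a second copy of Payload through a loader whose parent is the bootstrap
    // loader (null), so it cannot delegate to the application class loader.
    static Class<?> loadIsolatedCopy() {
        URL location = ClassIdentityDemo.class.getProtectionDomain().getCodeSource().getLocation();
        try (URLClassLoader isolated = new URLClassLoader(new URL[] {location}, null)) {
            return isolated.loadClass(Payload.class.getName());
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Class<?> copy = loadIsolatedCopy();
        // Same fully qualified name...
        System.out.println(copy.getName().equals(Payload.class.getName())); // true
        // ...but a different Class object, so an identity check against
        // Payload.class takes the "else" branch.
        System.out.println(copy == Payload.class);                           // false
        System.out.println(Payload.class.isAssignableFrom(copy));            // false
    }
}
```

This is consistent with the later observation in this thread that marking `flink-avro` as a "provided" dependency (so Avro is no longer bundled in the fat jar) makes the job work on the standalone cluster.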

> AvroInputFormat regression: fails to deserialize GenericRecords on standalone 
> cluster with hadoop27 compat
> --
>
> Key: FLINK-8186
> URL: https://issues.apache.org/jira/browse/FLINK-8186
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.0
>Reporter: Sebastian Klemke
>Priority: Minor
> Attachments: GenericRecordCount.java, pom.xml
>
>
> The following job runs fine on a Flink 1.3.2 cluster, but fails on a Flink 
> 1.4.0 RC2 standalone cluster, "hadoop27" flavour:
> {code}
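> import org.apache.avro.generic.GenericRecord;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.utils.ParameterTool;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.formats.avro.AvroInputFormat;
>
> public class GenericRecordCount {
>     public static void main(String[] args) throws Exception {
>         String input = ParameterTool.fromArgs(args).getRequired("input");
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         // Read the Avro file(s) at `input` as generic records and count them.
>         long count = env
>             .readFile(new AvroInputFormat<>(new Path(input), GenericRecord.class), input)
>             .count();
>         System.out.printf("Counted %d records%n", count);
>     }
> }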
> {code}
> It runs fine in the LocalExecutionEnvironment and also on a standalone cluster 
> of the no-hadoop flavour, though. Exception thrown on Flink 1.4.0 hadoop27:
> {code}
> 12/01/2017 13:22:09 DataSource (at 
> readFile(ExecutionEnvironment.java:514) 
> (org.apache.flink.formats.avro.AvroInputFormat))(4/4) switched to FAILED
> java.lang.RuntimeException: java.lang.NoSuchMethodException: 
> org.apache.avro.generic.GenericRecord.<init>()
> at 
> org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:353)
> at 
> org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:369)
> at org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901)
> at 
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:212)
> at 
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
> at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
> at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
> at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
> at 
> org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
> at 
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:167)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NoSuchMethodException: 
> org.apache.avro.generic.GenericRecord.<init>()
> at java.lang.Class.getConstructor0(Class.java:3082)
> at java.lang.Class.getDeclaredConstructor(Class.java:2178)
> at 
> org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347)
> ... 11 more
> {code}





[jira] [Created] (FLINK-8189) move flink-statebackend-rocksdb out of flink-contrib

2017-12-01 Thread Bowen Li (JIRA)
Bowen Li created FLINK-8189:
---

 Summary: move flink-statebackend-rocksdb out of flink-contrib
 Key: FLINK-8189
 URL: https://issues.apache.org/jira/browse/FLINK-8189
 Project: Flink
  Issue Type: Sub-task
  Components: State Backends, Checkpointing
Affects Versions: 1.5.0
Reporter: Bowen Li
Assignee: Bowen Li


Move {{flink-statebackend-rocksdb}} into its own module (possibly a dedicated 
state-backend module) or into the {{flink-runtime}} package.





[jira] [Updated] (FLINK-8175) remove flink-streaming-contrib and migrate its classes to flink-streaming-java/scala

2017-12-01 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-8175:

Issue Type: Sub-task  (was: Improvement)
Parent: FLINK-8188

> remove flink-streaming-contrib and migrate its classes to 
> flink-streaming-java/scala
> 
>
> Key: FLINK-8175
> URL: https://issues.apache.org/jira/browse/FLINK-8175
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>
> I propose removing flink-streaming-contrib from flink-contrib and migrating 
> its classes to flink-streaming-java/scala for the following reasons:
> - flink-streaming-contrib is so small that it only has 4 classes (3 Java and 
> 1 Scala); they do not justify a dedicated jar that Flink has to distribute and 
> maintain and that users have to manage as an extra dependency
> - the 4 classes in flink-streaming-contrib are logically more closely tied to 
> flink-streaming-java/scala, and thus can be easily migrated
> - flink-contrib is already too crowded and noisy. It contains lots of 
> submodules with different purposes, which confuses developers and users, and it 
> lacks a proper project hierarchy
> Going one step further, I would argue that flink-contrib itself should be 
> removed and all its submodules should be migrated to other top-level modules 
> for the following reasons: 1) Apache Flink as a whole is the result of 
> contributions from many developers, so there is no reason to single out some 
> contributions in a dedicated module named 'contrib'; 2) flink-contrib 
> inherently does not have a good hierarchy to hold submodules





[jira] [Assigned] (FLINK-8175) remove flink-streaming-contrib and migrate its classes to flink-streaming-java/scala

2017-12-01 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li reassigned FLINK-8175:
---

Assignee: Bowen Li



[jira] [Created] (FLINK-8188) Clean up flink-contrib

2017-12-01 Thread Bowen Li (JIRA)
Bowen Li created FLINK-8188:
---

 Summary: Clean up flink-contrib
 Key: FLINK-8188
 URL: https://issues.apache.org/jira/browse/FLINK-8188
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.5.0
Reporter: Bowen Li


This is the umbrella ticket for cleaning up flink-contrib. 

We argue that flink-contrib should be removed and all its submodules should be 
migrated to other top-level modules for the following reasons: 

1) Apache Flink as a whole is the result of contributions from many 
developers; there is no reason to single out some contributions in a dedicated 
module named 'contrib'
2) flink-contrib is already too crowded and noisy. It contains lots of 
submodules with different purposes, which confuses developers and users, and it 
lacks a proper project hierarchy
3) This will save a considerable amount of build time

More details in discussions at FLINK-8175 and FLINK-8167





[jira] [Commented] (FLINK-7294) mesos.resourcemanager.framework.role not working

2017-12-01 Thread Eron Wright (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274705#comment-16274705
 ] 

Eron Wright  commented on FLINK-7294:
-

[~bbayani] I, too, thought it would be fine, but upon further investigation found 
that `Resource::setRole` actually means 'draw the resources from a reservation 
associated with role such-and-such'.  In fact, `setRole` is deprecated in favor 
of a more elaborate reservations structure.

We're now looking for a fix that will cover all scenarios.  Feel free to email 
me directly to discuss this further.
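A simplified model of the role matching behind the error in the issue description: the TaskManager request asks for resources under role `*` (unreserved), while every resource in the offer is reserved for `mesos_role_tasks`, so nothing in the offer is usable. The `Resource` class and the exact-role-match rule are assumptions for illustration only; this is not the Mesos protobuf API, and real Mesos semantics (reservations, and the newer reservation structure mentioned above) are richer.

```java
import java.util.Arrays;
import java.util.List;

/** Simplified model of Mesos role matching; not the Mesos protobuf API. */
public class MesosRoleMatchSketch {

    /** A scalar resource in an offer, e.g. cpus(mesos_role_tasks):7.4. "*" means unreserved. */
    static final class Resource {
        final String name;
        final String role;
        final double amount;

        Resource(String name, String role, double amount) {
            this.name = name;
            this.role = role;
            this.amount = amount;
        }
    }

    /** Sums the offered amount of `name` whose role equals the role the task asks for. */
    static double usable(List<Resource> offer, String name, String requestedRole) {
        double total = 0;
        for (Resource r : offer) {
            if (r.name.equals(name) && r.role.equals(requestedRole)) {
                total += r.amount;
            }
        }
        return total;
    }

    /** True if the offer can satisfy `needed` units of `name` under `requestedRole`. */
    static boolean fits(List<Resource> offer, String name, String requestedRole, double needed) {
        return usable(offer, name, requestedRole) >= needed;
    }

    public static void main(String[] args) {
        // Offer shaped like the one in the error message: everything is reserved for mesos_role_tasks.
        List<Resource> offer = Arrays.asList(
                new Resource("cpus", "mesos_role_tasks", 7.4),
                new Resource("mem", "mesos_role_tasks", 45876));
        System.out.println(fits(offer, "cpus", "*", 1));                // false: no unreserved cpus
        System.out.println(fits(offer, "cpus", "mesos_role_tasks", 1)); // true
    }
}
```

In this model, the task would only fit if its request carried the reserved role instead of `*`, which matches the symptom reported in the issue.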

> mesos.resourcemanager.framework.role not working
> 
>
> Key: FLINK-7294
> URL: https://issues.apache.org/jira/browse/FLINK-7294
> Project: Flink
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 1.3.1
>Reporter: Bhumika Bayani
>Assignee: Eron Wright 
>Priority: Critical
> Fix For: 1.4.0, 1.3.3
>
>
> I am using the above setting in flink-conf.yaml, e.g.:
> mesos.resourcemanager.framework.role: mesos_role_tasks
> I see a flink-scheduler registered in the mesos/frameworks tab with the above 
> role.
> But the scheduler fails to launch any tasks in spite of receiving 
> resource offers from Mesos agents with the correct role.
> The error seen is:
> The error seen is:
> {code}
> 2017-07-28 13:23:00,683 INFO  
> org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager  - 
> Mesos task taskmanager-03768 failed, with a TaskManager in launch or 
> registration. State: TASK_ERROR Reason: REASON_TASK_INVALID (Task uses more 
> resources cpus(\*):1; mem(\*):1024; ports(\*):[4006-4007] than available 
> cpus(mesos_role_tasks):7.4; mem(mesos_role_tasks):45876; 
> ports(mesos_role_tasks):[4002-4129, 4131-4380, 4382-4809, 4811-4957, 
> 4959-4966, 4968-4979, 4981-5049, 31000-31196, 31198-31431, 31433-31607, 
> 31609-32000]; ephemeral_storage(mesos_role_tasks):37662; 
> efs_storage(mesos_role_tasks):8.79609e+12; disk(mesos_role_tasks):5115)
> {code}
> The request is made for resources with the * role. We do not have Mesos running 
> anywhere with the * role, so the TaskManagers never come up. 
> Am I missing any configuration?
> I am using Flink version 1.3.1





[jira] [Commented] (FLINK-8186) AvroInputFormat regression: fails to deserialize GenericRecords on standalone cluster with hadoop27 compat

2017-12-01 Thread Sebastian Klemke (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274656#comment-16274656
 ] 

Sebastian Klemke commented on FLINK-8186:
-

With -Pbuild-jar, the job fat jar includes Avro; without it, it does not. I added 
org.apache.flink:flink-avro:${flink.version} to the "provided" dependency list: now 
it works on the standalone cluster and the jar only contains the GenericRecordCount 
class. I still have to re-test on YARN and in the LocalExecutionEnvironment.



[jira] [Commented] (FLINK-8186) AvroInputFormat regression: fails to deserialize GenericRecords on standalone cluster with hadoop27 compat

2017-12-01 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274641#comment-16274641
 ] 

Timo Walther commented on FLINK-8186:
-

Thanks, I will look into it again on Monday.



[jira] [Updated] (FLINK-8173) InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.

2017-12-01 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-8173:

Component/s: Table API & SQL

> InvalidProgramException: Table program cannot be compiled. This is a bug. 
> Please file an issue.
> ---
>
> Key: FLINK-8173
> URL: https://issues.apache.org/jira/browse/FLINK-8173
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Tao Xia
>Assignee: Timo Walther
> Fix For: 1.4.0, 1.5.0
>
>
> It is a stream of Avro objects; I simply select a String field and try to 
> print it out:
> val query = "SELECT nd_key FROM table1"
> val result = tableEnv.sql(query)
> tableEnv.toAppendStream[org.apache.avro.util.Utf8](result).print()
> 11/29/2017 16:07:36   Source: Custom Source -> from: (accepted_cohort_id, 
> admin_id, after_submission, amount_paid, anonymous_id, application_id, 
> atom_key, bd_group_key, biz_geo, braavos_purchase_id, category, cohort_id, 
> concept_key, concept_rank, context, context_campaign, context_experiment, 
> coupon_code, course_key, course_rank, cta_destination, cta_location, 
> cta_message, cta_type, currency, decision_group_id, device_browser, 
> device_os, device_os_version, device_type, duration, evaluation_id, 
> event_type, fin_geo, in_collaboration_with, lab_id, lab_rank, label, 
> lesson_key, lesson_rank, locale, max_pause_duration, message, message_id, 
> module_key, module_rank, nd_key, nd_unit_id, nd_unit_rank, new_cohort_id, 
> notification_id, num_concepts_completed, num_interactions, 
> num_lessons_completed, old_cohort_id, part_key, part_rank, pause_duration, 
> pause_reason, payment_plan, payment_provider, points_earned, points_possible, 
> price, price_sheet, product_key, product_type, provider_charge_id, 
> provider_refund_id, quiz_type, referrer, refund_amount, requested_cohort_id, 
> results, scholarship_group_key, search_term, skill_level, subscription_id, 
> suspension_length, suspension_reason, technology, timestamp, total_concepts, 
> total_lessons, total_time_sec, type, unenroll_reason, user_id, user_locale, 
> user_response, variant, version, workspace_id, workspace_session, 
> workspace_type) -> select: (nd_key) -> to: Utf8 -> Sink: Unnamed(5/8) 
> switched to FAILED 
> org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
>   at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
>   at org.apache.flink.table.runtime.CRowOutputMapRunner.compile(CRowOutputMapRunner.scala:33)
>   at org.apache.flink.table.runtime.CRowOutputMapRunner.open(CRowOutputMapRunner.scala:48)
>   at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.codehaus.commons.compiler.CompileException: Line 790, Column 15: Assignment conversion not possible from type "java.lang.CharSequence" to type "org.apache.avro.util.Utf8"
>   at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672)
>   at org.codehaus.janino.UnitCompiler.assignmentConversion(UnitCompiler.java:10528)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2534)
>   at org.codehaus.janino.UnitCompiler.access$2600(UnitCompiler.java:212)
>   at org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1459)
>   at org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1443)
>   at org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3348)
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1443)
>   at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1523)
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3052)
>   at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1313)
>   at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1286)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:785)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:436)
>   at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.jav

[jira] [Resolved] (FLINK-8173) InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.

2017-12-01 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-8173.
-
   Resolution: Fixed
Fix Version/s: 1.5.0
   1.4.0

Fixed in 1.5: 6aced1251878a4a98f66732d14336f3c86ec9d98
Fixed in 1.4: 6ca2be3ffbf3f8a92c21b0990e317ff3a60a3e82


[jira] [Updated] (FLINK-8173) InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.

2017-12-01 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-8173:

Affects Version/s: 1.4.0


[jira] [Commented] (FLINK-8186) AvroInputFormat regression: fails to deserialize GenericRecords on standalone cluster with hadoop27 compat

2017-12-01 Thread Sebastian Klemke (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274628#comment-16274628
 ] 

Sebastian Klemke commented on FLINK-8186:
-

The error only occurs if the job jar was built with -Pbuild-jar. It also occurs 
with testdata.avro from the Flink repo.

> AvroInputFormat regression: fails to deserialize GenericRecords on standalone 
> cluster with hadoop27 compat
> --
>
> Key: FLINK-8186
> URL: https://issues.apache.org/jira/browse/FLINK-8186
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.0
>Reporter: Sebastian Klemke
>Priority: Minor
> Attachments: GenericRecordCount.java, pom.xml
>
>
> The following job runs fine on a Flink 1.3.2 cluster, but fails on a Flink 
> 1.4.0 RC2 standalone cluster, "hadoop27" flavour:
> {code}
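> import org.apache.avro.generic.GenericRecord;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.utils.ParameterTool;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.formats.avro.AvroInputFormat;
>
> public class GenericRecordCount {
>     public static void main(String[] args) throws Exception {
>         // Path of the Avro file to read, passed as --input <path>
>         String input = ParameterTool.fromArgs(args).getRequired("input");
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         // Read the file as GenericRecords (no generated specific class) and count them
>         long count = env.readFile(new AvroInputFormat<>(new Path(input), GenericRecord.class), input)
>                 .count();
>         System.out.printf("Counted %d records\n", count);
>     }
> }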
> {code}
> It runs fine in a LocalExecutionEnvironment and on a no-hadoop flavour 
> standalone cluster, though. Exception thrown on the Flink 1.4.0 hadoop27 cluster:
> {code}
> 12/01/2017 13:22:09 DataSource (at readFile(ExecutionEnvironment.java:514) (org.apache.flink.formats.avro.AvroInputFormat))(4/4) switched to FAILED
> java.lang.RuntimeException: java.lang.NoSuchMethodException: org.apache.avro.generic.GenericRecord.<init>()
> at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:353)
> at org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:369)
> at org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901)
> at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:212)
> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
> at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
> at org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
> at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:167)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NoSuchMethodException: org.apache.avro.generic.GenericRecord.<init>()
> at java.lang.Class.getConstructor0(Class.java:3082)
> at java.lang.Class.getDeclaredConstructor(Class.java:2178)
> at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347)
> ... 11 more
> {code}
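
The comment above says the failure only appears when the job jar is built with `-Pbuild-jar`, and the issue notes that the no-hadoop cluster works. One possible reading, which is an assumption not confirmed in this thread, is that the two setups differ in which copy of the Avro classes the job ends up using. A JDK-only helper like the following (hypothetical, not part of Flink or Avro) prints where a class was loaded from and by which class loader.

```java
import java.net.URL;
import java.security.CodeSource;

public class ClassOrigin {
    // Hypothetical diagnostic helper: reports the code location and class loader of a class.
    static String describe(Class<?> cls) {
        CodeSource source = cls.getProtectionDomain().getCodeSource();
        URL location = (source == null) ? null : source.getLocation();
        ClassLoader loader = cls.getClassLoader();
        return cls.getName()
                + " from " + (location == null ? "<bootstrap or unknown>" : location.toString())
                + " via " + (loader == null ? "<bootstrap loader>" : loader.toString());
    }

    public static void main(String[] args) {
        // JDK core classes come from the bootstrap loader, so no code location is reported.
        System.out.println(describe(String.class));
        // Classes from the application classpath usually report their directory or jar.
        System.out.println(describe(ClassOrigin.class));
    }
}
```

Calling `describe(org.apache.avro.generic.GenericRecord.class)` from inside the job, for example in a rich function's `open` method, would show which jar and class loader supplied Avro at runtime in each build variant.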





[jira] [Commented] (FLINK-8173) InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.

2017-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274613#comment-16274613
 ] 

ASF GitHub Bot commented on FLINK-8173:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5111



[GitHub] flink pull request #5111: [FLINK-8173] [table] Fix input unboxing and suppor...

2017-12-01 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5111


---


[jira] [Commented] (FLINK-8186) AvroInputFormat regression: fails to deserialize GenericRecords on standalone cluster with hadoop27 compat

2017-12-01 Thread Sebastian Klemke (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274609#comment-16274609
 ] 

Sebastian Klemke commented on FLINK-8186:
-

Regarding binaries: yes, this is the runtime I'm using. I'm compiling against 
the 1.4 snapshot via Maven. As for the Avro files, I'll test with 
https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/test/resources/testdata.avro






[jira] [Resolved] (FLINK-8151) [Table] Map equality check to use entrySet equality

2017-12-01 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-8151.
-
   Resolution: Fixed
Fix Version/s: 1.5.0

Fixed in 1.5: deea4b32bab4160e2279e9f57d7272df2d91e434

> [Table] Map equality check to use entrySet equality
> ---
>
> Key: FLINK-8151
> URL: https://issues.apache.org/jira/browse/FLINK-8151
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
> Fix For: 1.5.0
>
>
> Following up on FLINK-8038: the equality check is currently broken. The plan is to 
> support element-wise equality checks by always delegating to the 
> "java.util.Map.equals" method.
>  
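
The plan above relies on the `java.util.Map.equals` contract: two maps are equal when their entry sets are equal, regardless of the concrete implementation or insertion order. The following sketch only illustrates that contract with plain JDK maps; it is not the Flink code generator, and the helper name `sameEntries` is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

public class MapEqualityDemo {
    // Null-safe comparison that delegates to java.util.Map.equals,
    // i.e. compares entry sets regardless of the concrete Map class.
    static boolean sameEntries(Map<?, ?> a, Map<?, ?> b) {
        return Objects.equals(a, b);
    }

    public static void main(String[] args) {
        Map<String, Integer> hash = new HashMap<>();
        hash.put("a", 1);
        hash.put("b", 2);

        Map<String, Integer> tree = new TreeMap<>();
        tree.put("b", 2); // different insertion order and implementation
        tree.put("a", 1);

        System.out.println(sameEntries(hash, tree)); // prints true
        System.out.println(hash == tree);            // prints false (reference comparison)

        tree.put("c", 3);
        System.out.println(sameEntries(hash, tree)); // prints false
    }
}
```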





[jira] [Commented] (FLINK-8186) AvroInputFormat regression: fails to deserialize GenericRecords on standalone cluster with hadoop27 compat

2017-12-01 Thread Sebastian Klemke (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274601#comment-16274601
 ] 

Sebastian Klemke commented on FLINK-8186:
-

What puzzles me is 
org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901) in the 
stack trace. It should be GenericData.newRecord().
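
The root cause in the trace is that `GenericRecord` is an interface. Interfaces have no constructors, so a reflective lookup of a no-argument constructor (`Class.getDeclaredConstructor()`, which the trace shows `SpecificData.newInstance` calling after `ReflectData.newRecord`) throws `NoSuchMethodException`. That fits the observation above that a reflect-based path was taken instead of `GenericData.newRecord()`. A minimal JDK-only sketch, using a hypothetical `RecordLike` interface rather than Avro itself:

```java
import java.lang.reflect.Constructor;

public class InterfaceInstantiation {
    // Hypothetical interface standing in for org.apache.avro.generic.GenericRecord.
    interface RecordLike {
        Object get(String field);
    }

    // Hypothetical concrete class with a public no-arg constructor.
    public static class ConcreteRecord implements RecordLike {
        public ConcreteRecord() {}

        @Override
        public Object get(String field) {
            return null;
        }
    }

    // Mimics a reflective "instantiate via no-arg constructor" step.
    // Returns true if the class could be instantiated this way.
    static boolean canInstantiate(Class<?> cls) {
        try {
            Constructor<?> ctor = cls.getDeclaredConstructor();
            ctor.newInstance();
            return true;
        } catch (NoSuchMethodException e) {
            // Interfaces have no constructors, so the lookup itself fails.
            return false;
        } catch (ReflectiveOperationException e) {
            // Abstract classes, inaccessible constructors, etc.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(canInstantiate(RecordLike.class));     // prints false
        System.out.println(canInstantiate(ConcreteRecord.class)); // prints true
    }
}
```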






[jira] [Commented] (FLINK-5506) Java 8 - CommunityDetection.java:158 - java.lang.NullPointerException

2017-12-01 Thread Christos Hadjinikolis (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274596#comment-16274596
 ] 

Christos Hadjinikolis commented on FLINK-5506:
--

Just to note that I have tried running the algorithm with different graph 
types, e.g. using Graph, but I keep hitting the same 
error message. 

Looking into the source code, I can see that the necessary precautions are taken to 
guarantee that:

{{labelsWithHighestScore.get(maxScoreLabel);}}

won't be null. So this possibly suggests that it's an issue concerned with 
asynchronous calls to the thread, i.e. the getter is somehow called prior to 
initialisation of {{labelsWithHighestScore}}.

Hope this helps,

Christos
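
A related pitfall worth ruling out before suspecting threading, offered as a hypothesis rather than a diagnosis of `CommunityDetection`: in Java, `Double.MIN_VALUE` is the smallest positive `double` (about 4.9e-324), not the most negative one. If a running maximum starts at `Double.MIN_VALUE` and every score is 0 (as in the example edges in the issue below, whose score column is 0), no score ever exceeds it, so the "best" key is never updated and a later map lookup on it may return null. The class and method names in this generic sketch are hypothetical:

```java
import java.util.Map;
import java.util.TreeMap;

public class MaxScoreSentinel {
    // Returns the key whose score is strictly above `initial` and highest, or null if none beats it.
    static Long bestLabel(Map<Long, Double> scores, double initial) {
        double highest = initial;
        Long best = null;
        for (Map.Entry<Long, Double> entry : scores.entrySet()) {
            if (entry.getValue() > highest) {
                highest = entry.getValue();
                best = entry.getKey();
            }
        }
        return best;
    }

    public static void main(String[] args) {
        Map<Long, Double> scores = new TreeMap<>();
        scores.put(1L, 0.0);
        scores.put(2L, 0.0);
        // Double.MIN_VALUE is positive, so a score of 0.0 never beats it.
        System.out.println(bestLabel(scores, Double.MIN_VALUE));  // prints null
        // -Double.MAX_VALUE is below every finite score.
        System.out.println(bestLabel(scores, -Double.MAX_VALUE)); // prints 1
    }
}
```

`Double.NEGATIVE_INFINITY` would work as the starting value too; the point is only that the sentinel must sit below every possible score.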


> Java 8 - CommunityDetection.java:158 - java.lang.NullPointerException
> -
>
> Key: FLINK-5506
> URL: https://issues.apache.org/jira/browse/FLINK-5506
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.1.4, 1.3.2, 1.4.1
>Reporter: Miguel E. Coimbra
>Assignee: Greg Hogan
>  Labels: easyfix, newbie
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> Reporting this here as per Vasia's advice.
> I am having the following problem while trying out the 
> org.apache.flink.graph.library.CommunityDetection algorithm of the Gelly API 
> (Java).
> Specs: JDK 1.8.0_102 x64
> Apache Flink: 1.1.4
> Suppose I have a very small (I tried an example with 38 vertices as well) 
> dataset stored in a tab-separated file 3-vertex.tsv:
> {code}
> #id1	id2	score
> 0	1	0
> 0	2	0
> 0	3	0
> {code}
> This is just a central vertex with 3 neighbors (which are not connected to 
> each other).
> I am loading the dataset and executing the algorithm with the following code:
> {code}
> // Load the data from the .tsv file.
> final DataSet<Tuple3<Long, Long, Double>> edgeTuples = env.readCsvFile(inputPath)
>     .fieldDelimiter("\t") // fields are separated by tabs
>     .ignoreComments("#")  // comment lines start with "#"
>     .types(Long.class, Long.class, Double.class);
> // Generate a graph and add reverse edges (undirected).
> final Graph<Long, Long, Double> graph = Graph.fromTupleDataSet(edgeTuples,
>     new MapFunction<Long, Long>() {
>         private static final long serialVersionUID = 8713516577419451509L;
>         // Use each vertex ID as its initial vertex value (label).
>         public Long map(Long value) {
>             return value;
>         }
>     },
>     env).getUndirected();
> // CommunityDetection parameters.
> final double hopAttenuationDelta = 0.5d;
> final int iterationCount = 10;
> // Prepare and trigger the execution.
> DataSet<Vertex<Long, Long>> vs = graph.run(
>     new org.apache.flink.graph.library.CommunityDetection<Long>(iterationCount, hopAttenuationDelta))
>     .getVertices();
> vs.print();
> {code}
> Running this code throws the following exception (note the CommunityDetection.java:158 frame):
> {code}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:805)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:751)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:751)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.NullPointerException
> at org.apache.flink.graph.library.CommunityDetection$VertexLabelUpdater.updateVertex(CommunityDetection.java:158)
> at org.apache.flink.graph.spargel.ScatterGatherIteration$GatherUdfSimpleVV.coGroup(ScatterGatherIteration.java:389)
> at org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:218)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
> at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
> at org.apache.flink.runtime.iterative.task.Itera

[jira] [Commented] (FLINK-8151) [Table] Map equality check to use entrySet equality

2017-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274595#comment-16274595
 ] 

ASF GitHub Bot commented on FLINK-8151:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5070


> [Table] Map equality check to use entrySet equality
> ---
>
> Key: FLINK-8151
> URL: https://issues.apache.org/jira/browse/FLINK-8151
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>
> Following up on FLINK-8038: the equality check is currently broken. The plan is to 
> support an element-wise equality check by always relying on the base 
> "java.util.Map.equals" method.
>  
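For reference, the contract that "java.util.Map.equals" provides is entry-set equality: two maps are equal when they hold the same key/value pairs, regardless of the Map implementation or insertion order. A minimal plain-Java sketch of that contract (the class and `sameEntries` helper are hypothetical; this is not Flink's generated code):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class MapEqualityDemo {
    // Delegates to java.util.Map.equals, which compares the two maps' entry sets.
    static boolean sameEntries(Map<?, ?> a, Map<?, ?> b) {
        return a.equals(b);
    }

    public static void main(String[] args) {
        Map<String, Integer> hash = new HashMap<>();
        hash.put("a", 1);
        hash.put("b", 2);

        Map<String, Integer> tree = new TreeMap<>();
        tree.put("b", 2);
        tree.put("a", 1);

        // Different Map implementations and insertion orders, same entries.
        System.out.println(sameEntries(hash, tree));                 // true
        System.out.println(hash.entrySet().equals(tree.entrySet())); // true

        // An extra entry makes the maps unequal.
        tree.put("c", 3);
        System.out.println(sameEntries(hash, tree));                 // false
    }
}
```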



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5070: [FLINK-8151][table]Map equality check to use entry...

2017-12-01 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5070


---


[jira] [Commented] (FLINK-8173) InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.

2017-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274591#comment-16274591
 ] 

ASF GitHub Bot commented on FLINK-8173:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5111
  
Thanks for the fix. +1 to merge


> InvalidProgramException: Table program cannot be compiled. This is a bug. 
> Please file an issue.
> ---
>
> Key: FLINK-8173
> URL: https://issues.apache.org/jira/browse/FLINK-8173
> Project: Flink
>  Issue Type: Bug
>Reporter: Tao Xia
>Assignee: Timo Walther
>
> It is a stream of Avro objects; I simply select a String field and try to 
> print it out:
> val query = "SELECT nd_key FROM table1"
> val result = tableEnv.sql(query)
> tableEnv.toAppendStream[org.apache.avro.util.Utf8](result).print()
> 11/29/2017 16:07:36   Source: Custom Source -> from: (accepted_cohort_id, 
> admin_id, after_submission, amount_paid, anonymous_id, application_id, 
> atom_key, bd_group_key, biz_geo, braavos_purchase_id, category, cohort_id, 
> concept_key, concept_rank, context, context_campaign, context_experiment, 
> coupon_code, course_key, course_rank, cta_destination, cta_location, 
> cta_message, cta_type, currency, decision_group_id, device_browser, 
> device_os, device_os_version, device_type, duration, evaluation_id, 
> event_type, fin_geo, in_collaboration_with, lab_id, lab_rank, label, 
> lesson_key, lesson_rank, locale, max_pause_duration, message, message_id, 
> module_key, module_rank, nd_key, nd_unit_id, nd_unit_rank, new_cohort_id, 
> notification_id, num_concepts_completed, num_interactions, 
> num_lessons_completed, old_cohort_id, part_key, part_rank, pause_duration, 
> pause_reason, payment_plan, payment_provider, points_earned, points_possible, 
> price, price_sheet, product_key, product_type, provider_charge_id, 
> provider_refund_id, quiz_type, referrer, refund_amount, requested_cohort_id, 
> results, scholarship_group_key, search_term, skill_level, subscription_id, 
> suspension_length, suspension_reason, technology, timestamp, total_concepts, 
> total_lessons, total_time_sec, type, unenroll_reason, user_id, user_locale, 
> user_response, variant, version, workspace_id, workspace_session, 
> workspace_type) -> select: (nd_key) -> to: Utf8 -> Sink: Unnamed(5/8) 
> switched to FAILED 
> org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
>   at 
> org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
>   at 
> org.apache.flink.table.runtime.CRowOutputMapRunner.compile(CRowOutputMapRunner.scala:33)
>   at 
> org.apache.flink.table.runtime.CRowOutputMapRunner.open(CRowOutputMapRunner.scala:48)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.codehaus.commons.compiler.CompileException: Line 790, Column 
> 15: Assignment conversion not possible from type "java.lang.CharSequence" to 
> type "org.apache.avro.util.Utf8"
>   at 
> org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672)
>   at 
> org.codehaus.janino.UnitCompiler.assignmentConversion(UnitCompiler.java:10528)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2534)
>   at org.codehaus.janino.UnitCompiler.access$2600(UnitCompiler.java:212)
>   at 
> org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1459)
>   at 
> org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1443)
>   at 
> org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3348)
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1443)
>   at 
> org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1523)
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3052)
>   at 
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1313)
>   at 
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1286)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:785)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:436)
>   at org.co

[GitHub] flink issue #5111: [FLINK-8173] [table] Fix input unboxing and support Avro ...

2017-12-01 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5111
  
Thanks for the fix. +1 to merge
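The Janino error quoted in FLINK-8173 ("Assignment conversion not possible from type java.lang.CharSequence to type org.apache.avro.util.Utf8") follows the ordinary Java rule that a value statically typed as an interface cannot be assigned to one of its implementations without an explicit cast. A minimal plain-Java illustration, using `StringBuilder` as a stand-in for Avro's `Utf8` (both implement `CharSequence`); the class and method names are hypothetical, and this is not the fix applied in PR #5111.

```java
public class CharSequenceNarrowing {
    // StringBuilder stands in for org.apache.avro.util.Utf8 here.
    static StringBuilder narrow(CharSequence value) {
        // StringBuilder sb = value;  // does not compile: CharSequence -> StringBuilder needs a cast
        if (value instanceof StringBuilder) {
            return (StringBuilder) value; // safe: the runtime type was checked first
        }
        return new StringBuilder(value);  // otherwise copy the characters
    }

    // Converting through toString() works for any CharSequence implementation.
    static String asString(CharSequence value) {
        return value == null ? null : value.toString();
    }

    public static void main(String[] args) {
        CharSequence field = "nd-key-1"; // hypothetical value; a String, not a StringBuilder
        System.out.println(narrow(field).append("!"));
        System.out.println(asString(new StringBuilder("abc")));
    }
}
```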


---


[jira] [Commented] (FLINK-8151) [Table] Map equality check to use entrySet equality

2017-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274580#comment-16274580
 ] 

ASF GitHub Bot commented on FLINK-8151:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5070
  
@walterddr I think the current solution is sufficient for now. I will merge 
this...


> [Table] Map equality check to use entrySet equality
> ---
>
> Key: FLINK-8151
> URL: https://issues.apache.org/jira/browse/FLINK-8151
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>
> Following up on FLINK-8038: the equality check is currently broken. The plan is to 
> support an element-wise equality check by always relying on the base 
> "java.util.Map.equals" method.
>  





[GitHub] flink issue #5070: [FLINK-8151][table]Map equality check to use entrySet equ...

2017-12-01 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5070
  
@walterddr I think the current solution is sufficient for now. I will merge 
this...


---


[jira] [Commented] (FLINK-8186) AvroInputFormat regression: fails to deserialize GenericRecords on standalone cluster with hadoop27 compat

2017-12-01 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274567#comment-16274567
 ] 

Timo Walther commented on FLINK-8186:
-

Is it possible that the error is only reproducible with your Avro files?

> AvroInputFormat regression: fails to deserialize GenericRecords on standalone 
> cluster with hadoop27 compat
> --
>
> Key: FLINK-8186
> URL: https://issues.apache.org/jira/browse/FLINK-8186
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.0
>Reporter: Sebastian Klemke
>Priority: Minor
> Attachments: GenericRecordCount.java, pom.xml
>
>
> The following job runs fine on a Flink 1.3.2 cluster, but fails on a Flink 
> 1.4.0 RC2 standalone cluster, "hadoop27" flavour:
> {code}
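> import org.apache.avro.generic.GenericRecord;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.utils.ParameterTool;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.formats.avro.AvroInputFormat;
>
> public class GenericRecordCount {
>     public static void main(String[] args) throws Exception {
>         String input = ParameterTool.fromArgs(args).getRequired("input");
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         long count = env.readFile(new AvroInputFormat<>(new Path(input), GenericRecord.class), input)
>                 .count();
>         System.out.printf("Counted %d records\n", count);
>     }
> }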
> {code}
> It runs fine in a LocalExecutionEnvironment and also on a no-hadoop flavour 
> standalone cluster, though. Exception thrown on Flink 1.4.0 hadoop27:
> {code}
> 12/01/2017 13:22:09 DataSource (at 
> readFile(ExecutionEnvironment.java:514) 
> (org.apache.flink.formats.avro.AvroInputFormat))(4/4) switched to FAILED
> java.lang.RuntimeException: java.lang.NoSuchMethodException: 
> org.apache.avro.generic.GenericRecord.<init>()
> at 
> org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:353)
> at 
> org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:369)
> at org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901)
> at 
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:212)
> at 
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
> at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
> at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
> at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
> at 
> org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
> at 
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:167)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NoSuchMethodException: 
> org.apache.avro.generic.GenericRecord.<init>()
> at java.lang.Class.getConstructor0(Class.java:3082)
> at java.lang.Class.getDeclaredConstructor(Class.java:2178)
> at 
> org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347)
> ... 11 more
> {code}





[jira] [Commented] (FLINK-8186) AvroInputFormat regression: fails to deserialize GenericRecords on standalone cluster with hadoop27 compat

2017-12-01 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274565#comment-16274565
 ] 

Timo Walther commented on FLINK-8186:
-

Actually, the error is also very weird: {{GenericRecord}} is an interface with 
no constructor, and this record is therefore serialized with Kryo.
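The "no constructor" point can be checked in plain Java: an interface declares no constructors at all, so a reflective lookup of a no-argument constructor fails with the same kind of `NoSuchMethodException` ("...GenericRecord.<init>()") seen at the bottom of the trace. A minimal sketch; `RecordLike` is a hypothetical stand-in for `GenericRecord`, and this does not explain why only the hadoop27 build reaches that code path.

```java
import java.lang.reflect.Constructor;

public class InterfaceConstructorDemo {
    // Hypothetical stand-in for an interface such as org.apache.avro.generic.GenericRecord.
    interface RecordLike {
        Object get(String field);
    }

    // Looks up the no-argument constructor, the same call that fails in the trace.
    static String tryInstantiate(Class<?> type) {
        try {
            Constructor<?> ctor = type.getDeclaredConstructor();
            return "found " + ctor;
        } catch (NoSuchMethodException e) {
            return "NoSuchMethodException: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(tryInstantiate(RecordLike.class)); // fails: interfaces have no constructors
        System.out.println(tryInstantiate(Object.class));     // succeeds: Object has a no-arg constructor
    }
}
```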

> AvroInputFormat regression: fails to deserialize GenericRecords on standalone 
> cluster with hadoop27 compat
> --
>
> Key: FLINK-8186
> URL: https://issues.apache.org/jira/browse/FLINK-8186
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.0
>Reporter: Sebastian Klemke
>Priority: Minor
> Attachments: GenericRecordCount.java, pom.xml
>
>





[jira] [Commented] (FLINK-8186) AvroInputFormat regression: fails to deserialize GenericRecords on standalone cluster with hadoop27 compat

2017-12-01 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274561#comment-16274561
 ] 

Timo Walther commented on FLINK-8186:
-

I'm sorry, but I cannot reproduce your error. Are you using these binaries?
http://people.apache.org/~aljoscha/flink-1.4.0-rc2/flink-1.4.0-bin-hadoop27-scala_2.11.tgz


> AvroInputFormat regression: fails to deserialize GenericRecords on standalone 
> cluster with hadoop27 compat
> --
>
> Key: FLINK-8186
> URL: https://issues.apache.org/jira/browse/FLINK-8186
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.0
>Reporter: Sebastian Klemke
>Priority: Minor
> Attachments: GenericRecordCount.java, pom.xml
>
>





[jira] [Updated] (FLINK-8186) AvroInputFormat regression: fails to deserialize GenericRecords on standalone cluster with hadoop27 compat

2017-12-01 Thread Sebastian Klemke (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sebastian Klemke updated FLINK-8186:

Summary: AvroInputFormat regression: fails to deserialize GenericRecords on 
standalone cluster with hadoop27 compat  (was: AvroInputFormat regression: 
fails to deserialize GenericRecords on cluster with hadoop27 compat)

> AvroInputFormat regression: fails to deserialize GenericRecords on standalone 
> cluster with hadoop27 compat
> --
>
> Key: FLINK-8186
> URL: https://issues.apache.org/jira/browse/FLINK-8186
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.0
>Reporter: Sebastian Klemke
>Priority: Minor
> Attachments: GenericRecordCount.java, pom.xml
>
>





[jira] [Commented] (FLINK-7574) Remove unused dependencies from flink-clients

2017-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274516#comment-16274516
 ] 

ASF GitHub Bot commented on FLINK-7574:
---

Github user yew1eb commented on the issue:

https://github.com/apache/flink/pull/5076
  
@zentol, thanks for your suggestions. I will be very careful when cleaning 
up the dependencies.
I will start with the top-level modules (those not used by other modules), 
in the following order:
> ├── flink-clients
> ├── flink-scala-shell
> ├── flink-end-to-end-tests
> ├── flink-fs-tests
> ├── flink-mesos
> ├── flink-yarn-tests
> ├── flink-yarn
> ├── flink-formats
> │   ├── flink-avro
> ├── flink-java8
> ├── flink-runtime-web
> ├── flink-examples
> │   ├── flink-examples-batch
> │   ├── flink-examples-streaming
> │   ├── flink-examples-table
> ├── flink-libraries
> │   ├── flink-cep
> │   ├── flink-cep-scala
> │   ├── flink-gelly
> │   ├── flink-gelly-examples
> │   ├── flink-gelly-scala
> │   ├── flink-ml
> │   ├── flink-python
> │   ├── flink-table
> ├── flink-metrics
> │   ├── flink-metrics-core
> │   ├── flink-metrics-datadog
> │   ├── flink-metrics-dropwizard
> │   ├── flink-metrics-ganglia
> │   ├── flink-metrics-graphite
> │   ├── flink-metrics-jmx
> │   ├── flink-metrics-prometheus
> │   ├── flink-metrics-slf4j
> │   ├── flink-metrics-statsd
> ├── flink-contrib
> │   ├── flink-connector-wikiedits
> │   ├── flink-statebackend-rocksdb
> │   ├── flink-storm
> │   ├── flink-storm-examples
> │   ├── flink-streaming-contrib
> │   ├── flink-tweet-inputformat
> ├── flink-filesystems
> │   ├── flink-hadoop-fs
> │   ├── flink-mapr-fs
> │   ├── flink-s3-fs-hadoop
> │   ├── flink-s3-fs-presto
> ├── flink-connectors
> │   ├── flink-connector-cassandra
> │   ├── flink-connector-elasticsearch
> │   ├── flink-connector-elasticsearch-base
> │   ├── flink-connector-elasticsearch2
> │   ├── flink-connector-elasticsearch5
> │   ├── flink-connector-filesystem
> │   ├── flink-connector-kafka-0.10
> │   ├── flink-connector-kafka-0.11
> │   ├── flink-connector-kafka-0.8
> │   ├── flink-connector-kafka-0.9
> │   ├── flink-connector-kafka-base
> │   ├── flink-connector-kinesis
> │   ├── flink-connector-nifi
> │   ├── flink-connector-rabbitmq
> │   ├── flink-connector-twitter
> │   ├── flink-hadoop-compatibility
> │   ├── flink-hbase
> │   ├── flink-hcatalog
> │   ├── flink-jdbc
> │   ├── flink-orc
> ├── flink-optimizer
> ├── flink-queryable-state
> ├── flink-test-utils-parent
> │   ├── flink-test-utils
> │   ├── flink-test-utils-junit
> ├── flink-tests
> ├── flink-scala
> ├── flink-streaming-java
> ├── flink-streaming-scala
> ├── flink-runtime
> ├── flink-java
> ├── flink-core



> Remove unused dependencies from flink-clients
> -
>
> Key: FLINK-7574
> URL: https://issues.apache.org/jira/browse/FLINK-7574
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Affects Versions: 1.3.2
> Environment: Apache Maven 3.3.9, Java version: 1.8.0_144
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
>
> [INFO] --- maven-dependency-plugin:2.10:analyze (default-cli) @ 
> flink-clients_2.11 ---
> [WARNING] Used undeclared dependencies found:
> [WARNING]org.scala-lang:scala-library:jar:2.11.11:compile
> [WARNING]com.data-artisans:flakka-actor_2.11:jar:2.3-custom:compile
> [WARNING] Unused declared dependencies found:
> [WARNING]org.hamcrest:hamcrest-all:jar:1.3:test
> [WARNING]org.apache.flink:force-shading:jar:1.4-SNAPSHOT:compile
> [WARNING]org.powermock:powermock-module-junit4:jar:1.6.5:test
> [WARNING]com.google.code.findbugs:jsr305:jar:1.3.9:compile
> [WARNING]log4j:log4j:jar:1.2.17:test
> [WARNING]org.powermock:powermock-api-mockito:jar:1.6.5:test
> [WARNING]org.slf4j:slf4j-log4j12:jar:1.7.7:test





[GitHub] flink issue #5076: [FLINK-7574][build] POM Cleanup flink-clients

2017-12-01 Thread yew1eb
Github user yew1eb commented on the issue:

https://github.com/apache/flink/pull/5076
  
@zentol, thanks for your suggestions. I will be very careful when cleaning 
up the dependencies.
I will start with the top-level modules (those not used by other modules), 
in the following order:
> ├── flink-clients
> ├── flink-scala-shell
> ├── flink-end-to-end-tests
> ├── flink-fs-tests
> ├── flink-mesos
> ├── flink-yarn-tests
> ├── flink-yarn
> ├── flink-formats
> │   ├── flink-avro
> ├── flink-java8
> ├── flink-runtime-web
> ├── flink-examples
> │   ├── flink-examples-batch
> │   ├── flink-examples-streaming
> │   ├── flink-examples-table
> ├── flink-libraries
> │   ├── flink-cep
> │   ├── flink-cep-scala
> │   ├── flink-gelly
> │   ├── flink-gelly-examples
> │   ├── flink-gelly-scala
> │   ├── flink-ml
> │   ├── flink-python
> │   ├── flink-table
> ├── flink-metrics
> │   ├── flink-metrics-core
> │   ├── flink-metrics-datadog
> │   ├── flink-metrics-dropwizard
> │   ├── flink-metrics-ganglia
> │   ├── flink-metrics-graphite
> │   ├── flink-metrics-jmx
> │   ├── flink-metrics-prometheus
> │   ├── flink-metrics-slf4j
> │   ├── flink-metrics-statsd
> ├── flink-contrib
> │   ├── flink-connector-wikiedits
> │   ├── flink-statebackend-rocksdb
> │   ├── flink-storm
> │   ├── flink-storm-examples
> │   ├── flink-streaming-contrib
> │   ├── flink-tweet-inputformat
> ├── flink-filesystems
> │   ├── flink-hadoop-fs
> │   ├── flink-mapr-fs
> │   ├── flink-s3-fs-hadoop
> │   ├── flink-s3-fs-presto
> ├── flink-connectors
> │   ├── flink-connector-cassandra
> │   ├── flink-connector-elasticsearch
> │   ├── flink-connector-elasticsearch-base
> │   ├── flink-connector-elasticsearch2
> │   ├── flink-connector-elasticsearch5
> │   ├── flink-connector-filesystem
> │   ├── flink-connector-kafka-0.10
> │   ├── flink-connector-kafka-0.11
> │   ├── flink-connector-kafka-0.8
> │   ├── flink-connector-kafka-0.9
> │   ├── flink-connector-kafka-base
> │   ├── flink-connector-kinesis
> │   ├── flink-connector-nifi
> │   ├── flink-connector-rabbitmq
> │   ├── flink-connector-twitter
> │   ├── flink-hadoop-compatibility
> │   ├── flink-hbase
> │   ├── flink-hcatalog
> │   ├── flink-jdbc
> │   ├── flink-orc
> ├── flink-optimizer
> ├── flink-queryable-state
> ├── flink-test-utils-parent
> │   ├── flink-test-utils
> │   ├── flink-test-utils-junit
> ├── flink-tests
> ├── flink-scala
> ├── flink-streaming-java
> ├── flink-streaming-scala
> ├── flink-runtime
> ├── flink-java
> ├── flink-core



---


[jira] [Created] (FLINK-8187) Web client does not print errors

2017-12-01 Thread Timo Walther (JIRA)
Timo Walther created FLINK-8187:
---

 Summary: Web client does not print errors
 Key: FLINK-8187
 URL: https://issues.apache.org/jira/browse/FLINK-8187
 Project: Flink
  Issue Type: Bug
  Components: Webfrontend
Reporter: Timo Walther


When submitting a jar with no defined Main class, the web client stops 
responding instead of printing the REST error:

{code}
java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: 
Could not run the jar.
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:90)
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkException: Could not run the jar.
... 9 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Neither 
a 'Main-Class', nor a 'program-class' entry was found in the jar file.
at 
org.apache.flink.client.program.PackagedProgram.getEntryPointClassNameFromJar(PackagedProgram.java:592)
at 
org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:188)
at 
org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:147)
at 
org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:72)
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:69)
... 8 more
{code} 
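The root cause in this trace is a jar manifest with neither a 'Main-Class' nor a 'program-class' attribute. A minimal sketch using `java.util.jar.Manifest` to check for those two attributes before submitting a jar; the class and method names are hypothetical, and the lookup order (program-class first, then Main-Class) is an assumption about how `PackagedProgram` resolves the entry point.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.jar.Attributes;
import java.util.jar.Manifest;

public class EntryPointCheck {
    // Returns the entry-point class named in the manifest, or null if neither attribute is set.
    static String findEntryPoint(Manifest manifest) {
        Attributes main = manifest.getMainAttributes();
        String cls = main.getValue("program-class");       // assumed to take precedence
        if (cls == null) {
            cls = main.getValue(Attributes.Name.MAIN_CLASS); // standard "Main-Class" attribute
        }
        return cls;
    }

    // Parses manifest text (as found in META-INF/MANIFEST.MF) from a string.
    static Manifest parse(String text) {
        try {
            return new Manifest(new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(findEntryPoint(parse("Manifest-Version: 1.0\nMain-Class: com.example.Job\n\n")));
        System.out.println(findEntryPoint(parse("Manifest-Version: 1.0\n\n"))); // no entry point
    }
}
```

For a real jar, `new java.util.jar.JarFile(path).getManifest()` returns the same `Manifest` type, so the check can be applied before uploading.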





[jira] [Commented] (FLINK-8186) AvroInputFormat regression: fails to deserialize GenericRecords on cluster with hadoop27 compat

2017-12-01 Thread Sebastian Klemke (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274503#comment-16274503
 ] 

Sebastian Klemke commented on FLINK-8186:
-

Invocation:

{code}
flink-1.4.0/bin/jobmanager.sh start cluster
flink-1.4.0/bin/taskmanager.sh start
flink-1.4.0/bin/taskmanager.sh start
flink-1.4.0/bin/taskmanager.sh start
flink-1.4.0/bin/taskmanager.sh start
flink-1.4.0/bin/flink run -p 4 -c de.nerdheim.flink.GenericRecordCount 
target/generic-record-count-1.0-SNAPSHOT.jar --input 
{code}

> AvroInputFormat regression: fails to deserialize GenericRecords on cluster 
> with hadoop27 compat
> ---
>
> Key: FLINK-8186
> URL: https://issues.apache.org/jira/browse/FLINK-8186
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.0
>Reporter: Sebastian Klemke
>Priority: Minor
> Attachments: GenericRecordCount.java, pom.xml
>
>
> The following job runs fine on a Flink 1.3.2 cluster, but fails on a Flink 
> 1.4.0 RC2 standalone cluster, "hadoop27" flavour:
> {code}
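> package de.nerdheim.flink;
>
> import org.apache.avro.generic.GenericRecord;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.utils.ParameterTool;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.formats.avro.AvroInputFormat;
>
> public class GenericRecordCount {
>     public static void main(String[] args) throws Exception {
>         String input = ParameterTool.fromArgs(args).getRequired("input");
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         // Read every record of the Avro input file as a GenericRecord and count them.
>         long count = env.readFile(new AvroInputFormat<>(new Path(input), GenericRecord.class), input)
>                 .count();
>         System.out.printf("Counted %d records\n", count);
>     }
> }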
> {code}
> It runs fine in a LocalExecutionEnvironment and on a standalone cluster of the
> no-hadoop flavour, though. Exception thrown in Flink 1.4.0 hadoop27:
> {code}
> 12/01/2017 13:22:09 DataSource (at readFile(ExecutionEnvironment.java:514) (org.apache.flink.formats.avro.AvroInputFormat))(4/4) switched to FAILED
> java.lang.RuntimeException: java.lang.NoSuchMethodException: org.apache.avro.generic.GenericRecord.<init>()
> 	at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:353)
> 	at org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:369)
> 	at org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901)
> 	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:212)
> 	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
> 	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
> 	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
> 	at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
> 	at org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
> 	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:167)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NoSuchMethodException: org.apache.avro.generic.GenericRecord.<init>()
> 	at java.lang.Class.getConstructor0(Class.java:3082)
> 	at java.lang.Class.getDeclaredConstructor(Class.java:2178)
> 	at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347)
> 	... 11 more
> {code}
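
The innermost frames show `SpecificData.newInstance` calling `Class.getDeclaredConstructor()` on `org.apache.avro.generic.GenericRecord`, which it reaches via `ReflectData.newRecord`. `GenericRecord` is an interface, so there is no constructor to find. The sketch below reproduces only that reflective step. It uses a hypothetical stand-in interface (`RecordLike`) instead of the Avro class, so it needs no Avro dependency. It does not explain why the hadoop27 build takes this code path, which the thread leaves open.

```java
import java.lang.reflect.Constructor;

public class InterfaceConstructorDemo {

    // Hypothetical stand-in for an interface type such as GenericRecord.
    interface RecordLike {
        Object get(String field);
    }

    // A concrete class with an implicit no-arg constructor, for contrast.
    static class ConcreteRecord implements RecordLike {
        public Object get(String field) {
            return null;
        }
    }

    /** Instantiates a type through its no-arg constructor, as a reflective reader would. */
    static Object newInstanceViaNoArgConstructor(Class<?> type) {
        try {
            Constructor<?> ctor = type.getDeclaredConstructor();
            ctor.setAccessible(true);
            return ctor.newInstance();
        } catch (ReflectiveOperationException e) {
            // Wraps the checked exception, similar to the RuntimeException in the trace.
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Object ok = newInstanceViaNoArgConstructor(ConcreteRecord.class);
        System.out.println(ok.getClass().getSimpleName()); // prints "ConcreteRecord"
        try {
            newInstanceViaNoArgConstructor(RecordLike.class);
        } catch (RuntimeException e) {
            // An interface declares no constructors, so the cause is a NoSuchMethodException.
            System.out.println(e.getCause());
        }
    }
}
```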





[jira] [Commented] (FLINK-8181) Kafka producer's FlinkFixedPartitioner returns different partitions when a target topic is rescaled

2017-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274500#comment-16274500
 ] 

ASF GitHub Bot commented on FLINK-8181:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5108#discussion_r154368266
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
 ---
@@ -68,6 +78,13 @@ public int partition(T record, byte[] key, byte[] value, 
String targetTopic, int
partitions != null && partitions.length > 0,
"Partitions of the target topic is empty.");
 
-   return partitions[parallelInstanceId % partitions.length];
+   if (topicToFixedPartition.containsKey(targetTopic)) {
--- End diff --

@aljoscha yes, the semantics are a bit odd and need some clarification before
we move on. I've been having a go at implementing state checkpointing for the
`FlinkFixedPartitioner` today, and one unclear case I bumped into was the
following:
Subtask 1 writes to partition X for "some-topic".
Subtask 2 writes to partition Y for "some-topic".
Say that on restore the sink is rescaled to a DOP (degree of parallelism) of 1:
should the single subtask continue writing to partition X or Y for "some-topic"?
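
One way to read the `topicToFixedPartition` map in the diff is as a per-topic cache. The first partition chosen for a topic is remembered, so later calls keep returning it even if the topic gains partitions. The sketch below is a hypothetical reconstruction, because the rest of the diff is not shown in this thread. It uses only the modulo rule from the removed line. It also makes the question above concrete: the map lives in a single subtask, so restoring two subtasks' maps into one subtask would leave two candidate partitions for "some-topic".

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-subtask partitioner that pins each topic to the first partition it picked. */
public class StickyFixedPartitioner {

    private final int parallelInstanceId;
    // Field name borrowed from the diff; what the PR actually stores there is not shown.
    private final Map<String, Integer> topicToFixedPartition = new HashMap<>();

    StickyFixedPartitioner(int parallelInstanceId) {
        this.parallelInstanceId = parallelInstanceId;
    }

    int partition(String targetTopic, int[] partitions) {
        if (partitions == null || partitions.length == 0) {
            throw new IllegalArgumentException("Partitions of the target topic are empty.");
        }
        // First call for a topic: apply the modulo rule and remember the result.
        // Later calls: return the remembered partition, even if the topic has grown.
        return topicToFixedPartition.computeIfAbsent(
                targetTopic, topic -> partitions[parallelInstanceId % partitions.length]);
    }

    public static void main(String[] args) {
        StickyFixedPartitioner subtask3 = new StickyFixedPartitioner(3);
        int before = subtask3.partition("some-topic", new int[] {0, 1, 2});
        int after = subtask3.partition("some-topic", new int[] {0, 1, 2, 3});
        System.out.println(before + " " + after); // prints "0 0"
    }
}
```

This in-memory cache only keeps the choice stable for the lifetime of one subtask. Making it survive restarts would mean checkpointing it, which raises exactly the redistribution question asked above.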

Regarding the default Kafka behaviour: it hash-partitions records on their
attached key. I've also thought about making that our default instead of the
fixed partitioner; see the relevant discussion here:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkKafkaProducerXX-td16951.html
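
Key-hash partitioning sends every record with the same key to the same partition. Below is a minimal sketch under one loud assumption: it hashes with `java.util.Arrays.hashCode` rather than the murmur2 hash that Kafka's default partitioner uses for keyed records. Its partition numbers therefore will not match Kafka's. Like the fixed partitioner, it takes the hash modulo the current partition count, so a key can move to another partition when a topic is rescaled.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class KeyHashPartitioning {

    /**
     * Picks a partition from the record key. Arrays.hashCode is a stand-in for Kafka's
     * murmur2, so the concrete partition numbers differ from what Kafka would choose.
     */
    static int partitionForKey(byte[] key, int numPartitions) {
        int hash = Arrays.hashCode(key);
        // Clear the sign bit so the modulo result is never negative.
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        // The same key always lands on the same partition for a fixed partition count...
        System.out.println(partitionForKey(key, 3) == partitionForKey(key, 3));
        // ...but the mapping may change once the topic has a different partition count.
        System.out.println(partitionForKey(key, 3) + " vs " + partitionForKey(key, 4));
    }
}
```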


> Kafka producer's FlinkFixedPartitioner returns different partitions when a 
> target topic is rescaled
> ---
>
> Key: FLINK-8181
> URL: https://issues.apache.org/jira/browse/FLINK-8181
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.4.0, 1.5.0
>
>
> After FLINK-6288 was fixed and the original {{FlinkFixedPartitioner}} was migrated to
> the new partitioning API (commit 9ed9b68397b51bfd2b0f6e532212a82f771641bd),
> the {{FlinkFixedPartitioner}} no longer returns identical target partitions
> once a target topic is rescaled.
> This results in a behavioral regression when the {{FlinkFixedPartitioner}} is 
> used.
> The {{FlinkFixedPartitionerTest}} should also be strengthened to cover the 
> target topic rescaling case.
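
The regression follows from the line removed in the diff, `return partitions[parallelInstanceId % partitions.length];`. The sketch below assumes that this line is evaluated per record against the topic's current partition list. It shows how a subtask's target changes once the topic grows from three to four partitions. The class name is hypothetical.

```java
public class FixedPartitionRescaleDemo {

    /** The stateless mapping from the removed diff line. */
    static int fixedPartition(int parallelInstanceId, int[] partitions) {
        return partitions[parallelInstanceId % partitions.length];
    }

    public static void main(String[] args) {
        int[] before = {0, 1, 2};      // target topic with 3 partitions
        int[] after = {0, 1, 2, 3};    // the same topic after a 4th partition is added
        for (int subtask = 0; subtask < 4; subtask++) {
            System.out.printf("subtask %d: partition %d -> %d%n",
                    subtask, fixedPartition(subtask, before), fixedPartition(subtask, after));
        }
    }
}
```

Subtasks 0 to 2 keep their partitions, but subtask 3 moves from partition 0 to partition 3. A test covering the rescaling case would need to catch exactly this behaviour.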





[GitHub] flink pull request #5108: [FLINK-8181] [kafka] Make FlinkFixedPartitioner in...

2017-12-01 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5108#discussion_r154368266
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
 ---
@@ -68,6 +78,13 @@ public int partition(T record, byte[] key, byte[] value, 
String targetTopic, int
partitions != null && partitions.length > 0,
"Partitions of the target topic is empty.");
 
-   return partitions[parallelInstanceId % partitions.length];
+   if (topicToFixedPartition.containsKey(targetTopic)) {
--- End diff --

@aljoscha yes, the semantics are a bit odd and need some clarification before
we move on. I've been having a go at implementing state checkpointing for the
`FlinkFixedPartitioner` today, and one unclear case I bumped into was the
following:
Subtask 1 writes to partition X for "some-topic".
Subtask 2 writes to partition Y for "some-topic".
Say that on restore the sink is rescaled to a DOP (degree of parallelism) of 1:
should the single subtask continue writing to partition X or Y for "some-topic"?

Regarding the default Kafka behaviour: it hash-partitions records on their
attached key. I've also thought about making that our default instead of the
fixed partitioner; see the relevant discussion here:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkKafkaProducerXX-td16951.html


---


[jira] [Updated] (FLINK-8186) AvroInputFormat regression: fails to deserialize GenericRecords on cluster with hadoop27 compat

2017-12-01 Thread Sebastian Klemke (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sebastian Klemke updated FLINK-8186:

Attachment: pom.xml
GenericRecordCount.java



[jira] [Commented] (FLINK-8186) AvroInputFormat regression: fails to deserialize GenericRecords on cluster with hadoop27 compat

2017-12-01 Thread Sebastian Klemke (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274497#comment-16274497
 ] 

Sebastian Klemke commented on FLINK-8186:
-

Hey [~twalthr], I used the flink-quickstart-java archetype and added flink-clients_2.11
and flink-avro as dependencies. The job does run on YARN, though, so maybe it's a
documentation issue? Does it make sense to use the hadoop27 flavour for a
standalone cluster? I'll attach the pom.xml and the Java source.



[jira] [Commented] (FLINK-8181) Kafka producer's FlinkFixedPartitioner returns different partitions when a target topic is rescaled

2017-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274482#comment-16274482
 ] 

ASF GitHub Bot commented on FLINK-8181:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5108#discussion_r154363656
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
 ---
@@ -68,6 +78,13 @@ public int partition(T record, byte[] key, byte[] value, 
String targetTopic, int
partitions != null && partitions.length > 0,
"Partitions of the target topic is empty.");
 
-   return partitions[parallelInstanceId % partitions.length];
+   if (topicToFixedPartition.containsKey(targetTopic)) {
--- End diff --

It might be that the "fixed partitioner" is hard to implement or that its 
semantics are somewhat strange. Not sure though. What's the default Kafka 
behaviour? Maybe we should also have that as the default behaviour. 




[GitHub] flink pull request #5108: [FLINK-8181] [kafka] Make FlinkFixedPartitioner in...

2017-12-01 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5108#discussion_r154363656
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
 ---
@@ -68,6 +78,13 @@ public int partition(T record, byte[] key, byte[] value, 
String targetTopic, int
partitions != null && partitions.length > 0,
"Partitions of the target topic is empty.");
 
-   return partitions[parallelInstanceId % partitions.length];
+   if (topicToFixedPartition.containsKey(targetTopic)) {
--- End diff --

It might be that the "fixed partitioner" is hard to implement or that its 
semantics are somewhat strange. Not sure though. What's the default Kafka 
behaviour? Maybe we should also have that as the default behaviour. 


---


[jira] [Commented] (FLINK-8181) Kafka producer's FlinkFixedPartitioner returns different partitions when a target topic is rescaled

2017-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274480#comment-16274480
 ] 

ASF GitHub Bot commented on FLINK-8181:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5108#discussion_r154363505
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
 ---
@@ -68,6 +78,13 @@ public int partition(T record, byte[] key, byte[] value, 
String targetTopic, int
partitions != null && partitions.length > 0,
"Partitions of the target topic is empty.");
 
-   return partitions[parallelInstanceId % partitions.length];
+   if (topicToFixedPartition.containsKey(targetTopic)) {
--- End diff --

Maybe yes, but I don't think we can do it this quickly now. Adding stateful 
stuff has big implications and we should make sure to understand them all.




[GitHub] flink pull request #5108: [FLINK-8181] [kafka] Make FlinkFixedPartitioner in...

2017-12-01 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5108#discussion_r154363505
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
 ---
@@ -68,6 +78,13 @@ public int partition(T record, byte[] key, byte[] value, 
String targetTopic, int
partitions != null && partitions.length > 0,
"Partitions of the target topic is empty.");
 
-   return partitions[parallelInstanceId % partitions.length];
+   if (topicToFixedPartition.containsKey(targetTopic)) {
--- End diff --

Maybe yes, but I don't think we can do it this quickly now. Adding stateful 
stuff has big implications and we should make sure to understand them all.


---


[jira] [Commented] (FLINK-8176) Dispatcher does not start SubmittedJobGraphStore

2017-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274468#comment-16274468
 ] 

ASF GitHub Bot commented on FLINK-8176:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5107#discussion_r154359352
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 ---
@@ -117,84 +135,78 @@ public void testJobSubmission() throws Exception {
heartbeatServices,
mock(MetricRegistryImpl.class),
fatalErrorHandler,
-   jobManagerRunner,
-   jobId);
+   mockJobManagerRunner,
+   TEST_JOB_ID);
 
-   try {
-   dispatcher.start();
+   dispatcher.start();
+   }
 
-   CompletableFuture leaderFuture = 
dispatcherLeaderElectionService.isLeader(UUID.randomUUID());
+   @After
+   public void tearDown() throws Exception {
+   try {
+   fatalErrorHandler.rethrowError();
+   } finally {
+   RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
+   }
+   }
 
-   // wait for the leader to be elected
-   leaderFuture.get();
+   /**
+* Tests that we can submit a job to the Dispatcher which then spawns a
+* new JobManagerRunner.
+*/
+   @Test
+   public void testJobSubmission() throws Exception {
+   CompletableFuture leaderFuture = 
dispatcherLeaderElectionService.isLeader(UUID.randomUUID());
 
-   DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
+   // wait for the leader to be elected
+   leaderFuture.get();
 
-   CompletableFuture acknowledgeFuture = 
dispatcherGateway.submitJob(jobGraph, timeout);
+   DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
 
-   acknowledgeFuture.get();
+   CompletableFuture acknowledgeFuture = 
dispatcherGateway.submitJob(jobGraph, timeout);
 
-   verify(jobManagerRunner, 
Mockito.timeout(timeout.toMilliseconds())).start();
+   acknowledgeFuture.get();
 
-   // check that no error has occurred
-   fatalErrorHandler.rethrowError();
-   } finally {
-   RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
-   }
+   verify(mockJobManagerRunner, 
Mockito.timeout(timeout.toMilliseconds())).start();
}
 
/**
 * Tests that the dispatcher takes part in the leader election.
 */
@Test
public void testLeaderElection() throws Exception {
-   TestingFatalErrorHandler fatalErrorHandler = new 
TestingFatalErrorHandler();
-   TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
-
UUID expectedLeaderSessionId = UUID.randomUUID();
-   CompletableFuture leaderSessionIdFuture = new 
CompletableFuture<>();
-   SubmittedJobGraphStore mockSubmittedJobGraphStore = 
mock(SubmittedJobGraphStore.class);
-   TestingLeaderElectionService testingLeaderElectionService = new 
TestingLeaderElectionService() {
-   @Override
-   public void confirmLeaderSessionID(UUID 
leaderSessionId) {
-   super.confirmLeaderSessionID(leaderSessionId);
-   leaderSessionIdFuture.complete(leaderSessionId);
-   }
-   };
-
-   
haServices.setSubmittedJobGraphStore(mockSubmittedJobGraphStore);
-   
haServices.setDispatcherLeaderElectionService(testingLeaderElectionService);
-   HeartbeatServices heartbeatServices = new 
HeartbeatServices(1000L, 1000L);
-   final JobID jobId = new JobID();
-
-   final TestingDispatcher dispatcher = new TestingDispatcher(
-   rpcService,
-   Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(),
-   new Configuration(),
-   haServices,
-   mock(ResourceManagerGateway.class),
-   mock(BlobServer.class),
-   heartbeatServices,
-   mock(MetricRegistryImpl.class),
-   fatalErrorHandler,
-   mock(JobManagerRunner.class),
-   jobId);
 
-   try {
-   dispatcher.sta

[jira] [Commented] (FLINK-8176) Dispatcher does not start SubmittedJobGraphStore

2017-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274469#comment-16274469
 ] 

ASF GitHub Bot commented on FLINK-8176:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5107#discussion_r154357254
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -534,6 +536,40 @@ public void handleError(final Exception exception) {
onFatalError(new DispatcherException("Received an error from 
the LeaderElectionService.", exception));
}
 
+   //--
+   // SubmittedJobGraphListener
+   //--
+
+   @Override
+   public void onAddedJobGraph(final JobID jobId) {
+   runAsync(() -> {
+   final SubmittedJobGraph submittedJobGraph;
+   try {
+   submittedJobGraph = 
submittedJobGraphStore.recoverJobGraph(jobId);
+   } catch (final Exception e) {
+   log.error("Could not submit job {}.", jobId, e);
--- End diff --

Maybe "could not recover job".


> Dispatcher does not start SubmittedJobGraphStore
> 
>
> Key: FLINK-8176
> URL: https://issues.apache.org/jira/browse/FLINK-8176
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, Job-Submission, YARN
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> {{Dispatcher}} never calls {{start()}} on its {{SubmittedJobGraphStore}} instance.
> Hence, when a job is submitted (YARN session mode with HA enabled), an
> {{IllegalStateException}} is thrown:
> {noformat}
> java.lang.IllegalStateException: Not running. Forgot to call start()?
> 	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
> 	at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.verifyIsRunning(ZooKeeperSubmittedJobGraphStore.java:411)
> 	at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.putJobGraph(ZooKeeperSubmittedJobGraphStore.java:222)
> 	at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:202)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:207)
> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:151)
> 	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66)
> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$0(AkkaRpcActor.java:129)
> 	at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
> 	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> 	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> 	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> 	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> 	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> {noformat}
> *Expected Behavior*
> In the {{start()}} method, the {{submittedJobGraphStore}} should be started like so:
> {code}
> submittedJobGraphStore.start(this);
> {code}
> To enable this, the {{Dispatcher}} must implement the interface 
> {{SubmittedJobGraphStore.SubmittedJobGraphListener}}.
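
The failure in the trace is a lifecycle guard: the store checks that it is running before accepting a job graph. The proposed fix starts the store from the dispatcher's own `start()`, passing the dispatcher as listener. The following is a minimal, self-contained sketch of that pattern. All types in it (`GuardedJobGraphStore`, `JobGraphListener`, `MiniDispatcher`) are hypothetical stand-ins, not Flink's classes, and job graphs are reduced to strings.

```java
import java.util.HashMap;
import java.util.Map;

public class StartBeforeUseDemo {

    /** Hypothetical listener, loosely modeled on SubmittedJobGraphStore.SubmittedJobGraphListener. */
    interface JobGraphListener {
        void onAddedJobGraph(String jobId);
    }

    /** Hypothetical store that, like the ZooKeeper store in the trace, rejects calls before start(). */
    static class GuardedJobGraphStore {
        private final Map<String, String> graphs = new HashMap<>();
        // Kept so a real store could report graphs added by other processes; unused here.
        private JobGraphListener listener;
        private boolean running;

        void start(JobGraphListener listener) {
            this.listener = listener;
            this.running = true;
        }

        void putJobGraph(String jobId, String jobGraph) {
            verifyIsRunning();
            graphs.put(jobId, jobGraph);
        }

        boolean contains(String jobId) {
            return graphs.containsKey(jobId);
        }

        private void verifyIsRunning() {
            if (!running) {
                throw new IllegalStateException("Not running. Forgot to call start()?");
            }
        }
    }

    /** Hypothetical dispatcher that registers itself as the store's listener when it starts. */
    static class MiniDispatcher implements JobGraphListener {
        final GuardedJobGraphStore store;

        MiniDispatcher(GuardedJobGraphStore store) {
            this.store = store;
        }

        void start() {
            // The fix proposed in the issue: start the store with the dispatcher as listener.
            store.start(this);
        }

        void submitJob(String jobId, String jobGraph) {
            store.putJobGraph(jobId, jobGraph);
        }

        @Override
        public void onAddedJobGraph(String jobId) {
            System.out.println("recovering job " + jobId);
        }
    }

    public static void main(String[] args) {
        GuardedJobGraphStore unstarted = new GuardedJobGraphStore();
        try {
            unstarted.putJobGraph("job-1", "graph");
        } catch (IllegalStateException e) {
            System.out.println("without start(): " + e.getMessage());
        }

        MiniDispatcher dispatcher = new MiniDispatcher(new GuardedJobGraphStore());
        dispatcher.start();
        dispatcher.submitJob("job-1", "graph");
        System.out.println("with start(): stored = " + dispatcher.store.contains("job-1"));
    }
}
```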





[GitHub] flink pull request #5107: [FLINK-8176][flip6] Start SubmittedJobGraphStore i...

2017-12-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5107#discussion_r154357254
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -534,6 +536,40 @@ public void handleError(final Exception exception) {
onFatalError(new DispatcherException("Received an error from 
the LeaderElectionService.", exception));
}
 
+   //--
+   // SubmittedJobGraphListener
+   //--
+
+   @Override
+   public void onAddedJobGraph(final JobID jobId) {
+   runAsync(() -> {
+   final SubmittedJobGraph submittedJobGraph;
+   try {
+   submittedJobGraph = 
submittedJobGraphStore.recoverJobGraph(jobId);
+   } catch (final Exception e) {
+   log.error("Could not submit job {}.", jobId, e);
--- End diff --

Maybe "could not recover job".


---


[GitHub] flink pull request #5107: [FLINK-8176][flip6] Start SubmittedJobGraphStore i...

2017-12-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5107#discussion_r154359352
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 ---
@@ -117,84 +135,78 @@ public void testJobSubmission() throws Exception {
heartbeatServices,
mock(MetricRegistryImpl.class),
fatalErrorHandler,
-   jobManagerRunner,
-   jobId);
+   mockJobManagerRunner,
+   TEST_JOB_ID);
 
-   try {
-   dispatcher.start();
+   dispatcher.start();
+   }
 
-   CompletableFuture leaderFuture = 
dispatcherLeaderElectionService.isLeader(UUID.randomUUID());
+   @After
+   public void tearDown() throws Exception {
+   try {
+   fatalErrorHandler.rethrowError();
+   } finally {
+   RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
+   }
+   }
 
-   // wait for the leader to be elected
-   leaderFuture.get();
+   /**
+* Tests that we can submit a job to the Dispatcher which then spawns a
+* new JobManagerRunner.
+*/
+   @Test
+   public void testJobSubmission() throws Exception {
+   CompletableFuture leaderFuture = 
dispatcherLeaderElectionService.isLeader(UUID.randomUUID());
 
-   DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
+   // wait for the leader to be elected
+   leaderFuture.get();
 
-   CompletableFuture acknowledgeFuture = 
dispatcherGateway.submitJob(jobGraph, timeout);
+   DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
 
-   acknowledgeFuture.get();
+   CompletableFuture acknowledgeFuture = 
dispatcherGateway.submitJob(jobGraph, timeout);
 
-   verify(jobManagerRunner, 
Mockito.timeout(timeout.toMilliseconds())).start();
+   acknowledgeFuture.get();
 
-   // check that no error has occurred
-   fatalErrorHandler.rethrowError();
-   } finally {
-   RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
-   }
+   verify(mockJobManagerRunner, 
Mockito.timeout(timeout.toMilliseconds())).start();
}
 
/**
 * Tests that the dispatcher takes part in the leader election.
 */
@Test
public void testLeaderElection() throws Exception {
-   TestingFatalErrorHandler fatalErrorHandler = new 
TestingFatalErrorHandler();
-   TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
-
UUID expectedLeaderSessionId = UUID.randomUUID();
-   CompletableFuture leaderSessionIdFuture = new 
CompletableFuture<>();
-   SubmittedJobGraphStore mockSubmittedJobGraphStore = 
mock(SubmittedJobGraphStore.class);
-   TestingLeaderElectionService testingLeaderElectionService = new 
TestingLeaderElectionService() {
-   @Override
-   public void confirmLeaderSessionID(UUID 
leaderSessionId) {
-   super.confirmLeaderSessionID(leaderSessionId);
-   leaderSessionIdFuture.complete(leaderSessionId);
-   }
-   };
-
-   
haServices.setSubmittedJobGraphStore(mockSubmittedJobGraphStore);
-   
haServices.setDispatcherLeaderElectionService(testingLeaderElectionService);
-   HeartbeatServices heartbeatServices = new 
HeartbeatServices(1000L, 1000L);
-   final JobID jobId = new JobID();
-
-   final TestingDispatcher dispatcher = new TestingDispatcher(
-   rpcService,
-   Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(),
-   new Configuration(),
-   haServices,
-   mock(ResourceManagerGateway.class),
-   mock(BlobServer.class),
-   heartbeatServices,
-   mock(MetricRegistryImpl.class),
-   fatalErrorHandler,
-   mock(JobManagerRunner.class),
-   jobId);
 
-   try {
-   dispatcher.start();
+   
assertNull(dispatcherLeaderElectionService.getConfirmationFuture());
 
-   assertFalse(leaderSessionIdFuture.isDone());
+   
dispatcherLeaderElectionService.isLeader(expectedLeaderSessionId);
 

[jira] [Commented] (FLINK-8173) InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.

2017-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274465#comment-16274465
 ] 

ASF GitHub Bot commented on FLINK-8173:
---

GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/5111

[FLINK-8173] [table] Fix input unboxing and support Avro Utf8

## What is the purpose of the change

Fixes a code generation issue that caused problems with Avro types. It adds 
tests for passing and working with Avro types.

## Brief change log

Added explicit result type casts in code generator.

## Verifying this change

- AvroTypesITCase added

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): yes
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-8173

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5111.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5111


commit 6b9f26a6b21f19530d4870104c0cd084a3a01cda
Author: twalthr 
Date:   2017-12-01T14:37:23Z

[FLINK-8173] [table] Fix input unboxing and support Avro Utf8




> InvalidProgramException: Table program cannot be compiled. This is a bug. 
> Please file an issue.
> ---
>
> Key: FLINK-8173
> URL: https://issues.apache.org/jira/browse/FLINK-8173
> Project: Flink
>  Issue Type: Bug
>Reporter: Tao Xia
>Assignee: Timo Walther
>
> It is a stream of Avro objects; I simply select a String field and try to 
> print it out:
> val query = "SELECT nd_key FROM table1"
> val result = tableEnv.sql(query)
> tableEnv.toAppendStream[org.apache.avro.util.Utf8](result).print()
> 11/29/2017 16:07:36   Source: Custom Source -> from: (accepted_cohort_id, 
> admin_id, after_submission, amount_paid, anonymous_id, application_id, 
> atom_key, bd_group_key, biz_geo, braavos_purchase_id, category, cohort_id, 
> concept_key, concept_rank, context, context_campaign, context_experiment, 
> coupon_code, course_key, course_rank, cta_destination, cta_location, 
> cta_message, cta_type, currency, decision_group_id, device_browser, 
> device_os, device_os_version, device_type, duration, evaluation_id, 
> event_type, fin_geo, in_collaboration_with, lab_id, lab_rank, label, 
> lesson_key, lesson_rank, locale, max_pause_duration, message, message_id, 
> module_key, module_rank, nd_key, nd_unit_id, nd_unit_rank, new_cohort_id, 
> notification_id, num_concepts_completed, num_interactions, 
> num_lessons_completed, old_cohort_id, part_key, part_rank, pause_duration, 
> pause_reason, payment_plan, payment_provider, points_earned, points_possible, 
> price, price_sheet, product_key, product_type, provider_charge_id, 
> provider_refund_id, quiz_type, referrer, refund_amount, requested_cohort_id, 
> results, scholarship_group_key, search_term, skill_level, subscription_id, 
> suspension_length, suspension_reason, technology, timestamp, total_concepts, 
> total_lessons, total_time_sec, type, unenroll_reason, user_id, user_locale, 
> user_response, variant, version, workspace_id, workspace_session, 
> workspace_type) -> select: (nd_key) -> to: Utf8 -> Sink: Unnamed(5/8) 
> switched to FAILED 
> org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
>   at 
> org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
>   at 
> org.apache.flink.table.runtime.CRowOutputMapRunner.compile(CRowOutputMapRunner.scala:33)
>   at 
> org.apache.flink.table.runtime.CRowOutputMapRunner.open(CRowOutputMapRunner.scala:48)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTas

[GitHub] flink pull request #5111: [FLINK-8173] [table] Fix input unboxing and suppor...

2017-12-01 Thread twalthr
GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/5111

[FLINK-8173] [table] Fix input unboxing and support Avro Utf8

## What is the purpose of the change

Fixes a code generation issue that caused problems with Avro types. It adds 
tests for passing and working with Avro types.

## Brief change log

Added explicit result type casts in code generator.

## Verifying this change

- AvroTypesITCase added

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): yes
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-8173

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5111.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5111


commit 6b9f26a6b21f19530d4870104c0cd084a3a01cda
Author: twalthr 
Date:   2017-12-01T14:37:23Z

[FLINK-8173] [table] Fix input unboxing and support Avro Utf8




---


[jira] [Commented] (FLINK-8186) AvroInputFormat regression: fails to deserialize GenericRecords on cluster with hadoop27 compat

2017-12-01 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274452#comment-16274452
 ] 

Timo Walther commented on FLINK-8186:
-

[~packet] we moved Avro out of the Flink core into a dedicated {{flink-avro}} 
module in order to reduce our default dependencies. Did you add {{flink-avro}} 
to your dependencies? I just checked that 
{{org.apache.avro.generic.GenericRecord}} is included in the {{flink-dist}} jar 
that is placed in the {{/lib}} folder. I will try to reproduce your issue.

> AvroInputFormat regression: fails to deserialize GenericRecords on cluster 
> with hadoop27 compat
> ---
>
> Key: FLINK-8186
> URL: https://issues.apache.org/jira/browse/FLINK-8186
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.0
>Reporter: Sebastian Klemke
>Priority: Minor
>
> The following job runs fine on a Flink 1.3.2 cluster, but fails on a Flink 
> 1.4.0 RC2 standalone cluster, "hadoop27" flavour:
> {code}
> public class GenericRecordCount {
> public static void main(String[] args) throws Exception {
> String input = ParameterTool.fromArgs(args).getRequired("input");
> ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment();
> long count = env.readFile(new AvroInputFormat<>(new Path(input), 
> GenericRecord.class), input)
> .count();
> System.out.printf("Counted %d records\n", count);
> }
> }
> {code}
> It runs fine in a LocalExecutionEnvironment and on a no-hadoop flavour 
> standalone cluster, though. Exception thrown in Flink 1.4.0 hadoop27:
> {code}
> 12/01/2017 13:22:09 DataSource (at 
> readFile(ExecutionEnvironment.java:514) 
> (org.apache.flink.formats.avro.AvroInputFormat))(4/4) switched to FAILED
> java.lang.RuntimeException: java.lang.NoSuchMethodException: 
> org.apache.avro.generic.GenericRecord.<init>()
> at 
> org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:353)
> at 
> org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:369)
> at org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901)
> at 
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:212)
> at 
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
> at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
> at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
> at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
> at 
> org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
> at 
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:167)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NoSuchMethodException: 
> org.apache.avro.generic.GenericRecord.<init>()
> at java.lang.Class.getConstructor0(Class.java:3082)
> at java.lang.Class.getDeclaredConstructor(Class.java:2178)
> at 
> org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347)
> ... 11 more
> {code}
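The innermost frames show Avro's `SpecificData.newInstance` asking `Class.getDeclaredConstructor` for a no-arg constructor of `GenericRecord`. `GenericRecord` is an interface, so it declares no constructor and the lookup fails with `NoSuchMethodException`. The minimal sketch below reproduces only that reflective step, using a hypothetical stand-in interface `RecordLike` and a hypothetical helper `newInstance` (neither is Avro or Flink API). It does not explain why the hadoop27 build reaches this code path, which the thread is still investigating.

```java
public class InterfaceInstantiationDemo {

    // Hypothetical stand-in for org.apache.avro.generic.GenericRecord, which is an interface.
    interface RecordLike {
        Object get(String field);
    }

    /** Mimics a reflective "new instance" helper: look up the no-arg constructor and invoke it. */
    static Object newInstance(Class<?> c) {
        try {
            return c.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            // Wrapped like the trace above: a RuntimeException caused by NoSuchMethodException.
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // A concrete class with a public no-arg constructor works.
        System.out.println(newInstance(StringBuilder.class).getClass().getSimpleName()); // StringBuilder
        try {
            newInstance(RecordLike.class);
        } catch (RuntimeException e) {
            // An interface declares no constructors, so the lookup itself fails.
            System.out.println(e.getCause().getClass().getSimpleName()); // NoSuchMethodException
        }
    }
}
```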



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5110: [hotfix][docs] Fix typo in Checkpointing doc

2017-12-01 Thread chuanlei
GitHub user chuanlei opened a pull request:

https://github.com/apache/flink/pull/5110

[hotfix][docs] Fix typo in Checkpointing doc



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chuanlei/flink fix-typo-in-checkpointing-doc

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5110.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5110


commit 5e3dfcee57d20566ba79e1aeeaafea12e02a9b61
Author: nichuanlei 
Date:   2017-12-01T14:06:21Z

fix typo in checkpointing doc




---


[jira] [Commented] (FLINK-8186) AvroInputFormat regression: fails to deserialize GenericRecords on cluster with hadoop27 compat

2017-12-01 Thread Sebastian Klemke (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274435#comment-16274435
 ] 

Sebastian Klemke commented on FLINK-8186:
-

Test program runs with Flink 1.4 hadoop27 on YARN. Is no-hadoop flavour 
recommended for standalone?



[jira] [Updated] (FLINK-8186) AvroInputFormat regression: fails to deserialize GenericRecords on cluster with hadoop27 compat

2017-12-01 Thread Sebastian Klemke (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sebastian Klemke updated FLINK-8186:

Priority: Minor  (was: Major)



[jira] [Assigned] (FLINK-5506) Java 8 - CommunityDetection.java:158 - java.lang.NullPointerException

2017-12-01 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan reassigned FLINK-5506:
-

Assignee: Greg Hogan

> Java 8 - CommunityDetection.java:158 - java.lang.NullPointerException
> -
>
> Key: FLINK-5506
> URL: https://issues.apache.org/jira/browse/FLINK-5506
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.1.4, 1.3.2, 1.4.1
>Reporter: Miguel E. Coimbra
>Assignee: Greg Hogan
>  Labels: easyfix, newbie
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> Reporting this here as per Vasia's advice.
> I am having the following problem while trying out the 
> org.apache.flink.graph.library.CommunityDetection algorithm of the Gelly API 
> (Java).
> Specs: JDK 1.8.0_102 x64
> Apache Flink: 1.1.4
> Suppose I have a very small (I tried an example with 38 vertices as well) 
> dataset stored in a tab-separated file 3-vertex.tsv:
> {code}
> #id1 id2 score
> 0	1	0
> 0	2	0
> 0	3	0
> {code}
> This is just a central vertex with 3 neighbors (which are not connected to 
> each other).
> I am loading the dataset and executing the algorithm with the following code:
> {code}
> // Load the data from the .tsv file.
> final DataSet<Tuple3<Long, Long, Double>> edgeTuples = 
> env.readCsvFile(inputPath)
> .fieldDelimiter("\t") // node IDs are separated by tabs
> .ignoreComments("#")  // comments start with "#"
> .types(Long.class, Long.class, Double.class);
> // Generate a graph and add reverse edges (undirected).
> final Graph<Long, Long, Double> graph = Graph.fromTupleDataSet(edgeTuples, 
> new MapFunction<Long, Long>() {
> private static final long serialVersionUID = 8713516577419451509L;
> public Long map(Long value) {
> return value;
> }
> },
> env).getUndirected();
> // CommunityDetection parameters.
> final double hopAttenuationDelta = 0.5d;
> final int iterationCount = 10;
> // Prepare and trigger the execution.
> DataSet<Vertex<Long, Long>> vs = graph.run(new 
> org.apache.flink.graph.library.CommunityDetection<Long>(iterationCount, 
> hopAttenuationDelta)).getVertices();
> vs.print();
> {code}
> Running this code throws the following exception (the relevant frame is 
> CommunityDetection.java:158):
> {code}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:805)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:751)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:751)
> at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.NullPointerException
> at 
> org.apache.flink.graph.library.CommunityDetection$VertexLabelUpdater.updateVertex(CommunityDetection.java:158)
> at 
> org.apache.flink.graph.spargel.ScatterGatherIteration$GatherUdfSimpleVV.coGroup(ScatterGatherIteration.java:389)
> at 
> org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:218)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
> at 
> org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
> at 
> org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> After a further look, I set a breakpoint (Eclipse IDE debugging) at the 
> following line in
> org.apache.flink.graph.library.CommunityDetection.java (source code accessed 
> automatically by Maven):
> // find the highest score of maxScoreLabel
> double highestScore = labelsWithHighestScore.get(maxScoreLabel);
> - maxSc
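The reported line unboxes the result of a map lookup into a primitive `double`. Assuming `labelsWithHighestScore` is a `Map<Long, Double>` (its declaration is not shown in the report), `get` returns `null` for a label that has no entry, and the implicit unboxing then throws `NullPointerException`. The sketch below shows that failure mode with hypothetical helper methods. The `getOrDefault` fallback only illustrates a null-safe lookup; it is not necessarily how FLINK-5506 should be fixed.

```java
import java.util.HashMap;
import java.util.Map;

public class UnboxingNpeDemo {

    /** Same shape as the reported line: unboxes the result of Map.get into a primitive double. */
    static double highestScoreUnsafe(Map<Long, Double> labelsWithHighestScore, long maxScoreLabel) {
        // If the key is absent, get(...) returns null and the implicit unboxing throws NullPointerException.
        return labelsWithHighestScore.get(maxScoreLabel);
    }

    /** Null-safe variant; the 0.0 fallback is an arbitrary choice for illustration. */
    static double highestScoreOrDefault(Map<Long, Double> labelsWithHighestScore, long maxScoreLabel) {
        return labelsWithHighestScore.getOrDefault(maxScoreLabel, 0.0);
    }

    public static void main(String[] args) {
        Map<Long, Double> scores = new HashMap<>();
        scores.put(1L, 0.5);

        System.out.println(highestScoreOrDefault(scores, 2L)); // prints 0.0
        try {
            highestScoreUnsafe(scores, 2L);
        } catch (NullPointerException e) {
            System.out.println("NPE for a label missing from the map");
        }
    }
}
```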

[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based

2017-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274408#comment-16274408
 ] 

ASF GitHub Bot commented on FLINK-7468:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r154344942
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
 ---
@@ -174,19 +175,21 @@ public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws T
 
// 
-
 
-   static class InfiniteSubpartitionView implements ResultSubpartitionView {
+   static class InfiniteSubpartitionView extends ResultSubpartitionView {
 
private final BufferProvider bufferProvider;
 
private final CountDownLatch sync;
 
public InfiniteSubpartitionView(BufferProvider bufferProvider, CountDownLatch sync) {
+   super(mock(ResultSubpartition.class));
+
this.bufferProvider = checkNotNull(bufferProvider);
this.sync = checkNotNull(sync);
}
 
@Override
-   public Buffer getNextBuffer() throws IOException, InterruptedException {
+   protected Buffer getNextBufferInternal() throws IOException, InterruptedException {
--- End diff --

`@Nullable`


> Implement sender backlog logic for credit-based
> ---
>
> Key: FLINK-7468
> URL: https://issues.apache.org/jira/browse/FLINK-7468
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is part of the work on credit-based network flow control.
> Receivers should know how many buffers are available on the sender side (the 
> backlog). The receivers use this information to decide how to distribute 
> floating buffers.
> The {{ResultSubpartition}} maintains the backlog, which counts only the 
> buffers in this subpartition, not the events. 
> The backlog is increased when a buffer is added to this subpartition and 
> decreased when a buffer is polled from it.
> The sender attaches the backlog to the {{BufferResponse}} as an absolute value 
> after the buffer has been transferred.
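A minimal sketch of the accounting described above, assuming a simple `synchronized` model: the counter moves only for data buffers, events leave it unchanged, and each polled element is returned together with the absolute backlog that remains. All class, field, and method names here are hypothetical; the real `ResultSubpartition` and `BufferResponse` differ.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical, simplified model of a subpartition that tracks its backlog (data buffers only). */
public class BacklogSubpartitionSketch {

    /** Stand-in for a queued element; isBuffer == false marks an event such as a barrier. */
    public static final class Element {
        public final String payload;
        public final boolean isBuffer;

        public Element(String payload, boolean isBuffer) {
            this.payload = payload;
            this.isBuffer = isBuffer;
        }
    }

    /** Stand-in for a BufferResponse: the element plus the sender's absolute backlog. */
    public static final class Response {
        public final Element element;
        public final int backlog;

        public Response(Element element, int backlog) {
            this.element = element;
            this.backlog = backlog;
        }
    }

    private final Queue<Element> queue = new ArrayDeque<>();
    private int buffersInBacklog;

    public synchronized void add(Element e) {
        queue.add(e);
        if (e.isBuffer) {
            buffersInBacklog++; // events are queued but not counted
        }
    }

    /** Polls the next element; returns null if nothing is queued. */
    public synchronized Response poll() {
        Element e = queue.poll();
        if (e == null) {
            return null;
        }
        if (e.isBuffer) {
            buffersInBacklog--;
        }
        // An absolute value (not a delta) lets the receiver overwrite its previous view of the backlog.
        return new Response(e, buffersInBacklog);
    }

    public synchronized int getBuffersInBacklog() {
        return buffersInBacklog;
    }

    public static void main(String[] args) {
        BacklogSubpartitionSketch sub = new BacklogSubpartitionSketch();
        sub.add(new Element("b1", true));
        sub.add(new Element("barrier", false));
        sub.add(new Element("b2", true));
        System.out.println(sub.getBuffersInBacklog()); // 2
        System.out.println(sub.poll().backlog);        // 1: b1 taken, b2 still queued
        System.out.println(sub.poll().backlog);        // 1: event taken, backlog unchanged
        System.out.println(sub.poll().backlog);        // 0
    }
}
```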





[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based

2017-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274404#comment-16274404
 ] 

ASF GitHub Bot commented on FLINK-7468:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r154344924
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
 ---
@@ -114,7 +116,7 @@ public void onNotification() {
}
 
@Override
-   public Buffer getNextBuffer() throws IOException, InterruptedException {
+   protected Buffer getNextBufferInternal() throws IOException, InterruptedException {
--- End diff --

`@Nullable`




[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based

2017-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274407#comment-16274407
 ] 

ASF GitHub Bot commented on FLINK-7468:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r154343587
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
 ---
@@ -22,32 +22,57 @@
 
 import java.io.IOException;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A view to consume a {@link ResultSubpartition} instance.
  */
-public interface ResultSubpartitionView {
+public abstract class ResultSubpartitionView {
+
+   /** The parent subpartition this view belongs to. */
+   private final ResultSubpartition parent;
+
+   public ResultSubpartitionView(ResultSubpartition parent) {
+   this.parent = checkNotNull(parent);
+   }
+
+   /**
+* Returns the next {@link Buffer} instance of this queue iterator and also
+* decreases the related statistics.
+*/
+   public Buffer getNextBuffer() throws IOException, InterruptedException {
--- End diff --

`@Nullable` and (I'm not sure whether @pnowojski agrees) maybe make this 
method `final` (any subclass should only override `getNextBufferInternal`)?
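The suggestion above is a template method: `getNextBuffer()` is `final` and does the bookkeeping, while subclasses implement only the protected `getNextBufferInternal()` hook, which may return null. A minimal sketch with hypothetical stand-in types (`String` in place of `Buffer`, a plain counter in place of the parent `ResultSubpartition`); `@Nullable` is only mentioned in comments because it is not part of the JDK.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical sketch: a final entry point that no subclass can bypass, plus a nullable hook. */
public class TemplateMethodSketch {

    /** Stand-in for the parent subpartition's backlog statistics. */
    static final class Stats {
        int buffersInBacklog;
    }

    abstract static class SubpartitionView {
        private final Stats parent;

        SubpartitionView(Stats parent) {
            this.parent = parent;
        }

        /** Final, so the statistics update always runs. May return null (Flink would mark it @Nullable). */
        public final String getNextBuffer() {
            String buffer = getNextBufferInternal();
            if (buffer != null) {
                parent.buffersInBacklog--;
            }
            return buffer;
        }

        /** Returns the next buffer, or null if none is currently available (@Nullable in Flink terms). */
        protected abstract String getNextBufferInternal();
    }

    /** A queue-backed view used only for this example. */
    static final class QueueView extends SubpartitionView {
        private final Queue<String> buffers;

        QueueView(Stats parent, Queue<String> buffers) {
            super(parent);
            this.buffers = buffers;
        }

        @Override
        protected String getNextBufferInternal() {
            return buffers.poll(); // null when empty
        }
    }

    public static void main(String[] args) {
        Stats stats = new Stats();
        Queue<String> q = new ArrayDeque<>();
        q.add("b1");
        stats.buffersInBacklog = 1;
        SubpartitionView view = new QueueView(stats, q);
        System.out.println(view.getNextBuffer());   // b1
        System.out.println(view.getNextBuffer());   // null
        System.out.println(stats.buffersInBacklog); // 0
    }
}
```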




[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based

2017-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274406#comment-16274406
 ] 

ASF GitHub Bot commented on FLINK-7468:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r154343656
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
 ---
@@ -22,32 +22,57 @@
 
 import java.io.IOException;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A view to consume a {@link ResultSubpartition} instance.
  */
-public interface ResultSubpartitionView {
+public abstract class ResultSubpartitionView {
+
+   /** The parent subpartition this view belongs to. */
+   private final ResultSubpartition parent;
+
+   public ResultSubpartitionView(ResultSubpartition parent) {
+   this.parent = checkNotNull(parent);
+   }
+
+   /**
+* Returns the next {@link Buffer} instance of this queue iterator and also
+* decreases the related statistics.
+*/
+   public Buffer getNextBuffer() throws IOException, InterruptedException {
+   Buffer buffer = getNextBufferInternal();
+   if (buffer != null) {
+   parent.decreaseStatistics(buffer);
+   }
+   return buffer;
+   }
+
+   public int getBuffersInBacklog() {
+   return parent.getBuffersInBacklog();
+   }
 
/**
-* Returns the next {@link Buffer} instance of this queue iterator.
-* 
-* If there is currently no instance available, it will return null.
+* The internal method used by {@link ResultSubpartitionView#getNextBuffer()}
+* to return the next {@link Buffer} instance of this queue iterator.
+*
+* If there is currently no instance available, it will return null.
 * This might happen for example when a pipelined queue producer is slower
 * than the consumer or a spilled queue needs to read in more data.
-* 
-* Important: The consumer has to make sure that each
+*
+* Important: The consumer has to make sure that each
 * buffer instance will eventually be recycled with {@link Buffer#recycle()}
 * after it has been consumed.
 */
-   Buffer getNextBuffer() throws IOException, InterruptedException;
-
-   void notifyBuffersAvailable(long buffers) throws IOException;
+   protected abstract Buffer getNextBufferInternal() throws IOException, InterruptedException;
--- End diff --

`@Nullable`




[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based

2017-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274409#comment-16274409
 ] 

ASF GitHub Bot commented on FLINK-7468:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r154344889
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
 ---
@@ -133,7 +135,7 @@ int releaseMemory() throws IOException {
}
 
@Override
-   public Buffer getNextBuffer() throws IOException, InterruptedException {
+   protected Buffer getNextBufferInternal() throws IOException, InterruptedException {
--- End diff --

`@Nullable`




[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based

2017-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274405#comment-16274405
 ] 

ASF GitHub Bot commented on FLINK-7468:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r154343510
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
 ---
@@ -39,13 +39,15 @@
private final AtomicBoolean isReleased;
 
PipelinedSubpartitionView(PipelinedSubpartition parent, BufferAvailabilityListener listener) {
+   super(parent);
+
this.parent = checkNotNull(parent);
this.availabilityListener = checkNotNull(listener);
this.isReleased = new AtomicBoolean();
}
 
@Override
-   public Buffer getNextBuffer() {
+   protected Buffer getNextBufferInternal() {
--- End diff --

Actually, a lot of the methods along these calls should probably be marked
`@Nullable`. Since you touched `getNextBufferInternal()`, could you at least
mark this one (and maybe some of the calls along the call stack, if I say pretty please?).
You can do so in a separate `[hotfix]` commit to keep these changes apart.




[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-12-01 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r154343510
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
 ---
@@ -39,13 +39,15 @@
private final AtomicBoolean isReleased;
 
PipelinedSubpartitionView(PipelinedSubpartition parent, BufferAvailabilityListener listener) {
+   super(parent);
+
this.parent = checkNotNull(parent);
this.availabilityListener = checkNotNull(listener);
this.isReleased = new AtomicBoolean();
}
 
@Override
-   public Buffer getNextBuffer() {
+   protected Buffer getNextBufferInternal() {
--- End diff --

Actually, a lot of the methods along these calls should probably be marked
`@Nullable`. Since you touched `getNextBufferInternal()`, could you at least
mark this one (and maybe some of the calls along the call stack, if I say pretty please?).
You can do so in a separate `[hotfix]` commit to keep these changes apart.


---


[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-12-01 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r154344889
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
 ---
@@ -133,7 +135,7 @@ int releaseMemory() throws IOException {
}
 
@Override
-   public Buffer getNextBuffer() throws IOException, InterruptedException {
+   protected Buffer getNextBufferInternal() throws IOException, InterruptedException {
--- End diff --

`@Nullable`


---


[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-12-01 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r154344924
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
 ---
@@ -114,7 +116,7 @@ public void onNotification() {
}
 
@Override
-   public Buffer getNextBuffer() throws IOException, InterruptedException {
+   protected Buffer getNextBufferInternal() throws IOException, InterruptedException {
--- End diff --

`@Nullable`


---


[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-12-01 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r154344942
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
 ---
@@ -174,19 +175,21 @@ public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws T
 
// 
-
 
-   static class InfiniteSubpartitionView implements ResultSubpartitionView {
+   static class InfiniteSubpartitionView extends ResultSubpartitionView {
 
private final BufferProvider bufferProvider;
 
private final CountDownLatch sync;
 
public InfiniteSubpartitionView(BufferProvider bufferProvider, CountDownLatch sync) {
+   super(mock(ResultSubpartition.class));
+
this.bufferProvider = checkNotNull(bufferProvider);
this.sync = checkNotNull(sync);
}
 
@Override
-   public Buffer getNextBuffer() throws IOException, InterruptedException {
+   protected Buffer getNextBufferInternal() throws IOException, InterruptedException {
--- End diff --

`@Nullable`


---


[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-12-01 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r154343587
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
 ---
@@ -22,32 +22,57 @@
 
 import java.io.IOException;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A view to consume a {@link ResultSubpartition} instance.
  */
-public interface ResultSubpartitionView {
+public abstract class ResultSubpartitionView {
+
+   /** The parent subpartition this view belongs to. */
+   private final ResultSubpartition parent;
+
+   public ResultSubpartitionView(ResultSubpartition parent) {
+   this.parent = checkNotNull(parent);
+   }
+
+   /**
+* Returns the next {@link Buffer} instance of this queue iterator and also
+* decreases the related statistics.
+*/
+   public Buffer getNextBuffer() throws IOException, InterruptedException {
--- End diff --

`@Nullable`, and (I'm not sure whether @pnowojski agrees) maybe make this
method `final` (any subclass should only override `getNextBufferInternal`)?


---


[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-12-01 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r154343656
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
 ---
@@ -22,32 +22,57 @@
 
 import java.io.IOException;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A view to consume a {@link ResultSubpartition} instance.
  */
-public interface ResultSubpartitionView {
+public abstract class ResultSubpartitionView {
+
+   /** The parent subpartition this view belongs to. */
+   private final ResultSubpartition parent;
+
+   public ResultSubpartitionView(ResultSubpartition parent) {
+   this.parent = checkNotNull(parent);
+   }
+
+   /**
+* Returns the next {@link Buffer} instance of this queue iterator and also
+* decreases the related statistics.
+*/
+   public Buffer getNextBuffer() throws IOException, InterruptedException {
+   Buffer buffer = getNextBufferInternal();
+   if (buffer != null) {
+   parent.decreaseStatistics(buffer);
+   }
+   return buffer;
+   }
+
+   public int getBuffersInBacklog() {
+   return parent.getBuffersInBacklog();
+   }
 
/**
-* Returns the next {@link Buffer} instance of this queue iterator.
-* 
-* If there is currently no instance available, it will return 
null.
+* The internal method used by {@link ResultSubpartitionView#getNextBuffer()}
+* to return the next {@link Buffer} instance of this queue iterator.
+*
+* If there is currently no instance available, it will return null.
 * This might happen for example when a pipelined queue producer is slower
 * than the consumer or a spilled queue needs to read in more data.
-* 
-* Important: The consumer has to make sure that each
+*
+* Important: The consumer has to make sure that each
 * buffer instance will eventually be recycled with {@link Buffer#recycle()}
 * after it has been consumed.
 */
-   Buffer getNextBuffer() throws IOException, InterruptedException;
-
-   void notifyBuffersAvailable(long buffers) throws IOException;
+   protected abstract Buffer getNextBufferInternal() throws IOException, InterruptedException;
--- End diff --

`@Nullable`
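The suggestion above amounts to the template-method pattern: the public entry point is `final` and does the statistics bookkeeping exactly once, while subclasses only implement a nullable hook. A minimal sketch with hypothetical names (`SubpartitionView`, `Stats`, `nextInternal`, `QueueView`); `String` stands in for `Buffer`, checked exceptions are omitted, and `@Nullable` is a local stand-in for the JSR-305 annotation so the sketch compiles without an extra jar.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Objects;
import java.util.Queue;

/** Local stand-in for the JSR-305 @Nullable annotation (assumption: avoids the dependency). */
@Documented
@Retention(RetentionPolicy.CLASS)
@Target({ElementType.METHOD, ElementType.PARAMETER, ElementType.FIELD})
@interface Nullable {}

/** Hypothetical parent holding the statistics (plays the role of ResultSubpartition). */
final class Stats {
    int buffersInBacklog;
}

/** Hypothetical view base class. */
abstract class SubpartitionView {
    private final Stats parent;

    SubpartitionView(Stats parent) {
        this.parent = Objects.requireNonNull(parent);
    }

    /** Final, so no subclass can skip the statistics update. Returns null if nothing is available. */
    @Nullable
    public final String getNextBuffer() {
        String buffer = nextInternal();
        if (buffer != null) {
            parent.buffersInBacklog--;
        }
        return buffer;
    }

    /** The only hook subclasses implement; returns null if no buffer is currently available. */
    @Nullable
    protected abstract String nextInternal();
}

/** Example subclass backed by an in-memory queue. */
final class QueueView extends SubpartitionView {
    private final Queue<String> queue = new ArrayDeque<>();

    QueueView(Stats parent, String... buffers) {
        super(parent);
        queue.addAll(Arrays.asList(buffers));
    }

    @Override
    @Nullable
    protected String nextInternal() {
        return queue.poll(); // null when the queue is empty
    }
}
```

Making the entry point `final` rules out a subclass that forgets the bookkeeping, and annotating both methods documents the "no buffer yet" case that callers must handle.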


---


[jira] [Created] (FLINK-8186) AvroInputFormat regression: fails to deserialize GenericRecords on cluster with hadoop27 compat

2017-12-01 Thread Sebastian Klemke (JIRA)
Sebastian Klemke created FLINK-8186:
---

 Summary: AvroInputFormat regression: fails to deserialize GenericRecords on cluster with hadoop27 compat
 Key: FLINK-8186
 URL: https://issues.apache.org/jira/browse/FLINK-8186
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.4.0
Reporter: Sebastian Klemke


The following job runs fine on a Flink 1.3.2 cluster, but fails on a Flink 
1.4.0 RC2 standalone cluster, "hadoop27" flavour:
{code}
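import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroInputFormat;

public class GenericRecordCount {
    public static void main(String[] args) throws Exception {
        String input = ParameterTool.fromArgs(args).getRequired("input");

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read the Avro file(s) as GenericRecords and count them.
        long count = env.readFile(new AvroInputFormat<>(new Path(input), GenericRecord.class), input)
                .count();

        System.out.printf("Counted %d records\n", count);
    }
}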
{code}
It runs fine in a LocalExecutionEnvironment and on a standalone cluster of the no-hadoop flavour, though.
Exception thrown on Flink 1.4.0 hadoop27:
{code}
12/01/2017 13:22:09 DataSource (at readFile(ExecutionEnvironment.java:514) (org.apache.flink.formats.avro.AvroInputFormat))(4/4) switched to FAILED
java.lang.RuntimeException: java.lang.NoSuchMethodException: org.apache.avro.generic.GenericRecord.<init>()
	at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:353)
	at org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:369)
	at org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:212)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
	at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
	at org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:167)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchMethodException: org.apache.avro.generic.GenericRecord.<init>()
	at java.lang.Class.getConstructor0(Class.java:3082)
	at java.lang.Class.getDeclaredConstructor(Class.java:2178)
	at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347)
	... 11 more
{code}
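The root cause at the bottom of the trace can be reproduced with plain reflection: the trace goes through `SpecificData.newInstance` into `Class.getDeclaredConstructor`, and since `GenericRecord` is an interface, it has no constructor to find. A minimal sketch, with a hypothetical `GenericRecordLike` interface standing in for `GenericRecord` and a hypothetical `ReflectiveInstantiation` helper. It only illustrates the failure mode; it does not explain why the hadoop27 build ends up on the reflective `ReflectData` path.

```java
/** Hypothetical stand-in for org.apache.avro.generic.GenericRecord, which is an interface. */
interface GenericRecordLike {
    Object get(String field);
}

/** Mimics a reflective "create a new instance" helper like the one at the top of the trace. */
final class ReflectiveInstantiation {

    static <T> T newInstance(Class<T> type) {
        try {
            // Look up and invoke the no-argument constructor.
            return type.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            // For an interface this is a NoSuchMethodException whose message
            // has the form "<class name>.<init>()".
            throw new RuntimeException(e);
        }
    }
}
```

Calling `ReflectiveInstantiation.newInstance(GenericRecordLike.class)` throws a `RuntimeException` caused by a `NoSuchMethodException`, mirroring the outer and inner exceptions in the report.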





[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274364#comment-16274364
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r154315390
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
 ---
@@ -144,20 +147,19 @@ public void testReceiveBuffer() throws Exception {
inputGate.setBufferPool(bufferPool);
inputGate.assignExclusiveSegments(networkBufferPool, 2);
 
-   final int backlog = 2;
-   final BufferResponse bufferResponse = createBufferResponse(
-   inputChannel.requestBuffer(), 0, inputChannel.getInputChannelId(), backlog);
-
final CreditBasedClientHandler handler = new CreditBasedClientHandler();
handler.addInputChannel(inputChannel);
 
+   final int backlog = 2;
+   final BufferResponse bufferResponse = createBufferResponse(
+   TestBufferFactory.createBuffer(32), 0, inputChannel.getInputChannelId(), backlog);
--- End diff --

Good catch, not getting the buffer from the channel here.


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is part of the work for credit-based network flow control.
> The related work includes:
> * We define a new message called {{AddCredit}} to announce incremental
> credit to the sender during data shuffle.
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}}
> and sends its unannounced credit. The credit is reset to zero after each send.
> * That way, messages are sent as often as the network has capacity for, and each
> contains as much credit as is available for the channel at that point in time.
> Otherwise, it would only add latency to the announcements and not increase
> throughput.
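The announcement rules above can be modelled without Netty as a FIFO of channels gated by a writability flag. This is a minimal sketch under that assumption; `CreditAnnouncer`, `Channel`, `addCredit`, `setWritable` and the `sent` list (standing in for outgoing `AddCredit` messages) are hypothetical names, not the `CreditBasedClientHandler` API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical model of the receiver-side credit announcement pipeline. */
final class CreditAnnouncer {

    /** Stand-in for a remote input channel holding unannounced credit. */
    static final class Channel {
        final String id;
        private int unannouncedCredit;

        Channel(String id) {
            this.id = id;
        }

        /** Returns true if the credit went up from zero, i.e. the channel must be enqueued. */
        boolean increaseCredit(int delta) {
            int before = unannouncedCredit;
            unannouncedCredit += delta;
            return before == 0 && unannouncedCredit > 0;
        }

        int getAndResetCredit() {
            int credit = unannouncedCredit;
            unannouncedCredit = 0;
            return credit;
        }
    }

    private final Queue<Channel> pending = new ArrayDeque<>();
    private boolean writable = true;

    /** Announcements actually "written to the wire", formatted as "id:credit". */
    final List<String> sent = new ArrayList<>();

    void addCredit(Channel channel, int delta) {
        if (channel.increaseCredit(delta)) {
            pending.add(channel); // enqueue only on the zero -> positive transition
        }
        drain();
    }

    void setWritable(boolean writable) {
        this.writable = writable;
        drain(); // a writability change may unblock pending announcements
    }

    private void drain() {
        while (writable && !pending.isEmpty()) {
            Channel next = pending.poll();
            // Send all credit accumulated since enqueueing, then reset it to zero.
            sent.add(next.id + ":" + next.getAndResetCredit());
        }
    }
}
```

Because credit keeps accumulating while a channel waits in the queue, a single announcement carries all of it once the connection becomes writable again, and a channel is never queued twice.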





[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274375#comment-16274375
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r154322032
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
 ---
@@ -244,54 +237,89 @@ public void testCancelBeforeActive() throws Exception {
}
 
/**
-* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline, and
-* {@link AddCredit} message is sent to the producer.
+* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline for notifying credits,
+* and verifies the behaviour of credit notification by triggering channel's writability changed.
 */
@Test
public void testNotifyCreditAvailable() throws Exception {
+   final SingleInputGate inputGate = createSingleInputGate();
+   final RemoteInputChannel inputChannel1 = spy(createRemoteInputChannel(inputGate));
+   final RemoteInputChannel inputChannel2 = spy(createRemoteInputChannel(inputGate));
final CreditBasedClientHandler handler = new CreditBasedClientHandler();
-   final EmbeddedChannel channel = new EmbeddedChannel(handler);
+   final EmbeddedChannel channel = spy(new EmbeddedChannel(handler));
 
-   final RemoteInputChannel inputChannel = createRemoteInputChannel(mock(SingleInputGate.class));
+   // Increase the credits to enqueue the input channels
+   inputChannel1.increaseCredit(1);
+   inputChannel2.increaseCredit(1);
+   handler.notifyCreditAvailable(inputChannel1);
+   handler.notifyCreditAvailable(inputChannel2);
 
-   // Enqueue the input channel
-   handler.notifyCreditAvailable(inputChannel);
+   channel.runPendingTasks();
+
+   // The two input channels should notify credits via writable channel
+   assertTrue(channel.isWritable());
+   assertEquals(channel.readOutbound().getClass(), AddCredit.class);
+   verify(inputChannel1, times(1)).getAndResetCredit();
+   verify(inputChannel2, times(1)).getAndResetCredit();
+
+   final int highWaterMark = channel.config().getWriteBufferHighWaterMark();
+   // Set the writer index to the high water mark to ensure that all bytes are written
+   // to the wire although the buffer is "empty".
+   channel.write(Unpooled.buffer(highWaterMark).writerIndex(highWaterMark));
+
+   // Enqueue the input channel on the condition of un-writable channel
+   inputChannel1.increaseCredit(1);
+   handler.notifyCreditAvailable(inputChannel1);
 
channel.runPendingTasks();
 
-   // Read the enqueued msg
-   Object msg1 = channel.readOutbound();
+   // The input channel will not notify credits via un-writable channel
+   assertFalse(channel.isWritable());
+   verify(inputChannel1, times(1)).getAndResetCredit();
 
-   // Should notify credit
-   assertEquals(msg1.getClass(), AddCredit.class);
+   // Flush the buffer to make the channel writable again
+   channel.flush();
+
+   // The input channel should notify credits via triggering channel's writability changed event
+   assertTrue(channel.isWritable());
+   assertEquals(channel.readOutbound().getClass(), AddCredit.class);
+   verify(inputChannel1, times(2)).getAndResetCredit();
}
 
/**
 * Verifies that {@link RemoteInputChannel} is enqueued in the pipeline, but {@link AddCredit}
-* message is not sent actually after this input channel is released.
+* message is not sent actually when this input channel is released.
 */
@Test
public void testNotifyCreditAvailableAfterReleased() throws Exception {
+   final SingleInputGate inputGate = createSingleInputGate();
+   final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
+   inputGate.setInputChannel(inputChannel.getPartitionId().getPartitionId(), inputChannel);
+
final CreditBasedClientHandler handler = new CreditBasedClientHandler();
final EmbeddedChannel channel = new EmbeddedChannel(handler);
 
-   final RemoteInputChannel inputChannel = createRemoteInputChannel(mock(SingleInputGate.class));
-
-   // Enqueue the input channel then rele

[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274368#comment-16274368
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r154321304
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
 ---
@@ -244,54 +237,89 @@ public void testCancelBeforeActive() throws Exception {
}
 
/**
-* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline, and
-* {@link AddCredit} message is sent to the producer.
+* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline for notifying credits,
+* and verifies the behaviour of credit notification by triggering channel's writability changed.
 */
@Test
public void testNotifyCreditAvailable() throws Exception {
+   final SingleInputGate inputGate = createSingleInputGate();
+   final RemoteInputChannel inputChannel1 = spy(createRemoteInputChannel(inputGate));
+   final RemoteInputChannel inputChannel2 = spy(createRemoteInputChannel(inputGate));
final CreditBasedClientHandler handler = new CreditBasedClientHandler();
-   final EmbeddedChannel channel = new EmbeddedChannel(handler);
+   final EmbeddedChannel channel = spy(new EmbeddedChannel(handler));
 
-   final RemoteInputChannel inputChannel = createRemoteInputChannel(mock(SingleInputGate.class));
+   // Increase the credits to enqueue the input channels
+   inputChannel1.increaseCredit(1);
+   inputChannel2.increaseCredit(1);
+   handler.notifyCreditAvailable(inputChannel1);
+   handler.notifyCreditAvailable(inputChannel2);
 
-   // Enqueue the input channel
-   handler.notifyCreditAvailable(inputChannel);
+   channel.runPendingTasks();
+
+   // The two input channels should notify credits via writable channel
+   assertTrue(channel.isWritable());
+   assertEquals(channel.readOutbound().getClass(), AddCredit.class);
+   verify(inputChannel1, times(1)).getAndResetCredit();
+   verify(inputChannel2, times(1)).getAndResetCredit();
+
+   final int highWaterMark = channel.config().getWriteBufferHighWaterMark();
+   // Set the writer index to the high water mark to ensure that all bytes are written
+   // to the wire although the buffer is "empty".
+   channel.write(Unpooled.buffer(highWaterMark).writerIndex(highWaterMark));
+
+   // Enqueue the input channel on the condition of un-writable channel
+   inputChannel1.increaseCredit(1);
+   handler.notifyCreditAvailable(inputChannel1);
 
channel.runPendingTasks();
 
-   // Read the enqueued msg
-   Object msg1 = channel.readOutbound();
+   // The input channel will not notify credits via un-writable channel
+   assertFalse(channel.isWritable());
+   verify(inputChannel1, times(1)).getAndResetCredit();
--- End diff --

same as above




[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274373#comment-16274373
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r154337233
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -223,11 +229,13 @@ void notifySubpartitionConsumed() {
/**
 * Releases all exclusive and floating buffers, closes the partition 
request client.
 */
+   @VisibleForTesting
@Override
-   void releaseAllResources() throws IOException {
+   public void releaseAllResources() throws IOException {
--- End diff --

Neither `@VisibleForTesting` nor `public` is needed anymore since you adapted
the tests. Please make it package-private.




[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274377#comment-16274377
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r154321327
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
 ---
@@ -244,54 +237,89 @@ public void testCancelBeforeActive() throws Exception {
}
 
/**
-* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline, and
-* {@link AddCredit} message is sent to the producer.
+* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline for notifying credits,
+* and verifies the behaviour of credit notification by triggering channel's writability changed.
 */
@Test
public void testNotifyCreditAvailable() throws Exception {
+   final SingleInputGate inputGate = createSingleInputGate();
+   final RemoteInputChannel inputChannel1 = spy(createRemoteInputChannel(inputGate));
+   final RemoteInputChannel inputChannel2 = spy(createRemoteInputChannel(inputGate));
final CreditBasedClientHandler handler = new CreditBasedClientHandler();
-   final EmbeddedChannel channel = new EmbeddedChannel(handler);
+   final EmbeddedChannel channel = spy(new EmbeddedChannel(handler));
 
-   final RemoteInputChannel inputChannel = createRemoteInputChannel(mock(SingleInputGate.class));
+   // Increase the credits to enqueue the input channels
+   inputChannel1.increaseCredit(1);
+   inputChannel2.increaseCredit(1);
+   handler.notifyCreditAvailable(inputChannel1);
+   handler.notifyCreditAvailable(inputChannel2);
 
-   // Enqueue the input channel
-   handler.notifyCreditAvailable(inputChannel);
+   channel.runPendingTasks();
+
+   // The two input channels should notify credits via writable channel
+   assertTrue(channel.isWritable());
+   assertEquals(channel.readOutbound().getClass(), AddCredit.class);
+   verify(inputChannel1, times(1)).getAndResetCredit();
+   verify(inputChannel2, times(1)).getAndResetCredit();
+
+   final int highWaterMark = channel.config().getWriteBufferHighWaterMark();
+   // Set the writer index to the high water mark to ensure that all bytes are written
+   // to the wire although the buffer is "empty".
+   channel.write(Unpooled.buffer(highWaterMark).writerIndex(highWaterMark));
+
+   // Enqueue the input channel on the condition of un-writable channel
+   inputChannel1.increaseCredit(1);
+   handler.notifyCreditAvailable(inputChannel1);
 
channel.runPendingTasks();
 
-   // Read the enqueued msg
-   Object msg1 = channel.readOutbound();
+   // The input channel will not notify credits via un-writable channel
+   assertFalse(channel.isWritable());
+   verify(inputChannel1, times(1)).getAndResetCredit();
 
-   // Should notify credit
-   assertEquals(msg1.getClass(), AddCredit.class);
+   // Flush the buffer to make the channel writable again
+   channel.flush();
+
+   // The input channel should notify credits via triggering channel's writability changed event
+   assertTrue(channel.isWritable());
+   assertEquals(channel.readOutbound().getClass(), AddCredit.class);
+   verify(inputChannel1, times(2)).getAndResetCredit();
--- End diff --

also here (`AddCredit` as well as verifying the unannounced credit)



[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274369#comment-16274369
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r154319502
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
 ---
@@ -108,17 +117,75 @@ public void testReceiveEmptyBuffer() throws Exception {
final Buffer emptyBuffer = TestBufferFactory.createBuffer();
emptyBuffer.setSize(0);
 
+   final int backlog = 2;
final BufferResponse receivedBuffer = createBufferResponse(
-   emptyBuffer, 0, inputChannel.getInputChannelId());
+   emptyBuffer, 0, inputChannel.getInputChannelId(), backlog);
 
-   final PartitionRequestClientHandler client = new PartitionRequestClientHandler();
+   final CreditBasedClientHandler client = new CreditBasedClientHandler();
client.addInputChannel(inputChannel);
 
// Read the empty buffer
client.channelRead(mock(ChannelHandlerContext.class), receivedBuffer);
 
// This should not throw an exception
verify(inputChannel, never()).onError(any(Throwable.class));
+   verify(inputChannel, times(1)).onEmptyBuffer(0, backlog);
+   }
+
+   /**
+* Verifies that {@link RemoteInputChannel#onBuffer(Buffer, int, int)} is called when a
+* {@link BufferResponse} is received.
+*/
+   @Test
+   public void testReceiveBuffer() throws Exception {
+   final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
+   final SingleInputGate inputGate = createSingleInputGate();
+   final RemoteInputChannel inputChannel = spy(createRemoteInputChannel(inputGate));
+   inputGate.setInputChannel(inputChannel.getPartitionId().getPartitionId(), inputChannel);
+   try {
+   final BufferPool bufferPool = networkBufferPool.createBufferPool(8, 8);
+   inputGate.setBufferPool(bufferPool);
+   inputGate.assignExclusiveSegments(networkBufferPool, 2);
+
+   final CreditBasedClientHandler handler = new CreditBasedClientHandler();
+   handler.addInputChannel(inputChannel);
+
+   final int backlog = 2;
+   final BufferResponse bufferResponse = createBufferResponse(
+   TestBufferFactory.createBuffer(32), 0, inputChannel.getInputChannelId(), backlog);
+   handler.channelRead(mock(ChannelHandlerContext.class), bufferResponse);
+
+   verify(inputChannel, times(1)).onBuffer(any(Buffer.class), anyInt(), anyInt());
+   verify(inputChannel, times(1)).onSenderBacklog(backlog);
--- End diff --

If you used `RemoteInputChannel#getNumberOfRequiredBuffers()` here to check
the backlog's value instead, we could make the `onSenderBacklog()` method
package-private again and remove the `@VisibleForTesting` annotation.
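The state-based check suggested above can be shown with a minimal, self-contained sketch. It does not use Flink's classes. `BacklogSketchChannel` and `BacklogSketchHandler` are hypothetical stand-ins. The sizing rule `numRequiredBuffers = backlog + initialCredit` is an assumption about how `RemoteInputChannel` reacts to a sender backlog. The idea is that a read-only accessor lets the test assert on the resulting state, so the mutating `onSenderBacklog()` can stay package-private.

```java
/**
 * Hypothetical stand-in for RemoteInputChannel (not Flink's class).
 * Assumption: required buffers = sender backlog + initial (exclusive) credit.
 */
class BacklogSketchChannel {
    private final int initialCredit;
    private int numRequiredBuffers;

    BacklogSketchChannel(int initialCredit) {
        this.initialCredit = initialCredit;
        this.numRequiredBuffers = initialCredit;
    }

    // Mutates state, so it stays package-private and is reached only via the handler.
    void onSenderBacklog(int backlog) {
        numRequiredBuffers = backlog + initialCredit;
    }

    // Read-only accessor: exposing it to tests does not let them change state.
    int getNumberOfRequiredBuffers() {
        return numRequiredBuffers;
    }
}

/** Hypothetical handler: forwards the backlog carried by each received buffer response. */
class BacklogSketchHandler {
    private final BacklogSketchChannel channel;

    BacklogSketchHandler(BacklogSketchChannel channel) {
        this.channel = channel;
    }

    void onBufferResponse(int backlog) {
        // A real handler would also hand the buffer itself to the channel here.
        channel.onSenderBacklog(backlog);
    }
}
```

In this model, a channel with 2 exclusive buffers that receives a response with backlog 2 reports `getNumberOfRequiredBuffers() == 4`. The test asserts that value instead of verifying the `onSenderBacklog(backlog)` call.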


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is part of the work for credit-based network flow control.
> The related work includes:
> * We define a new message called {{AddCredit}} to notify the incremental credit during data shuffle.
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}} and sends its unannounced credit. The credit is reset to zero after each send.
> * That way, messages are sent as often as the network has capacity and contain as much credit as is available for the channel at that point in time. Sending them less eagerly would only add latency to the announcements without increasing throughput.
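The announcement rules in the bullets above can be modelled in a few lines. This is a minimal single-threaded sketch, not Flink's implementation. `CreditSketchChannel` and `CreditAnnouncer` are hypothetical, and strings of the form `"channelId:credit"` stand in for `AddCredit` messages. Draining every queued channel while writable is a simplification of sending one message per writability event.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical input channel that only tracks its unannounced credit. */
class CreditSketchChannel {
    final String id;
    private int unannouncedCredit;

    CreditSketchChannel(String id) {
        this.id = id;
    }

    /** Adds credit and reports whether it went up from zero (the enqueue trigger). */
    boolean addCredit(int delta) {
        boolean wasZero = unannouncedCredit == 0;
        unannouncedCredit += delta;
        return wasZero && delta > 0;
    }

    /** Hands out all unannounced credit and resets it to zero. */
    int getAndResetCredit() {
        int credit = unannouncedCredit;
        unannouncedCredit = 0;
        return credit;
    }
}

/** Hypothetical single-threaded model of the receiver's credit pipeline. */
class CreditAnnouncer {
    private final Queue<CreditSketchChannel> pending = new ArrayDeque<>();
    /** Stands in for AddCredit messages written to the network, as "channelId:credit". */
    final List<String> sent = new ArrayList<>();
    private boolean writable = true;

    void notifyCreditAvailable(CreditSketchChannel channel, int delta) {
        if (channel.addCredit(delta)) {
            pending.add(channel); // enqueue only on the 0 -> n transition
        }
        drain();
    }

    void setWritable(boolean writable) {
        this.writable = writable;
        drain();
    }

    private void drain() {
        // Simplification: send one message per queued channel while writable.
        while (writable && !pending.isEmpty()) {
            CreditSketchChannel channel = pending.poll();
            sent.add(channel.id + ":" + channel.getAndResetCredit());
        }
    }
}
```

While the network is unwritable, later credit accumulates in the already-queued channel. After credits of 1 and 2 arrive during a blocked period, the next message therefore carries `c1:3` instead of two separate announcements, which matches the rationale in the last bullet.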





[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274372#comment-16274372
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r154320189
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java ---
@@ -244,54 +237,89 @@ public void testCancelBeforeActive() throws Exception {
}
 
/**
-* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline, and
-* {@link AddCredit} message is sent to the producer.
+* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline for notifying credits,
+* and verifies the behaviour of credit notification by triggering channel's writability changed.
 */
@Test
public void testNotifyCreditAvailable() throws Exception {
+   final SingleInputGate inputGate = createSingleInputGate();
+   final RemoteInputChannel inputChannel1 = spy(createRemoteInputChannel(inputGate));
+   final RemoteInputChannel inputChannel2 = spy(createRemoteInputChannel(inputGate));
final CreditBasedClientHandler handler = new CreditBasedClientHandler();
-   final EmbeddedChannel channel = new EmbeddedChannel(handler);
+   final EmbeddedChannel channel = spy(new EmbeddedChannel(handler));
 
-   final RemoteInputChannel inputChannel = createRemoteInputChannel(mock(SingleInputGate.class));
+   // Increase the credits to enqueue the input channels
+   inputChannel1.increaseCredit(1);
+   inputChannel2.increaseCredit(1);
+   handler.notifyCreditAvailable(inputChannel1);
+   handler.notifyCreditAvailable(inputChannel2);
--- End diff --

Please use a `BufferResponse` (as above, with an appropriate backlog) to
trigger the same result without adding a method that only tests use; this
will also test more code paths (as suggested).
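The suggestion to drive credit through a `BufferResponse` instead of a test-only `increaseCredit()` can be illustrated with a self-contained sketch of why it works. The backlog carried by the response makes the channel request floating buffers, and the credit gained from them enqueues the channel. `ToyFloatingPool`, `ToyClientHandler`, and `ToyInputChannel` are hypothetical. The sizing rule `backlog + initialCredit` is an assumption, and the model ignores the buffer consumed by the response itself.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical floating-buffer pool that only counts buffers. */
class ToyFloatingPool {
    private int remaining;

    ToyFloatingPool(int size) {
        this.remaining = size;
    }

    boolean tryRequest() {
        if (remaining == 0) {
            return false;
        }
        remaining--;
        return true;
    }
}

/** Hypothetical client handler; its credit queue is what a test observes. */
class ToyClientHandler {
    final Queue<ToyInputChannel> creditQueue = new ArrayDeque<>();

    void notifyCreditAvailable(ToyInputChannel channel) {
        creditQueue.add(channel);
    }

    /** Production-path entry point: a received BufferResponse carries the sender backlog. */
    void onBufferResponse(ToyInputChannel channel, int backlog) {
        channel.onSenderBacklog(backlog);
    }
}

/** Hypothetical input channel: floating buffers obtained for the backlog become credit. */
class ToyInputChannel {
    private final int initialCredit;
    private final ToyFloatingPool pool;
    private final ToyClientHandler handler;
    private int availableBuffers;
    private int unannouncedCredit;

    ToyInputChannel(int initialCredit, ToyFloatingPool pool, ToyClientHandler handler) {
        this.initialCredit = initialCredit;
        this.pool = pool;
        this.handler = handler;
        this.availableBuffers = initialCredit; // exclusive buffers
    }

    void onSenderBacklog(int backlog) {
        int required = backlog + initialCredit; // assumed sizing rule
        int requested = 0;
        while (availableBuffers < required && pool.tryRequest()) {
            availableBuffers++;
            requested++;
        }
        boolean wasZero = unannouncedCredit == 0;
        unannouncedCredit += requested;
        if (wasZero && requested > 0) {
            handler.notifyCreditAvailable(this); // enqueue only on the 0 -> n transition
        }
    }
}
```

A second response for an already-queued channel adds credit but does not enqueue it again. This mirrors the "enqueue only when credit goes up from zero" rule in the issue description.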


[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...

2017-12-01 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r154318950
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
@@ -280,94 +280,120 @@ public String toString() {
// 
 
/**
-* Enqueue this input channel in the pipeline for sending unannounced credits to producer.
+* Enqueue this input channel in the pipeline for notifying the producer of unannounced credit.
 */
void notifyCreditAvailable() {
-   //TODO in next PR
+   checkState(partitionRequestClient != null, "Tried to send task event to producer before requesting a queue.");
+
+   // We should skip the notification if this channel is already released.
+   if (!isReleased.get()) {
+   partitionRequestClient.notifyCreditAvailable(this);
+   }
}
 
/**
-* Exclusive buffer is recycled to this input channel directly and it may trigger notify
-* credit to producer.
+* Exclusive buffer is recycled to this input channel directly and it may trigger return extra
+* floating buffer and notify increased credit to the producer.
 *
 * @param segment The exclusive segment of this channel.
 */
@Override
public void recycle(MemorySegment segment) {
-   synchronized (availableBuffers) {
-   // Important: the isReleased check should be inside the synchronized block.
-   // that way the segment can also be returned to global pool after added into
-   // the available queue during releasing all resources.
+   int numAddedBuffers;
+
+   synchronized (bufferQueue) {
+   // Important: check the isReleased state inside synchronized block, so there is no
+   // race condition when recycle and releaseAllResources running in parallel.
if (isReleased.get()) {
try {
-   inputGate.returnExclusiveSegments(Arrays.asList(segment));
+   inputGate.returnExclusiveSegments(Collections.singletonList(segment));
return;
} catch (Throwable t) {
ExceptionUtils.rethrow(t);
}
}
-   availableBuffers.add(new Buffer(segment, this));
+   numAddedBuffers = bufferQueue.addExclusiveBuffer(new Buffer(segment, this), numRequiredBuffers);
}
 
-   if (unannouncedCredit.getAndAdd(1) == 0) {
+   if (numAddedBuffers > 0 && unannouncedCredit.getAndAdd(1) == 0) {
notifyCreditAvailable();
}
}
 
public int getNumberOfAvailableBuffers() {
-   synchronized (availableBuffers) {
-   return availableBuffers.size();
+   synchronized (bufferQueue) {
+   return bufferQueue.getAvailableBufferSize();
}
}
 
+   @VisibleForTesting
+   public int getNumberOfRequiredBuffers() {
--- End diff --

this could be package-private (and then remove `@VisibleForTesting`)
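The `recycle()` change above counts a recycled exclusive buffer as new credit only when the channel actually keeps it. If, after adding it, the channel holds more than `numRequiredBuffers` and has a floating buffer, that floating buffer goes back to the pool and no credit is announced. A minimal model of that rule follows. Strings stand in for buffers, and `RecycleSketchChannel` with its counters is hypothetical. The "give back one surplus floating buffer" behaviour of `addExclusiveBuffer` is an assumption inferred from the updated Javadoc, not a copy of Flink's code.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical model of the reviewed recycle() path (strings stand in for buffers).
 * The demo counters are plain ints and assume single-threaded use.
 */
class RecycleSketchChannel {
    private final Object bufferQueueLock = new Object();
    private final ArrayDeque<String> exclusiveBuffers = new ArrayDeque<>();
    private final ArrayDeque<String> floatingBuffers = new ArrayDeque<>();
    private final AtomicInteger unannouncedCredit = new AtomicInteger(0);
    private final int numRequiredBuffers;
    final AtomicBoolean isReleased = new AtomicBoolean(false);

    int floatingReturnedToPool;
    int segmentsReturnedToGate;
    int creditNotifications;

    RecycleSketchChannel(int numRequiredBuffers) {
        this.numRequiredBuffers = numRequiredBuffers;
    }

    void addFloatingBuffer(String buffer) {
        synchronized (bufferQueueLock) {
            floatingBuffers.add(buffer);
        }
    }

    int getNumberOfAvailableBuffers() {
        synchronized (bufferQueueLock) {
            return exclusiveBuffers.size() + floatingBuffers.size();
        }
    }

    int getUnannouncedCredit() {
        return unannouncedCredit.get();
    }

    /** Returns 1 if the recycled buffer is new credit, 0 if a surplus floating buffer was given back. */
    private int addExclusiveBuffer(String segment) {
        exclusiveBuffers.add(segment);
        int available = exclusiveBuffers.size() + floatingBuffers.size();
        if (available > numRequiredBuffers && !floatingBuffers.isEmpty()) {
            floatingBuffers.poll();
            floatingReturnedToPool++;
            return 0;
        }
        return 1;
    }

    void recycle(String segment) {
        int numAddedBuffers;
        synchronized (bufferQueueLock) {
            // Check the released flag under the lock so recycle() cannot race with a release.
            if (isReleased.get()) {
                segmentsReturnedToGate++;
                return;
            }
            numAddedBuffers = addExclusiveBuffer(segment);
        }
        // Only the 0 -> 1 transition enqueues the channel for an AddCredit message.
        if (numAddedBuffers > 0 && unannouncedCredit.getAndAdd(1) == 0) {
            creditNotifications++;
        }
    }
}
```

Consider a channel that needs 2 buffers and holds one floating buffer. Recycling a first exclusive segment yields one credit and one notification. Recycling a second segment swaps out the floating buffer, so credit stays at 1. After release, recycled segments go straight back to the gate.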


---


[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...

2017-12-01 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r154338813
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java ---
@@ -244,54 +237,89 @@ public void testCancelBeforeActive() throws Exception {
}
 
/**
-* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline, and
-* {@link AddCredit} message is sent to the producer.
+* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline for notifying credits,
+* and verifies the behaviour of credit notification by triggering channel's writability changed.
 */
@Test
public void testNotifyCreditAvailable() throws Exception {
+   final SingleInputGate inputGate = createSingleInputGate();
+   final RemoteInputChannel inputChannel1 = spy(createRemoteInputChannel(inputGate));
+   final RemoteInputChannel inputChannel2 = spy(createRemoteInputChannel(inputGate));
final CreditBasedClientHandler handler = new CreditBasedClientHandler();
-   final EmbeddedChannel channel = new EmbeddedChannel(handler);
+   final EmbeddedChannel channel = spy(new EmbeddedChannel(handler));
 
-   final RemoteInputChannel inputChannel = createRemoteInputChannel(mock(SingleInputGate.class));
+   // Increase the credits to enqueue the input channels
+   inputChannel1.increaseCredit(1);
+   inputChannel2.increaseCredit(1);
+   handler.notifyCreditAvailable(inputChannel1);
+   handler.notifyCreditAvailable(inputChannel2);
 
-   // Enqueue the input channel
-   handler.notifyCreditAvailable(inputChannel);
+   channel.runPendingTasks();
+
+   // The two input channels should notify credits via writable channel
+   assertTrue(channel.isWritable());
+   assertEquals(channel.readOutbound().getClass(), AddCredit.class);
+   verify(inputChannel1, times(1)).getAndResetCredit();
+   verify(inputChannel2, times(1)).getAndResetCredit();
+
+   final int highWaterMark = channel.config().getWriteBufferHighWaterMark();
+   // Set the writer index to the high water mark to ensure that all bytes are written
+   // to the wire although the buffer is "empty".
+   channel.write(Unpooled.buffer(highWaterMark).writerIndex(highWaterMark));
+
+   // Enqueue the input channel on the condition of un-writable channel
+   inputChannel1.increaseCredit(1);
+   handler.notifyCreditAvailable(inputChannel1);
 
channel.runPendingTasks();
 
-   // Read the enqueued msg
-   Object msg1 = channel.readOutbound();
+   // The input channel will not notify credits via un-writable channel
+   assertFalse(channel.isWritable());
+   verify(inputChannel1, times(1)).getAndResetCredit();
 
-   // Should notify credit
-   assertEquals(msg1.getClass(), AddCredit.class);
+   // Flush the buffer to make the channel writable again
+   channel.flush();
+
+   // The input channel should notify credits via triggering channel's writability changed event
+   assertTrue(channel.isWritable());
+   assertEquals(channel.readOutbound().getClass(), AddCredit.class);
--- End diff --

Actually, here we first receive the buffer that was blocking the channel,
and only then the `AddCredit` message; both should be checked appropriately
(see above).
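The ordering described above can be captured in a toy model. Bytes written while the channel is unwritable are delivered on flush before anything the writability-changed callback sends. `ToyOutboundChannel`, `FillerBuffer`, `AddCreditMessage`, and `OutboundOrderDemo` are hypothetical. The writability rule (`pendingBytes < highWaterMark`) is a simplification, not Netty's exact threshold. A test against this model reads and checks both outbound messages in order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical stand-in for the buffer written to fill the channel up to its high-water mark. */
final class FillerBuffer {
    final int size;

    FillerBuffer(int size) {
        this.size = size;
    }
}

/** Hypothetical stand-in for the AddCredit message. */
final class AddCreditMessage {
    final int credit;

    AddCreditMessage(int credit) {
        this.credit = credit;
    }
}

/** Toy channel: writes stay pending until flush(); too many pending bytes make it unwritable. */
class ToyOutboundChannel {
    private final int highWaterMark;
    private final Queue<Object> pendingWrites = new ArrayDeque<>();
    private final Queue<Object> outbound = new ArrayDeque<>();
    private int pendingBytes;
    private Runnable writabilityListener = () -> { };

    ToyOutboundChannel(int highWaterMark) {
        this.highWaterMark = highWaterMark;
    }

    void setWritabilityListener(Runnable listener) {
        this.writabilityListener = listener;
    }

    boolean isWritable() {
        return pendingBytes < highWaterMark;
    }

    void write(Object msg, int bytes) {
        pendingWrites.add(msg);
        pendingBytes += bytes;
    }

    void flush() {
        boolean wasWritable = isWritable();
        while (!pendingWrites.isEmpty()) {
            outbound.add(pendingWrites.poll());
        }
        pendingBytes = 0;
        if (!wasWritable) {
            writabilityListener.run(); // writability changed: unwritable -> writable
        }
    }

    Object readOutbound() {
        return outbound.poll();
    }
}

class OutboundOrderDemo {
    /** Returns everything written to the toy channel, in the order a test would read it. */
    static List<Object> run() {
        ToyOutboundChannel channel = new ToyOutboundChannel(64);
        // Hypothetical handler hook: once writable again, send the credit queued meanwhile.
        channel.setWritabilityListener(() -> {
            channel.write(new AddCreditMessage(1), 8);
            channel.flush();
        });
        channel.write(new FillerBuffer(64), 64); // reaches the high-water mark: unwritable
        channel.flush(); // delivers the filler first, then fires the listener

        List<Object> messages = new ArrayList<>();
        Object msg;
        while ((msg = channel.readOutbound()) != null) {
            messages.add(msg);
        }
        return messages;
    }
}
```

In the Netty-based test, the same pattern means two `readOutbound()` calls: the first checks the blocking buffer and the second checks the `AddCredit` message.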


---


[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274378#comment-16274378
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r154338491
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java ---
@@ -244,54 +237,89 @@ public void testCancelBeforeActive() throws Exception {
}
 
/**
-* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline, and
-* {@link AddCredit} message is sent to the producer.
+* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline for notifying credits,
+* and verifies the behaviour of credit notification by triggering channel's writability changed.
 */
@Test
public void testNotifyCreditAvailable() throws Exception {
+   final SingleInputGate inputGate = createSingleInputGate();
+   final RemoteInputChannel inputChannel1 = spy(createRemoteInputChannel(inputGate));
+   final RemoteInputChannel inputChannel2 = spy(createRemoteInputChannel(inputGate));
final CreditBasedClientHandler handler = new CreditBasedClientHandler();
-   final EmbeddedChannel channel = new EmbeddedChannel(handler);
+   final EmbeddedChannel channel = spy(new EmbeddedChannel(handler));
 
-   final RemoteInputChannel inputChannel = createRemoteInputChannel(mock(SingleInputGate.class));
+   // Increase the credits to enqueue the input channels
+   inputChannel1.increaseCredit(1);
+   inputChannel2.increaseCredit(1);
+   handler.notifyCreditAvailable(inputChannel1);
+   handler.notifyCreditAvailable(inputChannel2);
 
-   // Enqueue the input channel
-   handler.notifyCreditAvailable(inputChannel);
+   channel.runPendingTasks();
+
+   // The two input channels should notify credits via writable channel
+   assertTrue(channel.isWritable());
+   assertEquals(channel.readOutbound().getClass(), AddCredit.class);
+   verify(inputChannel1, times(1)).getAndResetCredit();
+   verify(inputChannel2, times(1)).getAndResetCredit();
+
+   final int highWaterMark = channel.config().getWriteBufferHighWaterMark();
+   // Set the writer index to the high water mark to ensure that all bytes are written
+   // to the wire although the buffer is "empty".
+   channel.write(Unpooled.buffer(highWaterMark).writerIndex(highWaterMark));
+
+   // Enqueue the input channel on the condition of un-writable channel
+   inputChannel1.increaseCredit(1);
+   handler.notifyCreditAvailable(inputChannel1);
 
channel.runPendingTasks();
 
-   // Read the enqueued msg
-   Object msg1 = channel.readOutbound();
+   // The input channel will not notify credits via un-writable channel
+   assertFalse(channel.isWritable());
+   verify(inputChannel1, times(1)).getAndResetCredit();
 
-   // Should notify credit
-   assertEquals(msg1.getClass(), AddCredit.class);
+   // Flush the buffer to make the channel writable again
+   channel.flush();
+
+   // The input channel should notify credits via triggering channel's writability changed event
+   assertTrue(channel.isWritable());
+   assertEquals(channel.readOutbound().getClass(), AddCredit.class);
+   verify(inputChannel1, times(2)).getAndResetCredit();
}
 
/**
 * Verifies that {@link RemoteInputChannel} is enqueued in the pipeline, but {@link AddCredit}
-* message is not sent actually after this input channel is released.
+* message is not sent actually when this input channel is released.
 */
@Test
public void testNotifyCreditAvailableAfterReleased() throws Exception {
+   final SingleInputGate inputGate = createSingleInputGate();
+   final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
+   inputGate.setInputChannel(inputChannel.getPartitionId().getPartitionId(), inputChannel);
+
final CreditBasedClientHandler handler = new CreditBasedClientHandler();
final EmbeddedChannel channel = new EmbeddedChannel(handler);
 
-   final RemoteInputChannel inputChannel = createRemoteInputChannel(mock(SingleInputGate.class));
-
-   // Enqueue the input channel then rele


[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...

2017-12-01 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r154321304
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java ---
@@ -244,54 +237,89 @@ public void testCancelBeforeActive() throws Exception {
}
 
/**
-* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline, and
-* {@link AddCredit} message is sent to the producer.
+* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline for notifying credits,
+* and verifies the behaviour of credit notification by triggering channel's writability changed.
 */
@Test
public void testNotifyCreditAvailable() throws Exception {
+   final SingleInputGate inputGate = createSingleInputGate();
+   final RemoteInputChannel inputChannel1 = spy(createRemoteInputChannel(inputGate));
+   final RemoteInputChannel inputChannel2 = spy(createRemoteInputChannel(inputGate));
final CreditBasedClientHandler handler = new CreditBasedClientHandler();
-   final EmbeddedChannel channel = new EmbeddedChannel(handler);
+   final EmbeddedChannel channel = spy(new EmbeddedChannel(handler));
 
-   final RemoteInputChannel inputChannel = createRemoteInputChannel(mock(SingleInputGate.class));
+   // Increase the credits to enqueue the input channels
+   inputChannel1.increaseCredit(1);
+   inputChannel2.increaseCredit(1);
+   handler.notifyCreditAvailable(inputChannel1);
+   handler.notifyCreditAvailable(inputChannel2);
 
-   // Enqueue the input channel
-   handler.notifyCreditAvailable(inputChannel);
+   channel.runPendingTasks();
+
+   // The two input channels should notify credits via writable channel
+   assertTrue(channel.isWritable());
+   assertEquals(channel.readOutbound().getClass(), AddCredit.class);
+   verify(inputChannel1, times(1)).getAndResetCredit();
+   verify(inputChannel2, times(1)).getAndResetCredit();
+
+   final int highWaterMark = channel.config().getWriteBufferHighWaterMark();
+   // Set the writer index to the high water mark to ensure that all bytes are written
+   // to the wire although the buffer is "empty".
+   channel.write(Unpooled.buffer(highWaterMark).writerIndex(highWaterMark));
+
+   // Enqueue the input channel on the condition of un-writable channel
+   inputChannel1.increaseCredit(1);
+   handler.notifyCreditAvailable(inputChannel1);
 
channel.runPendingTasks();
 
-   // Read the enqueued msg
-   Object msg1 = channel.readOutbound();
+   // The input channel will not notify credits via un-writable channel
+   assertFalse(channel.isWritable());
+   verify(inputChannel1, times(1)).getAndResetCredit();
--- End diff --

same as above


---


[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274367#comment-16274367
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r154318828
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
@@ -378,6 +381,11 @@ public void notifyBufferDestroyed() {
// Nothing to do actually.
}
 
+   @VisibleForTesting
+   public void increaseCredit(int credit) {
--- End diff --

I think we can do without this method, which should never be called outside
the tests (since it modifies state): creating a `BufferResponse` with an
appropriate backlog should yield the same result in the tests.


[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274366#comment-16274366
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r154321125
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java ---
@@ -244,54 +237,89 @@ public void testCancelBeforeActive() throws Exception {
}
 
/**
-* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline, and
-* {@link AddCredit} message is sent to the producer.
+* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline for notifying credits,
+* and verifies the behaviour of credit notification by triggering channel's writability changed.
 */
@Test
public void testNotifyCreditAvailable() throws Exception {
+   final SingleInputGate inputGate = createSingleInputGate();
+   final RemoteInputChannel inputChannel1 = spy(createRemoteInputChannel(inputGate));
+   final RemoteInputChannel inputChannel2 = spy(createRemoteInputChannel(inputGate));
final CreditBasedClientHandler handler = new CreditBasedClientHandler();
-   final EmbeddedChannel channel = new EmbeddedChannel(handler);
+   final EmbeddedChannel channel = spy(new EmbeddedChannel(handler));
 
-   final RemoteInputChannel inputChannel = createRemoteInputChannel(mock(SingleInputGate.class));
+   // Increase the credits to enqueue the input channels
+   inputChannel1.increaseCredit(1);
+   inputChannel2.increaseCredit(1);
+   handler.notifyCreditAvailable(inputChannel1);
+   handler.notifyCreditAvailable(inputChannel2);
 
-   // Enqueue the input channel
-   handler.notifyCreditAvailable(inputChannel);
+   channel.runPendingTasks();
+
+   // The two input channels should notify credits via writable channel
+   assertTrue(channel.isWritable());
+   assertEquals(channel.readOutbound().getClass(), AddCredit.class);
+   verify(inputChannel1, times(1)).getAndResetCredit();
+   verify(inputChannel2, times(1)).getAndResetCredit();
--- End diff --

Instead of verifying that the method was called, we could add a (test-visible)
`getUnannouncedCredit()` method and check the actual credit (since this new
method would only read the state, it is ok to have it).


[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274376#comment-16274376
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r154318950
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
@@ -280,94 +280,120 @@ public String toString() {
// 

 
/**
-* Enqueue this input channel in the pipeline for sending unannounced credits to producer.
+* Enqueue this input channel in the pipeline for notifying the producer of unannounced credit.
 */
void notifyCreditAvailable() {
-   //TODO in next PR
+   checkState(partitionRequestClient != null, "Tried to send task event to producer before requesting a queue.");
+
+   // We should skip the notification if this channel is already released.
+   if (!isReleased.get()) {
+   partitionRequestClient.notifyCreditAvailable(this);
+   }
}
 
/**
-* Exclusive buffer is recycled to this input channel directly and it may trigger notify
-* credit to producer.
+* Exclusive buffer is recycled to this input channel directly and it may trigger return extra
+* floating buffer and notify increased credit to the producer.
 *
 * @param segment The exclusive segment of this channel.
 */
@Override
public void recycle(MemorySegment segment) {
-   synchronized (availableBuffers) {
-   // Important: the isReleased check should be inside the synchronized block.
-   // that way the segment can also be returned to global pool after added into
-   // the available queue during releasing all resources.
+   int numAddedBuffers;
+
+   synchronized (bufferQueue) {
+   // Important: check the isReleased state inside synchronized block, so there is no
+   // race condition when recycle and releaseAllResources running in parallel.
if (isReleased.get()) {
try {
-   inputGate.returnExclusiveSegments(Arrays.asList(segment));
+   inputGate.returnExclusiveSegments(Collections.singletonList(segment));
return;
} catch (Throwable t) {
ExceptionUtils.rethrow(t);
}
}
-   availableBuffers.add(new Buffer(segment, this));
+   numAddedBuffers = bufferQueue.addExclusiveBuffer(new Buffer(segment, this), numRequiredBuffers);
}
 
-   if (unannouncedCredit.getAndAdd(1) == 0) {
+   if (numAddedBuffers > 0 && unannouncedCredit.getAndAdd(1) == 0) {
notifyCreditAvailable();
}
}
 
public int getNumberOfAvailableBuffers() {
-   synchronized (availableBuffers) {
-   return availableBuffers.size();
+   synchronized (bufferQueue) {
+   return bufferQueue.getAvailableBufferSize();
}
}
 
+   @VisibleForTesting
+   public int getNumberOfRequiredBuffers() {
--- End diff --

this could be package-private (and then remove `@VisibleForTesting`)
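A simplified model of the new `recycle()` logic may help when reviewing it. This is a single-threaded sketch under stated assumptions, not Flink's code. `SketchRecyclingChannel` is a hypothetical class, and buffers are plain strings. The behaviour of `addExclusiveBuffer` is also assumed: when the channel already holds more than `numRequiredBuffers`, it hands back one floating buffer and reports zero added buffers. That assumption is inferred only from the Javadoc in the diff. The accessors are package-private, in line with the comment above.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of RemoteInputChannel#recycle(); not Flink code.
final class SketchRecyclingChannel {
    private final Object bufferQueueLock = new Object();
    private final ArrayDeque<String> exclusive = new ArrayDeque<>();
    private final ArrayDeque<String> floating = new ArrayDeque<>();
    private final int numRequiredBuffers;

    final AtomicBoolean isReleased = new AtomicBoolean();
    final AtomicInteger unannouncedCredit = new AtomicInteger();
    final AtomicInteger creditNotifications = new AtomicInteger(); // producer notifications
    int returnedExclusiveSegments; // handed back to the global pool (guarded by lock)
    int returnedFloatingBuffers;   // floating buffers given back (guarded by lock)

    SketchRecyclingChannel(int numRequiredBuffers) {
        this.numRequiredBuffers = numRequiredBuffers;
    }

    void addFloatingBuffer(String buffer) {
        synchronized (bufferQueueLock) {
            floating.add(buffer);
        }
    }

    void recycle(String segment) {
        int numAddedBuffers;
        synchronized (bufferQueueLock) {
            // Check the released flag inside the lock so that a concurrent
            // releaseAllResources() cannot miss this segment.
            if (isReleased.get()) {
                returnedExclusiveSegments++;
                return;
            }
            exclusive.add(segment);
            numAddedBuffers = 1;
            // Assumed behaviour: with more buffers than required, give one floating
            // buffer back, so the net number of available buffers is unchanged.
            if (exclusive.size() + floating.size() > numRequiredBuffers && !floating.isEmpty()) {
                floating.poll();
                returnedFloatingBuffers++;
                numAddedBuffers = 0;
            }
        }
        // Only a rise from zero triggers a notification; further increments just
        // accumulate in unannouncedCredit until it is read and reset.
        if (numAddedBuffers > 0 && unannouncedCredit.getAndAdd(1) == 0) {
            creditNotifications.incrementAndGet();
        }
    }

    void releaseAllResources() {
        synchronized (bufferQueueLock) {
            isReleased.set(true);
            returnedExclusiveSegments += exclusive.size();
            exclusive.clear();
        }
    }

    int getNumberOfAvailableBuffers() {
        synchronized (bufferQueueLock) {
            return exclusive.size() + floating.size();
        }
    }
}
```

The model makes two properties easy to check. Recycling a segment after release never adds credit. A recycle that only swaps a floating buffer for an exclusive one leaves the unannounced credit unchanged.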


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is part of the work for credit-based network flow control.
> The related work is:
> * We define a new message called {{AddCredit}} to announce incremental 
> credit to the producer during data shuffle.
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the 
> channel is enqueued in the pipeline.
> * Whenever the Netty channel becomes writable, the pipeline takes the next 
> {{InputChannel}} and sends its unannounced credit. The credit is reset to 
> zero after each send.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as is available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.
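The announcement scheme described in the issue can be sketched in plain Java. This is a simplified, single-threaded model, not Flink's implementation. `SketchInputChannel` and `SketchCreditPipeline` are hypothetical names. "Sending" appends a string to a list instead of writing an `AddCredit` message, and writability is a boolean flag rather than a Netty channel property.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical input channel that accumulates credit until it is announced.
final class SketchInputChannel {
    final String id;
    private final AtomicInteger unannouncedCredit = new AtomicInteger();

    SketchInputChannel(String id) {
        this.id = id;
    }

    /** Adds credit; returns true if it went up from zero, i.e. the channel must be enqueued. */
    boolean increaseCredit(int delta) {
        return unannouncedCredit.getAndAdd(delta) == 0;
    }

    /** Reads and clears the unannounced credit in one atomic step. */
    int getAndResetCredit() {
        return unannouncedCredit.getAndSet(0);
    }
}

// Hypothetical outgoing pipeline: a queue of channels drained while the connection is writable.
final class SketchCreditPipeline {
    private final ArrayDeque<SketchInputChannel> queue = new ArrayDeque<>();
    final List<String> sent = new ArrayList<>(); // stands in for AddCredit messages on the wire
    private boolean writable = true;

    void notifyCreditAvailable(SketchInputChannel channel, int delta) {
        if (channel.increaseCredit(delta)) {
            queue.add(channel);
        }
        drain();
    }

    void channelWritabilityChanged(boolean nowWritable) {
        writable = nowWritable;
        drain();
    }

    private void drain() {
        while (writable && !queue.isEmpty()) {
            SketchInputChannel channel = queue.poll();
            sent.add("AddCredit(" + channel.id + ", " + channel.getAndResetCredit() + ")");
        }
    }
}
```

In this model, two credit increments that arrive while the connection is unwritable are merged into a single announcement once it becomes writable again. That matches the behaviour the last bullet describes.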

[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...

2017-12-01 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r154338695
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java ---
@@ -244,54 +237,89 @@ public void testCancelBeforeActive() throws Exception {
}
 
/**
-* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline, and
-* {@link AddCredit} message is sent to the producer.
+* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline for notifying credits,
+* and verifies the behaviour of credit notification by triggering channel's writability changed.
 */
@Test
public void testNotifyCreditAvailable() throws Exception {
+   final SingleInputGate inputGate = createSingleInputGate();
+   final RemoteInputChannel inputChannel1 = spy(createRemoteInputChannel(inputGate));
+   final RemoteInputChannel inputChannel2 = spy(createRemoteInputChannel(inputGate));
final CreditBasedClientHandler handler = new CreditBasedClientHandler();
-   final EmbeddedChannel channel = new EmbeddedChannel(handler);
+   final EmbeddedChannel channel = spy(new EmbeddedChannel(handler));
 
-   final RemoteInputChannel inputChannel = createRemoteInputChannel(mock(SingleInputGate.class));
+   // Increase the credits to enqueue the input channels
+   inputChannel1.increaseCredit(1);
+   inputChannel2.increaseCredit(1);
+   handler.notifyCreditAvailable(inputChannel1);
+   handler.notifyCreditAvailable(inputChannel2);
 
-   // Enqueue the input channel
-   handler.notifyCreditAvailable(inputChannel);
+   channel.runPendingTasks();
+
+   // The two input channels should notify credits via writable channel
+   assertTrue(channel.isWritable());
+   assertEquals(channel.readOutbound().getClass(), AddCredit.class);
+   verify(inputChannel1, times(1)).getAndResetCredit();
+   verify(inputChannel2, times(1)).getAndResetCredit();
+
+   final int highWaterMark = channel.config().getWriteBufferHighWaterMark();
+   // Set the writer index to the high water mark to ensure that all bytes are written
+   // to the wire although the buffer is "empty".
+   channel.write(Unpooled.buffer(highWaterMark).writerIndex(highWaterMark));
+
+   // Enqueue the input channel on the condition of un-writable channel
+   inputChannel1.increaseCredit(1);
+   handler.notifyCreditAvailable(inputChannel1);
 
channel.runPendingTasks();
 
-   // Read the enqueued msg
-   Object msg1 = channel.readOutbound();
+   // The input channel will not notify credits via un-writable channel
+   assertFalse(channel.isWritable());
+   verify(inputChannel1, times(1)).getAndResetCredit();
--- End diff --

also add `assertNull(channel.readOutbound());`
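The extra `assertNull(channel.readOutbound())` checks that nothing at all is written while the channel is above its write-buffer high water mark. The `verify(...)` call on `getAndResetCredit()` alone does not show that. The sketch below models the behaviour in plain Java so it can run without Netty. `FakeOutboundChannel` and `FakeCreditHandler` are hypothetical stand-ins for `EmbeddedChannel` and `CreditBasedClientHandler`, and only writability is modelled.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in for the outbound side of Netty's EmbeddedChannel.
final class FakeOutboundChannel {
    private final int highWaterMark;
    private final Queue<Object> outbound = new ArrayDeque<>();
    private int unflushedBytes;
    private Runnable writabilityListener = () -> { };

    FakeOutboundChannel(int highWaterMark) {
        this.highWaterMark = highWaterMark;
    }

    void setWritabilityListener(Runnable listener) {
        this.writabilityListener = listener;
    }

    boolean isWritable() {
        return unflushedBytes < highWaterMark;
    }

    /** Simulates writing raw bytes without flushing them. */
    void writeBytes(int numBytes) {
        unflushedBytes += numBytes;
    }

    void writeAndFlush(Object msg) {
        outbound.add(msg);
    }

    /** Flushing empties the write buffer; a change back to writable fires the listener. */
    void flush() {
        boolean wasWritable = isWritable();
        unflushedBytes = 0;
        if (!wasWritable) {
            writabilityListener.run();
        }
    }

    /** Like EmbeddedChannel#readOutbound(): returns null when nothing was written. */
    Object readOutbound() {
        return outbound.poll();
    }
}

// Hypothetical handler that only announces credit while the channel is writable.
final class FakeCreditHandler {
    private final FakeOutboundChannel channel;
    private int pendingCredit;
    int creditResets; // how often pending credit was read and reset

    FakeCreditHandler(FakeOutboundChannel channel) {
        this.channel = channel;
        channel.setWritabilityListener(this::trySendCredit);
    }

    void notifyCreditAvailable(int credit) {
        pendingCredit += credit;
        trySendCredit();
    }

    private void trySendCredit() {
        if (channel.isWritable() && pendingCredit > 0) {
            creditResets++;
            channel.writeAndFlush("AddCredit(" + pendingCredit + ")");
            pendingCredit = 0;
        }
    }
}
```

The model is used the same way as the test:

1. Announce credit.
2. Write high-water-mark bytes without flushing.
3. Announce again and expect `readOutbound()` to return `null`.
4. Call `flush()` and expect the pending `AddCredit` message.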


---


[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...

2017-12-01 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r154315390
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java ---
@@ -144,20 +147,19 @@ public void testReceiveBuffer() throws Exception {
inputGate.setBufferPool(bufferPool);
inputGate.assignExclusiveSegments(networkBufferPool, 2);
 
-   final int backlog = 2;
-   final BufferResponse bufferResponse = createBufferResponse(
-   inputChannel.requestBuffer(), 0, inputChannel.getInputChannelId(), backlog);
-
final CreditBasedClientHandler handler = new CreditBasedClientHandler();
handler.addInputChannel(inputChannel);
 
+   final int backlog = 2;
+   final BufferResponse bufferResponse = createBufferResponse(
+   TestBufferFactory.createBuffer(32), 0, inputChannel.getInputChannelId(), backlog);
--- End diff --

good catch: not getting the buffer from the channel here


---


[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274371#comment-16274371
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r154338695
  

also add `assertNull(channel.readOutbound());`


--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274374#comment-16274374
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r154336638
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java ---
@@ -167,6 +167,13 @@ public void operationComplete(ChannelFuture future) throws Exception {
});
}
 
+   public void notifyCreditAvailable(RemoteInputChannel inputChannel) {
--- End diff --

this may be package-private




[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...

2017-12-01 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r154321327
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java ---
@@ -244,54 +237,89 @@ public void testCancelBeforeActive() throws Exception {
}
 
/**
-* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline, and
-* {@link AddCredit} message is sent to the producer.
+* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline for notifying credits,
+* and verifies the behaviour of credit notification by triggering channel's writability changed.
 */
@Test
public void testNotifyCreditAvailable() throws Exception {
+   final SingleInputGate inputGate = createSingleInputGate();
+   final RemoteInputChannel inputChannel1 = spy(createRemoteInputChannel(inputGate));
+   final RemoteInputChannel inputChannel2 = spy(createRemoteInputChannel(inputGate));
final CreditBasedClientHandler handler = new CreditBasedClientHandler();
-   final EmbeddedChannel channel = new EmbeddedChannel(handler);
+   final EmbeddedChannel channel = spy(new EmbeddedChannel(handler));
 
-   final RemoteInputChannel inputChannel = createRemoteInputChannel(mock(SingleInputGate.class));
+   // Increase the credits to enqueue the input channels
+   inputChannel1.increaseCredit(1);
+   inputChannel2.increaseCredit(1);
+   handler.notifyCreditAvailable(inputChannel1);
+   handler.notifyCreditAvailable(inputChannel2);
 
-   // Enqueue the input channel
-   handler.notifyCreditAvailable(inputChannel);
+   channel.runPendingTasks();
+
+   // The two input channels should notify credits via writable channel
+   assertTrue(channel.isWritable());
+   assertEquals(channel.readOutbound().getClass(), AddCredit.class);
+   verify(inputChannel1, times(1)).getAndResetCredit();
+   verify(inputChannel2, times(1)).getAndResetCredit();
+
+   final int highWaterMark = channel.config().getWriteBufferHighWaterMark();
+   // Set the writer index to the high water mark to ensure that all bytes are written
+   // to the wire although the buffer is "empty".
+   channel.write(Unpooled.buffer(highWaterMark).writerIndex(highWaterMark));
+
+   // Enqueue the input channel on the condition of un-writable channel
+   inputChannel1.increaseCredit(1);
+   handler.notifyCreditAvailable(inputChannel1);
 
channel.runPendingTasks();
 
-   // Read the enqueued msg
-   Object msg1 = channel.readOutbound();
+   // The input channel will not notify credits via un-writable channel
+   assertFalse(channel.isWritable());
+   verify(inputChannel1, times(1)).getAndResetCredit();
 
-   // Should notify credit
-   assertEquals(msg1.getClass(), AddCredit.class);
+   // Flush the buffer to make the channel writable again
+   channel.flush();
+
+   // The input channel should notify credits via triggering channel's writability changed event
+   assertTrue(channel.isWritable());
+   assertEquals(channel.readOutbound().getClass(), AddCredit.class);
+   verify(inputChannel1, times(2)).getAndResetCredit();
--- End diff --

also here (check the `AddCredit` message as well as the unannounced credit)


---


[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274365#comment-16274365
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r154316689
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java ---
@@ -170,29 +172,20 @@ public void testReceiveBuffer() throws Exception {
 */
@Test
public void testThrowExceptionForNoAvailableBuffer() throws Exception {
-   final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
final SingleInputGate inputGate = createSingleInputGate();
final RemoteInputChannel inputChannel = spy(createRemoteInputChannel(inputGate));
-   inputGate.setInputChannel(inputChannel.getPartitionId().getPartitionId(), inputChannel);
-   try {
-   inputGate.assignExclusiveSegments(networkBufferPool, 1);
-
-   final BufferResponse bufferResponse = createBufferResponse(
-   inputChannel.requestBuffer(), 0, inputChannel.getInputChannelId(), 2);
 
-   final CreditBasedClientHandler handler = new CreditBasedClientHandler();
-   handler.addInputChannel(inputChannel);
+   final CreditBasedClientHandler handler = new CreditBasedClientHandler();
+   handler.addInputChannel(inputChannel);
 
-   handler.channelRead(mock(ChannelHandlerContext.class), bufferResponse);
+   assertEquals("There should be no buffers available in the channel.",
+   0, inputChannel.getNumberOfAvailableBuffers());
 
-   verify(inputChannel, times(1)).onError(any(IllegalStateException.class));
-   } finally {
-   // Release all the buffer resources
-   inputChannel.releaseAllResources();
+   final BufferResponse bufferResponse = createBufferResponse(
+   TestBufferFactory.createBuffer(), 0, inputChannel.getInputChannelId(), 2);
+   handler.channelRead(mock(ChannelHandlerContext.class), bufferResponse);
 
-   networkBufferPool.destroyAllBufferPools();
-   networkBufferPool.destroy();
-   }
+   verify(inputChannel, times(1)).onError(any(IllegalStateException.class));
--- End diff --

nice test simplification
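The simplified test relies on one property: a channel with no assigned buffers must report an error when data arrives. Because of that, no `NetworkBufferPool` needs to be created or destroyed. The payload is also built independently of the channel, as `TestBufferFactory.createBuffer()` is in the diff, so the test does not consume one of the channel's own buffers. Below is a plain-Java sketch of that receive path. `SketchBufferChannel` and `SketchReceiver` are hypothetical classes standing in for the real ones, and the error message is made up.

```java
import java.util.ArrayDeque;

// Hypothetical input channel holding receive buffers; not Flink's RemoteInputChannel.
final class SketchBufferChannel {
    private final ArrayDeque<byte[]> availableBuffers = new ArrayDeque<>();
    Throwable lastError; // records what onError() received

    void addBuffer(byte[] buffer) {
        availableBuffers.add(buffer);
    }

    int getNumberOfAvailableBuffers() {
        return availableBuffers.size();
    }

    /** Returns null when no buffer is left, mirroring a failed request. */
    byte[] requestBuffer() {
        return availableBuffers.poll();
    }

    void onError(Throwable cause) {
        lastError = cause;
    }
}

// Hypothetical receive path: copy the payload into a channel buffer or report an error.
final class SketchReceiver {
    static void onBufferResponse(SketchBufferChannel channel, byte[] payload) {
        byte[] target = channel.requestBuffer();
        if (target == null) {
            channel.onError(new IllegalStateException("No buffer available for incoming data."));
            return;
        }
        System.arraycopy(payload, 0, target, 0, Math.min(payload.length, target.length));
    }
}
```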




[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...

2017-12-01 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r154321125
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java ---
@@ -244,54 +237,89 @@ public void testCancelBeforeActive() throws Exception {
}
 
/**
-* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline, and
-* {@link AddCredit} message is sent to the producer.
+* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline for notifying credits,
+* and verifies the behaviour of credit notification by triggering channel's writability changed.
 */
@Test
public void testNotifyCreditAvailable() throws Exception {
+   final SingleInputGate inputGate = createSingleInputGate();
+   final RemoteInputChannel inputChannel1 = spy(createRemoteInputChannel(inputGate));
+   final RemoteInputChannel inputChannel2 = spy(createRemoteInputChannel(inputGate));
final CreditBasedClientHandler handler = new CreditBasedClientHandler();
-   final EmbeddedChannel channel = new EmbeddedChannel(handler);
+   final EmbeddedChannel channel = spy(new EmbeddedChannel(handler));
 
-   final RemoteInputChannel inputChannel = createRemoteInputChannel(mock(SingleInputGate.class));
+   // Increase the credits to enqueue the input channels
+   inputChannel1.increaseCredit(1);
+   inputChannel2.increaseCredit(1);
+   handler.notifyCreditAvailable(inputChannel1);
+   handler.notifyCreditAvailable(inputChannel2);
 
-   // Enqueue the input channel
-   handler.notifyCreditAvailable(inputChannel);
+   channel.runPendingTasks();
+
+   // The two input channels should notify credits via writable channel
+   assertTrue(channel.isWritable());
+   assertEquals(channel.readOutbound().getClass(), AddCredit.class);
+   verify(inputChannel1, times(1)).getAndResetCredit();
+   verify(inputChannel2, times(1)).getAndResetCredit();
--- End diff --

Instead of verifying the method being called, we could add a (test-visible) 
`getUnannouncedCredit()` method and check the actual credit (since this new 
method would only read the state, it is ok to have it).
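Here is a sketch of the suggested accessor. It assumes the credit is held in an `AtomicInteger`; the diff calls `unannouncedCredit.getAndAdd(1)`, which matches that type. `CreditStateChannel` is a hypothetical class. On the real channel, an assertion such as `assertEquals(0, inputChannel1.getUnannouncedCredit())` would then replace `verify(inputChannel1, times(1)).getAndResetCredit()`.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical channel exposing its unannounced credit for state-based assertions.
final class CreditStateChannel {
    private final AtomicInteger unannouncedCredit = new AtomicInteger();

    void increaseCredit(int delta) {
        unannouncedCredit.addAndGet(delta);
    }

    /** Called when the credit is announced to the producer. */
    int getAndResetCredit() {
        return unannouncedCredit.getAndSet(0);
    }

    /** Read-only and package-private, so tests can assert on state without changing it. */
    int getUnannouncedCredit() {
        return unannouncedCredit.get();
    }
}
```

Checking state this way passes regardless of how many times `getAndResetCredit()` is invoked internally, as long as the resulting credit is correct.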


---

