[jira] [Commented] (FLINK-6094) Implement stream-stream proctime non-window inner join
[ https://issues.apache.org/jira/browse/FLINK-6094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16275369#comment-16275369 ]

ASF GitHub Bot commented on FLINK-6094:
---------------------------------------

Github user hequn8128 commented on the issue:

    https://github.com/apache/flink/pull/4471

    Hi @twalthr, sorry for the late reply. I mistakenly overlooked the GitHub notification. I will give an update ASAP. Thanks very much.

> Implement stream-stream proctime non-window inner join
> -------------------------------------------------------
>
> Key: FLINK-6094
> URL: https://issues.apache.org/jira/browse/FLINK-6094
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Shaoxuan Wang
> Assignee: Hequn Cheng
>
> This includes:
> 1. Implement stream-stream proctime non-window inner join
> 2. Implement the retract process logic for join
[GitHub] flink issue #4471: [FLINK-6094] [table] Implement stream-stream proctime non...
Github user hequn8128 commented on the issue:

    https://github.com/apache/flink/pull/4471

    Hi @twalthr, sorry for the late reply. I mistakenly overlooked the GitHub notification. I will give an update ASAP. Thanks very much.

---
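For context on what the pull request implements: below is a minimal sketch of the kind of query it enables, written against the 1.4-era Java Table API. The table names (Orders, Payments) and their fields are hypothetical and assumed to be registered on the table environment beforehand. Because the join has no window bounds, the operator must keep both inputs in state indefinitely and emit retractions when results become invalid, which is exactly the two work items listed in the ticket.

{code}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

// Sketch only: assumes "Orders" and "Payments" are already registered as
// streaming tables on tEnv (registration omitted).
public class ProctimeInnerJoinSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
        // ... register "Orders" and "Payments" here ...
        Table result = tEnv.sql(
            "SELECT o.orderId, p.amount " +
            "FROM Orders AS o JOIN Payments AS p ON o.orderId = p.orderId");
    }
}
{code}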
[jira] [Commented] (FLINK-8141) Add AppendStreamTableSink for bucketed ORC files
[ https://issues.apache.org/jira/browse/FLINK-8141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16275158#comment-16275158 ]

Till Rohrmann commented on FLINK-8141:
--------------------------------------

Sorry, I mixed up the JIRA numbers. The merged PR fixed FLINK-8184.

> Add AppendStreamTableSink for bucketed ORC files
> -------------------------------------------------
>
> Key: FLINK-8141
> URL: https://issues.apache.org/jira/browse/FLINK-8141
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Fabian Hueske
> Priority: Minor
>
> It would be good to have an {{AppendStreamTableSink}} that writes to bucketed ORC files.
[jira] [Commented] (FLINK-8141) Add AppendStreamTableSink for bucketed ORC files
[ https://issues.apache.org/jira/browse/FLINK-8141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16275155#comment-16275155 ]

ASF GitHub Bot commented on FLINK-8141:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5109
[jira] [Closed] (FLINK-8184) Return raw JsonPlan instead of escaped string value in JobDetailsInfo
[ https://issues.apache.org/jira/browse/FLINK-8184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Till Rohrmann closed FLINK-8184.
--------------------------------
Resolution: Fixed

Fixed via ee9027e491ff50bf72f51c869a5095aef7092396

> Return raw JsonPlan instead of escaped string value in JobDetailsInfo
> ----------------------------------------------------------------------
>
> Key: FLINK-8184
> URL: https://issues.apache.org/jira/browse/FLINK-8184
> Project: Flink
> Issue Type: Bug
> Components: REST
> Affects Versions: 1.5.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Minor
> Labels: flip-6
> Fix For: 1.5.0
>
> The {{JobDetailsInfo}} should pass the JsonPlan as a raw value because otherwise the string value will be escaped.
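The escaping issue and the raw-value behavior are easy to reproduce with plain Jackson. A minimal sketch with an illustrative class (not Flink's actual JobDetailsInfo, though the fix plausibly relies on the same @JsonRawValue mechanism):

{code}
import com.fasterxml.jackson.annotation.JsonRawValue;
import com.fasterxml.jackson.databind.ObjectMapper;

// @JsonRawValue tells Jackson to write the field's content verbatim, so a
// String that already holds JSON is embedded as a JSON object instead of
// being serialized as an escaped string value.
public class RawPlanExample {
    @JsonRawValue
    public final String jsonPlan;

    public RawPlanExample(String jsonPlan) {
        this.jsonPlan = jsonPlan;
    }

    public static void main(String[] args) throws Exception {
        String out = new ObjectMapper().writeValueAsString(new RawPlanExample("{\"nodes\":[]}"));
        System.out.println(out); // {"jsonPlan":{"nodes":[]}} rather than {"jsonPlan":"{\"nodes\":[]}"}
    }
}
{code}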
[GitHub] flink pull request #5109: [FLINK-8141] [flip6] Fix JsonPlan serialization in...
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5109

---
[jira] [Commented] (FLINK-8141) Add AppendStreamTableSink for bucketed ORC files
[ https://issues.apache.org/jira/browse/FLINK-8141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16275149#comment-16275149 ]

ASF GitHub Bot commented on FLINK-8141:
---------------------------------------

Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/5109

    Merging this PR.
[GitHub] flink issue #5109: [FLINK-8141] [flip6] Fix JsonPlan serialization in JobDet...
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/5109

    Merging this PR.

---
[jira] [Commented] (FLINK-8173) InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
[ https://issues.apache.org/jira/browse/FLINK-8173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16275136#comment-16275136 ]

Tao Xia commented on FLINK-8173:
--------------------------------

Thanks for the fix. It is working now with 1.4.

> InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
> ------------------------------------------------------------------------------------------------
>
> Key: FLINK-8173
> URL: https://issues.apache.org/jira/browse/FLINK-8173
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.4.0
> Reporter: Tao Xia
> Assignee: Timo Walther
> Fix For: 1.4.0, 1.5.0
>
> It is a stream of Avro objects; simply select a String field and try to print it out:
> {code}
> val query = "SELECT nd_key FROM table1"
> val result = tableEnv.sql(query)
> tableEnv.toAppendStream[org.apache.avro.util.Utf8](result).print()
> {code}
> 11/29/2017 16:07:36  Source: Custom Source -> from: (accepted_cohort_id, admin_id, after_submission, amount_paid, anonymous_id, application_id, atom_key, bd_group_key, biz_geo, braavos_purchase_id, category, cohort_id, concept_key, concept_rank, context, context_campaign, context_experiment, coupon_code, course_key, course_rank, cta_destination, cta_location, cta_message, cta_type, currency, decision_group_id, device_browser, device_os, device_os_version, device_type, duration, evaluation_id, event_type, fin_geo, in_collaboration_with, lab_id, lab_rank, label, lesson_key, lesson_rank, locale, max_pause_duration, message, message_id, module_key, module_rank, nd_key, nd_unit_id, nd_unit_rank, new_cohort_id, notification_id, num_concepts_completed, num_interactions, num_lessons_completed, old_cohort_id, part_key, part_rank, pause_duration, pause_reason, payment_plan, payment_provider, points_earned, points_possible, price, price_sheet, product_key, product_type, provider_charge_id, provider_refund_id, quiz_type, referrer, refund_amount, requested_cohort_id, results, scholarship_group_key, search_term, skill_level, subscription_id, suspension_length, suspension_reason, technology, timestamp, total_concepts, total_lessons, total_time_sec, type, unenroll_reason, user_id, user_locale, user_response, variant, version, workspace_id, workspace_session, workspace_type) -> select: (nd_key) -> to: Utf8 -> Sink: Unnamed(5/8) switched to FAILED
> org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
> 	at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
> 	at org.apache.flink.table.runtime.CRowOutputMapRunner.compile(CRowOutputMapRunner.scala:33)
> 	at org.apache.flink.table.runtime.CRowOutputMapRunner.open(CRowOutputMapRunner.scala:48)
> 	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: org.codehaus.commons.compiler.CompileException: Line 790, Column 15: Assignment conversion not possible from type "java.lang.CharSequence" to type "org.apache.avro.util.Utf8"
> 	at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672)
> 	at org.codehaus.janino.UnitCompiler.assignmentConversion(UnitCompiler.java:10528)
> 	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2534)
> 	at org.codehaus.janino.UnitCompiler.access$2600(UnitCompiler.java:212)
> 	at org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1459)
> 	at org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1443)
> 	at org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3348)
> 	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1443)
> 	at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1523)
> 	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3052)
> 	at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1313)
> 	at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1286)
> 	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:785)
> 	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:436)
> 	a
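The root cause is visible in the Janino message above. Reduced to plain Java, it looks like the following sketch (an illustration of the failing assignment only, not the actual generated code or the committed fix; assumes Avro on the classpath):

{code}
import org.apache.avro.util.Utf8;

// Avro's generic API hands string fields back as CharSequence; assigning
// that to Utf8 requires an explicit downcast, which the generated mapper
// was missing, so Janino refused to compile it.
public class AssignmentConversionDemo {
    public static void main(String[] args) {
        CharSequence field = new Utf8("nd_key");
        // Utf8 key = field;     // does not compile: CharSequence -> Utf8
        Utf8 key = (Utf8) field; // an explicit cast makes the assignment legal
        System.out.println(key);
    }
}
{code}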
[jira] [Commented] (FLINK-8030) Start JobMasterRestEndpoint in JobClusterEntrypoint
[ https://issues.apache.org/jira/browse/FLINK-8030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16275117#comment-16275117 ]

ASF GitHub Bot commented on FLINK-8030:
---------------------------------------

Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4988

    Actually we cannot change it to `0.0.0.0`, because the address is also used by the client to connect to the rest server. Therefore, it must be a valid target address.

> Start JobMasterRestEndpoint in JobClusterEntrypoint
> ----------------------------------------------------
>
> Key: FLINK-8030
> URL: https://issues.apache.org/jira/browse/FLINK-8030
> Project: Flink
> Issue Type: Sub-task
> Components: REST
> Affects Versions: 1.5.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Labels: flip-6
> Fix For: 1.5.0
>
> We should launch the {{JobMasterRestEndpoint}} in the {{JobClusterEntrypoint}} in order to run the web frontend for per-job clusters.
[GitHub] flink issue #4988: [FLINK-8030] Instantiate JobMasterRestEndpoint in JobClus...
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4988

    Actually we cannot change it to `0.0.0.0`, because the address is also used by the client to connect to the rest server. Therefore, it must be a valid target address.

---
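To make the bind-versus-connect distinction concrete, here is a small generic Java sketch (not Flink's REST server code; the port number is arbitrary): a server may bind to the wildcard address 0.0.0.0 to listen on all interfaces, but a client has to name a concrete reachable host, which is why the same config value cannot serve both purposes.

{code}
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

public class BindVsConnect {
    public static void main(String[] args) throws Exception {
        // Server side: binding to the wildcard address is fine and means
        // "listen on all local interfaces".
        try (ServerSocket server = new ServerSocket()) {
            server.bind(new InetSocketAddress("0.0.0.0", 9065));
            // Client side: the connect target must be a concrete address of
            // the server host; "0.0.0.0" is not a meaningful destination.
            try (Socket client = new Socket("localhost", server.getLocalPort())) {
                System.out.println("connected to " + client.getRemoteSocketAddress());
            }
        }
    }
}
{code}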
[jira] [Commented] (FLINK-8030) Start JobMasterRestEndpoint in JobClusterEntrypoint
[ https://issues.apache.org/jira/browse/FLINK-8030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16275113#comment-16275113 ]

ASF GitHub Bot commented on FLINK-8030:
---------------------------------------

Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4988

    Thanks for the review @shuai-xu. I'll change the default value of the rest address to `0.0.0.0`.
[GitHub] flink issue #4988: [FLINK-8030] Instantiate JobMasterRestEndpoint in JobClus...
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4988

    Thanks for the review @shuai-xu. I'll change the default value of the rest address to `0.0.0.0`.

---
[jira] [Commented] (FLINK-8186) AvroInputFormat regression: fails to deserialize GenericRecords on standalone cluster with hadoop27 compat
[ https://issues.apache.org/jira/browse/FLINK-8186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274913#comment-16274913 ]

Sebastian Klemke commented on FLINK-8186:
------------------------------------------

[~twalthr] Thanks for looking into it. To summarize: the provided test program and pom fail with the above-mentioned exception if all of the following conditions are true:
- the test program is built using the "build-jar" profile
- the Flink 1.4.0 RC2 hadoop27 runtime is used
- the cluster is standalone
- the input dataset is non-empty

In this case, the rhs of https://github.com/apache/flink/blob/release-1.4/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroInputFormat.java#L119 is loaded by org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader, the comparison fails, and the else branch is evaluated, leading to ReflectDatumReader being used instead of GenericDatumReader.

> AvroInputFormat regression: fails to deserialize GenericRecords on standalone cluster with hadoop27 compat
> ------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-8186
> URL: https://issues.apache.org/jira/browse/FLINK-8186
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.4.0
> Reporter: Sebastian Klemke
> Priority: Minor
> Attachments: GenericRecordCount.java, pom.xml
>
> The following job runs fine on a Flink 1.3.2 cluster, but fails on a Flink 1.4.0 RC2 standalone cluster, "hadoop27" flavour:
> {code}
> public class GenericRecordCount {
>     public static void main(String[] args) throws Exception {
>         String input = ParameterTool.fromArgs(args).getRequired("input");
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         long count = env.readFile(new AvroInputFormat<>(new Path(input), GenericRecord.class), input)
>                 .count();
>         System.out.printf("Counted %d records\n", count);
>     }
> }
> {code}
> It runs fine in a LocalExecutionEnvironment and also on a no-hadoop flavour standalone cluster, though. Exception thrown in Flink 1.4.0 hadoop27:
> {code}
> 12/01/2017 13:22:09  DataSource (at readFile(ExecutionEnvironment.java:514) (org.apache.flink.formats.avro.AvroInputFormat))(4/4) switched to FAILED
> java.lang.RuntimeException: java.lang.NoSuchMethodException: org.apache.avro.generic.GenericRecord.<init>()
> 	at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:353)
> 	at org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:369)
> 	at org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901)
> 	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:212)
> 	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
> 	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
> 	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
> 	at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
> 	at org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
> 	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:167)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NoSuchMethodException: org.apache.avro.generic.GenericRecord.<init>()
> 	at java.lang.Class.getConstructor0(Class.java:3082)
> 	at java.lang.Class.getDeclaredConstructor(Class.java:2178)
> 	at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347)
> 	... 11 more
> {code}
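The class-identity failure described in the comment can be reproduced with plain Java. A sketch, assuming Avro is on the application classpath and some jar containing it sits at the given (hypothetical) path: the same binary class name loaded by two different classloaders yields two distinct Class objects, so an `==` comparison fails even though the names match.

{code}
import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;

public class ClassIdentityDemo {
    public static void main(String[] args) throws Exception {
        // Hypothetical path; any jar containing the class works.
        URL avroJar = new File("lib/avro-1.8.2.jar").toURI().toURL();
        // A loader with no parent must define the class itself, mimicking
        // what a child-first classloader does for classes bundled in the job jar.
        ClassLoader isolated = new URLClassLoader(new URL[]{avroJar}, null);

        Class<?> a = Class.forName("org.apache.avro.generic.GenericRecord");
        Class<?> b = isolated.loadClass("org.apache.avro.generic.GenericRecord");

        System.out.println(a == b);                          // false: different defining loaders
        System.out.println(a.getName().equals(b.getName())); // true: same binary name
    }
}
{code}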
[jira] [Created] (FLINK-8189) move flink-statebackend-rocksdb out of flink-contrib
Bowen Li created FLINK-8189:
-------------------------------

Summary: move flink-statebackend-rocksdb out of flink-contrib
Key: FLINK-8189
URL: https://issues.apache.org/jira/browse/FLINK-8189
Project: Flink
Issue Type: Sub-task
Components: State Backends, Checkpointing
Affects Versions: 1.5.0
Reporter: Bowen Li
Assignee: Bowen Li

Move {{flink-statebackend-rocksdb}} into, probably, its own state-backend module or the {{flink-runtime}} package.
[jira] [Updated] (FLINK-8175) remove flink-streaming-contrib and migrate its classes to flink-streaming-java/scala
[ https://issues.apache.org/jira/browse/FLINK-8175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bowen Li updated FLINK-8175:
----------------------------
Issue Type: Sub-task (was: Improvement)
Parent: FLINK-8188

> remove flink-streaming-contrib and migrate its classes to flink-streaming-java/scala
> --------------------------------------------------------------------------------------
>
> Key: FLINK-8175
> URL: https://issues.apache.org/jira/browse/FLINK-8175
> Project: Flink
> Issue Type: Sub-task
> Affects Versions: 1.5.0
> Reporter: Bowen Li
> Assignee: Bowen Li
>
> I propose removing flink-streaming-contrib from flink-contrib and migrating its classes to flink-streaming-java/scala for the following reasons:
> - flink-streaming-contrib is so small that it only has 4 classes (3 Java and 1 Scala); they don't need a dedicated jar for Flink to distribute and maintain, nor for users to deal with the overhead of dependency management
> - the 4 classes in flink-streaming-contrib are logically more tied to flink-streaming-java/scala, and thus can be easily migrated
> - flink-contrib is already too crowded and noisy; it contains lots of submodules with different purposes, which confuses developers and users, and they lack a proper project hierarchy
>
> To take this a step further, I would argue that even flink-contrib should be removed and all its submodules migrated to other top-level modules, for the following reasons: 1) the whole Apache Flink project is itself a result of contributions from many developers, so there's no reason to highlight some contributions in a dedicated module named 'contrib'; 2) flink-contrib inherently doesn't have a good hierarchy for holding submodules
[jira] [Assigned] (FLINK-8175) remove flink-streaming-contrib and migrate its classes to flink-streaming-java/scala
[ https://issues.apache.org/jira/browse/FLINK-8175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bowen Li reassigned FLINK-8175:
-------------------------------
Assignee: Bowen Li
[jira] [Created] (FLINK-8188) Clean up flink-contrib
Bowen Li created FLINK-8188:
-------------------------------

Summary: Clean up flink-contrib
Key: FLINK-8188
URL: https://issues.apache.org/jira/browse/FLINK-8188
Project: Flink
Issue Type: Improvement
Affects Versions: 1.5.0
Reporter: Bowen Li

This is the umbrella ticket for cleaning up flink-contrib. We argue that flink-contrib should be removed and all its submodules migrated to other top-level modules, for the following reasons:

1) the whole Apache Flink project is itself a result of contributions from many developers, so there's no reason to highlight some contributions in a dedicated module named 'contrib'
2) flink-contrib is already too crowded and noisy; it contains lots of submodules with different purposes, which confuses developers and users, and they lack a proper project hierarchy
3) this will save us quite some build time

More details in the discussions at FLINK-8175 and FLINK-8167.
[jira] [Commented] (FLINK-7294) mesos.resourcemanager.framework.role not working
[ https://issues.apache.org/jira/browse/FLINK-7294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274705#comment-16274705 ]

Eron Wright commented on FLINK-7294:
-------------------------------------

[~bbayani] I too thought it would be fine, but upon further investigation found that `Resource::setRole` actually means 'draw the resources from a reservation associated with role such-and-such'. In fact, `setRole` is deprecated in favor of a more elaborate reservations structure. We're now looking for a fix that will cover all scenarios. Feel free to email me directly to discuss this further.

> mesos.resourcemanager.framework.role not working
> -------------------------------------------------
>
> Key: FLINK-7294
> URL: https://issues.apache.org/jira/browse/FLINK-7294
> Project: Flink
> Issue Type: Bug
> Components: Mesos
> Affects Versions: 1.3.1
> Reporter: Bhumika Bayani
> Assignee: Eron Wright
> Priority: Critical
> Fix For: 1.4.0, 1.3.3
>
> I am using the above-said setting in flink-conf.yaml, e.g.:
> mesos.resourcemanager.framework.role: mesos_role_tasks
> I see a flink-scheduler registered in the mesos/frameworks tab with the above-said role, but the scheduler fails to launch any tasks in spite of getting resource offers from mesos-agents with the correct role. The error seen is:
> {code}
> 2017-07-28 13:23:00,683 INFO org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager - Mesos task taskmanager-03768 failed, with a TaskManager in launch or registration. State: TASK_ERROR Reason: REASON_TASK_INVALID (Task uses more resources cpus(*):1; mem(*):1024; ports(*):[4006-4007] than available cpus(mesos_role_tasks):7.4; mem(mesos_role_tasks):45876; ports(mesos_role_tasks):[4002-4129, 4131-4380, 4382-4809, 4811-4957, 4959-4966, 4968-4979, 4981-5049, 31000-31196, 31198-31431, 31433-31607, 31609-32000]; ephemeral_storage(mesos_role_tasks):37662; efs_storage(mesos_role_tasks):8.79609e+12; disk(mesos_role_tasks):5115)
> {code}
> The request is made for resources with the * role. We do not have mesos running anywhere with the * role; thus the task manager never comes up. Am I missing any configuration? I am using Flink version 1.3.1.
[jira] [Commented] (FLINK-8186) AvroInputFormat regression: fails to deserialize GenericRecords on standalone cluster with hadoop27 compat
[ https://issues.apache.org/jira/browse/FLINK-8186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274656#comment-16274656 ]

Sebastian Klemke commented on FLINK-8186:
------------------------------------------

With -Pbuild-jar, the job fat-jar includes Avro; without it, it doesn't. I added org.apache.flink:flink-avro:${flink.version} to the "provided" dependency list: now it works on a standalone cluster and the jar contains only the GenericRecordCount class. I still have to re-test on YARN and in a LocalExecutionEnvironment.
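For anyone reproducing the workaround: the dependency change described above corresponds to a pom.xml entry like the following sketch (coordinates taken from the comment; the ${flink.version} property is assumed to be defined by the project):

{code}
<!-- Workaround sketch: depend on flink-avro at "provided" scope so the
     job fat-jar does not bundle a second copy of Avro that the
     child-first classloader would then load. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
{code}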
[jira] [Commented] (FLINK-8186) AvroInputFormat regression: fails to deserialize GenericRecords on standalone cluster with hadoop27 compat
[ https://issues.apache.org/jira/browse/FLINK-8186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274641#comment-16274641 ]

Timo Walther commented on FLINK-8186:
--------------------------------------

Thanks, I will look into it again on Monday.
[jira] [Updated] (FLINK-8173) InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
[ https://issues.apache.org/jira/browse/FLINK-8173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timo Walther updated FLINK-8173:
--------------------------------
Component/s: Table API & SQL
[jira] [Resolved] (FLINK-8173) InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
[ https://issues.apache.org/jira/browse/FLINK-8173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timo Walther resolved FLINK-8173.
---------------------------------
Resolution: Fixed
Fix Version/s: 1.5.0
               1.4.0

Fixed in 1.5: 6aced1251878a4a98f66732d14336f3c86ec9d98
Fixed in 1.4: 6ca2be3ffbf3f8a92c21b0990e317ff3a60a3e82
[jira] [Updated] (FLINK-8173) InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
[ https://issues.apache.org/jira/browse/FLINK-8173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timo Walther updated FLINK-8173:
--------------------------------
Affects Version/s: 1.4.0
[jira] [Commented] (FLINK-8186) AvroInputFormat regression: fails to deserialize GenericRecords on standalone cluster with hadoop27 compat
[ https://issues.apache.org/jira/browse/FLINK-8186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274628#comment-16274628 ]

Sebastian Klemke commented on FLINK-8186:
------------------------------------------

The error only occurs if the job jar was built with -Pbuild-jar. It also occurs on testdata.avro from the Flink repo.
[jira] [Commented] (FLINK-8173) InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
[ https://issues.apache.org/jira/browse/FLINK-8173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274613#comment-16274613 ]

ASF GitHub Bot commented on FLINK-8173:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5111
[GitHub] flink pull request #5111: [FLINK-8173] [table] Fix input unboxing and suppor...
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5111

---
[jira] [Commented] (FLINK-8186) AvroInputFormat regression: fails to deserialize GenericRecords on standalone cluster with hadoop27 compat
[ https://issues.apache.org/jira/browse/FLINK-8186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274609#comment-16274609 ]

Sebastian Klemke commented on FLINK-8186:
------------------------------------------

Regarding binaries: yes, this is the runtime I'm using. I'm compiling against the 1.4 snapshot via Maven. As for the Avro files: I'll test with https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/test/resources/testdata.avro
[jira] [Resolved] (FLINK-8151) [Table] Map equality check to use entrySet equality
[ https://issues.apache.org/jira/browse/FLINK-8151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timo Walther resolved FLINK-8151.
---------------------------------
Resolution: Fixed
Fix Version/s: 1.5.0

Fixed in 1.5: deea4b32bab4160e2279e9f57d7272df2d91e434

> [Table] Map equality check to use entrySet equality
> ----------------------------------------------------
>
> Key: FLINK-8151
> URL: https://issues.apache.org/jira/browse/FLINK-8151
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Reporter: Rong Rong
> Assignee: Rong Rong
> Fix For: 1.5.0
>
> Following up on FLINK-8038: the equality check is currently broken. The plan is to support element-wise equality by always using the base class method java.util.Map.equals.
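For reference, the entrySet-based contract the fix standardizes on is the one java.util.Map itself specifies: two maps are equal iff their entry sets are equal, regardless of implementation class or iteration order. A minimal illustration:

{code}
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class MapEqualityDemo {
    public static void main(String[] args) {
        Map<String, Integer> a = new HashMap<>();
        Map<String, Integer> b = new TreeMap<>();
        a.put("x", 1); a.put("y", 2);
        b.put("y", 2); b.put("x", 1);
        // AbstractMap.equals compares entries, not implementations or order.
        System.out.println(a.equals(b));                       // true
        System.out.println(a.entrySet().equals(b.entrySet())); // true, same contract
    }
}
{code}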
[jira] [Commented] (FLINK-8186) AvroInputFormat regression: fails to deserialize GenericRecords on standalone cluster with hadoop27 compat
[ https://issues.apache.org/jira/browse/FLINK-8186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274601#comment-16274601 ]

Sebastian Klemke commented on FLINK-8186:
------------------------------------------

What puzzles me is org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901) in the stack trace. It should be GenericData.newRecord().
[jira] [Commented] (FLINK-5506) Java 8 - CommunityDetection.java:158 - java.lang.NullPointerException
[ https://issues.apache.org/jira/browse/FLINK-5506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274596#comment-16274596 ] Christos Hadjinikolis commented on FLINK-5506: -- Just to note that I have tried running the algorithm with different graph types, e.g. using Graph but I keep hitting the same error message. Looking into the source code I can see that necessary precautions are taken to guarantee that: {{labelsWithHighestScore.get(maxScoreLabel);}} won't be null. So, this possibly suggests that it's an issue concerned with asynchronous calls to the thread, i.e. the getter is somehow called prior to the initialisation of {{labelsWithHighestScore}}. Hope this helps, Christos > Java 8 - CommunityDetection.java:158 - java.lang.NullPointerException > - > > Key: FLINK-5506 > URL: https://issues.apache.org/jira/browse/FLINK-5506 > Project: Flink > Issue Type: Bug > Components: Gelly >Affects Versions: 1.1.4, 1.3.2, 1.4.1 >Reporter: Miguel E. Coimbra >Assignee: Greg Hogan > Labels: easyfix, newbie > Original Estimate: 2h > Remaining Estimate: 2h > > Reporting this here as per Vasia's advice. > I am having the following problem while trying out the > org.apache.flink.graph.library.CommunityDetection algorithm of the Gelly API > (Java). > Specs: JDK 1.8.0_102 x64 > Apache Flink: 1.1.4 > Suppose I have a very small (I tried an example with 38 vertices as well) > dataset stored in a tab-separated file 3-vertex.tsv: > {code} > #id1 id2 score > 0 1 0 > 0 2 0 > 0 3 0 > {code} > This is just a central vertex with 3 neighbors (disconnected between > themselves). > I am loading the dataset and executing the algorithm with the following code: > {code} > // Load the data from the .tsv file. > final DataSet<Tuple3<Long, Long, Double>> edgeTuples = > env.readCsvFile(inputPath) > .fieldDelimiter("\t") // node IDs are separated by spaces > .ignoreComments("#") // comments start with "%" > .types(Long.class, Long.class, Double.class); > // Generate a graph and add reverse edges (undirected). > final Graph<Long, Long, Double> graph = Graph.fromTupleDataSet(edgeTuples, > new MapFunction<Long, Long>() { > private static final long serialVersionUID = 8713516577419451509L; > public Long map(Long value) { > return value; > } > }, > env).getUndirected(); > // CommunityDetection parameters. > final double hopAttenuationDelta = 0.5d; > final int iterationCount = 10; > // Prepare and trigger the execution. > DataSet<Vertex<Long, Long>> vs = graph.run(new > org.apache.flink.graph.library.CommunityDetection<Long>(iterationCount, > hopAttenuationDelta)).getVertices(); > vs.print(); > {code} > Running this code throws the following exception (check the bold line): > {code} > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:805) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:751) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:751) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.NullPointerException > at > org.apache.flink.graph.library.CommunityDetection$VertexLabelUpdater.updateVertex(CommunityDetection.java:158) > at > org.apache.flink.graph.spargel.ScatterGatherIteration$GatherUdfSimpleVV.coGroup(ScatterGatherIteration.java:389) > at > org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:218) > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486) > at > org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146) > at > org.apache.flink.runtime.iterative.task.Itera
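One plausible (unconfirmed) reading of the NPE at {{CommunityDetection.java:158}}, in line with the comment above, is auto-unboxing of a missing map entry: {{Map.get}} returns null for an absent key, and assigning that to a primitive throws. A minimal, hypothetical reproduction of that failure mode (names borrowed from the comment, not from Gelly's actual code):
{code}
import java.util.HashMap;
import java.util.Map;

public class UnboxingNpeDemo {
    public static void main(String[] args) {
        Map<Long, Double> labelsWithHighestScore = new HashMap<>();
        long maxScoreLabel = 42L;
        // Map.get returns null for an absent key; assigning the result to a
        // primitive double auto-unboxes and throws NullPointerException.
        Double boxed = labelsWithHighestScore.get(maxScoreLabel);
        try {
            double highestScore = boxed; // NPE here when boxed == null
            System.out.println(highestScore);
        } catch (NullPointerException e) {
            System.out.println("NPE from unboxing an absent map entry");
        }
    }
}
{code}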
[jira] [Commented] (FLINK-8151) [Table] Map equality check to use entrySet equality
[ https://issues.apache.org/jira/browse/FLINK-8151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274595#comment-16274595 ] ASF GitHub Bot commented on FLINK-8151: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5070 > [Table] Map equality check to use entrySet equality > --- > > Key: FLINK-8151 > URL: https://issues.apache.org/jira/browse/FLINK-8151 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Rong Rong >Assignee: Rong Rong > > Following up with FLINK-8038. The equality check currently is broken. Plan to > support element-wise equality check by always using the base class: > "java.util.Map.equals" method. > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5070: [FLINK-8151][table]Map equality check to use entry...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5070 ---
[jira] [Commented] (FLINK-8173) InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
[ https://issues.apache.org/jira/browse/FLINK-8173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274591#comment-16274591 ] ASF GitHub Bot commented on FLINK-8173: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5111 Thanks for the fix. +1 to merge > InvalidProgramException: Table program cannot be compiled. This is a bug. > Please file an issue. > --- > > Key: FLINK-8173 > URL: https://issues.apache.org/jira/browse/FLINK-8173 > Project: Flink > Issue Type: Bug >Reporter: Tao Xia >Assignee: Timo Walther > > It is a stream of Avro objects; I simply select a String field and try to > print it out > val query = "SELECT nd_key FROM table1" > val result = tableEnv.sql(query) > tableEnv.toAppendStream[org.apache.avro.util.Utf8](result).print() > 11/29/2017 16:07:36 Source: Custom Source -> from: (accepted_cohort_id, > admin_id, after_submission, amount_paid, anonymous_id, application_id, > atom_key, bd_group_key, biz_geo, braavos_purchase_id, category, cohort_id, > concept_key, concept_rank, context, context_campaign, context_experiment, > coupon_code, course_key, course_rank, cta_destination, cta_location, > cta_message, cta_type, currency, decision_group_id, device_browser, > device_os, device_os_version, device_type, duration, evaluation_id, > event_type, fin_geo, in_collaboration_with, lab_id, lab_rank, label, > lesson_key, lesson_rank, locale, max_pause_duration, message, message_id, > module_key, module_rank, nd_key, nd_unit_id, nd_unit_rank, new_cohort_id, > notification_id, num_concepts_completed, num_interactions, > num_lessons_completed, old_cohort_id, part_key, part_rank, pause_duration, > pause_reason, payment_plan, payment_provider, points_earned, points_possible, > price, price_sheet, product_key, product_type, provider_charge_id, > provider_refund_id, quiz_type, referrer, refund_amount, requested_cohort_id, > results, scholarship_group_key, search_term, skill_level, subscription_id, > suspension_length, suspension_reason, technology, timestamp, total_concepts, > total_lessons, total_time_sec, type, unenroll_reason, user_id, user_locale, > user_response, variant, version, workspace_id, workspace_session, > workspace_type) -> select: (nd_key) -> to: Utf8 -> Sink: Unnamed(5/8) > switched to FAILED > org.apache.flink.api.common.InvalidProgramException: Table program cannot be > compiled. This is a bug. Please file an issue. 
> at > org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) > at > org.apache.flink.table.runtime.CRowOutputMapRunner.compile(CRowOutputMapRunner.scala:33) > at > org.apache.flink.table.runtime.CRowOutputMapRunner.open(CRowOutputMapRunner.scala:48) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.codehaus.commons.compiler.CompileException: Line 790, Column > 15: Assignment conversion not possible from type "java.lang.CharSequence" to > type "org.apache.avro.util.Utf8" > at > org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672) > at > org.codehaus.janino.UnitCompiler.assignmentConversion(UnitCompiler.java:10528) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2534) > at org.codehaus.janino.UnitCompiler.access$2600(UnitCompiler.java:212) > at > org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1459) > at > org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1443) > at > org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3348) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1443) > at > org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1523) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3052) > at > org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1313) > at > org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1286) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:785) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:436) > at org.co
[GitHub] flink issue #5111: [FLINK-8173] [table] Fix input unboxing and support Avro ...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5111 Thanks for the fix. +1 to merge ---
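Until the fix is available, one plausible workaround is to request the field as {{String}} instead of {{org.apache.avro.util.Utf8}}, so the generated assignment targets a compatible type. A sketch under that assumption (table registration omitted; query and field names taken from the report):
{code}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class Utf8WorkaroundSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        // Assumes "table1" has been registered beforehand (omitted here).
        Table result = tableEnv.sql("SELECT nd_key FROM table1");
        // Requesting String rather than Utf8 sidesteps the code-generated
        // assignment from CharSequence to Utf8 that fails to compile.
        DataStream<String> stream = tableEnv.toAppendStream(result, String.class);
        stream.print();
        env.execute();
    }
}
{code}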
[jira] [Commented] (FLINK-8151) [Table] Map equality check to use entrySet equality
[ https://issues.apache.org/jira/browse/FLINK-8151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274580#comment-16274580 ] ASF GitHub Bot commented on FLINK-8151: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/5070 @walterddr I think the current solution is sufficient for now. I will merge this... > [Table] Map equality check to use entrySet equality > --- > > Key: FLINK-8151 > URL: https://issues.apache.org/jira/browse/FLINK-8151 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Rong Rong >Assignee: Rong Rong > > Following up with FLINK-8038. The equality check currently is broken. Plan to > support element-wise equality check by always using the base class: > "java.util.Map.equals" method. > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5070: [FLINK-8151][table]Map equality check to use entrySet equ...
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/5070 @walterddr I think the current solution is sufficient for now. I will merge this... ---
[jira] [Commented] (FLINK-8186) AvroInputFormat regression: fails to deserialize GenericRecords on standalone cluster with hadoop27 compat
[ https://issues.apache.org/jira/browse/FLINK-8186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274567#comment-16274567 ] Timo Walther commented on FLINK-8186: - Is it possible that the error is only reproducible with your Avro files? > AvroInputFormat regression: fails to deserialize GenericRecords on standalone > cluster with hadoop27 compat > -- > > Key: FLINK-8186 > URL: https://issues.apache.org/jira/browse/FLINK-8186 > Project: Flink > Issue Type: Bug >Affects Versions: 1.4.0 >Reporter: Sebastian Klemke >Priority: Minor > Attachments: GenericRecordCount.java, pom.xml > > > The following job runs fine on a Flink 1.3.2 cluster, but fails on a Flink > 1.4.0 RC2 standalone cluster, "hadoop27" flavour: > {code} > public class GenericRecordCount { > public static void main(String[] args) throws Exception { > String input = ParameterTool.fromArgs(args).getRequired("input"); > ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); > long count = env.readFile(new AvroInputFormat<>(new Path(input), > GenericRecord.class), input) > .count(); > System.out.printf("Counted %d records\n", count); > } > } > {code} > Runs fine in LocalExecutionEnvironment and also on no-hadoop flavour > standalone cluster, though. Exception thrown in Flink 1.4.0 hadoop27: > {code} > 12/01/2017 13:22:09 DataSource (at > readFile(ExecutionEnvironment.java:514) > (org.apache.flink.formats.avro.AvroInputFormat))(4/4) switched to FAILED > java.lang.RuntimeException: java.lang.NoSuchMethodException: > org.apache.avro.generic.GenericRecord.() > at > org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:353) > at > org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:369) > at org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901) > at > org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:212) > at > org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145) > at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233) > at > org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165) > at > org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:167) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.NoSuchMethodException: > org.apache.avro.generic.GenericRecord.() > at java.lang.Class.getConstructor0(Class.java:3082) > at java.lang.Class.getDeclaredConstructor(Class.java:2178) > at > org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347) > ... 11 more > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8186) AvroInputFormat regression: fails to deserialize GenericRecords on standalone cluster with hadoop27 compat
[ https://issues.apache.org/jira/browse/FLINK-8186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274565#comment-16274565 ] Timo Walther commented on FLINK-8186: - Actually, the error is also very weird because {{GenericRecord}} is an interface with no constructor; such a record is therefore serialized with Kryo. > AvroInputFormat regression: fails to deserialize GenericRecords on standalone > cluster with hadoop27 compat > -- > > Key: FLINK-8186 > URL: https://issues.apache.org/jira/browse/FLINK-8186 > Project: Flink > Issue Type: Bug >Affects Versions: 1.4.0 >Reporter: Sebastian Klemke >Priority: Minor > Attachments: GenericRecordCount.java, pom.xml > > > The following job runs fine on a Flink 1.3.2 cluster, but fails on a Flink > 1.4.0 RC2 standalone cluster, "hadoop27" flavour: > {code} > public class GenericRecordCount { > public static void main(String[] args) throws Exception { > String input = ParameterTool.fromArgs(args).getRequired("input"); > ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); > long count = env.readFile(new AvroInputFormat<>(new Path(input), > GenericRecord.class), input) > .count(); > System.out.printf("Counted %d records\n", count); > } > } > {code} > Runs fine in LocalExecutionEnvironment and also on no-hadoop flavour > standalone cluster, though. Exception thrown in Flink 1.4.0 hadoop27: > {code} > 12/01/2017 13:22:09 DataSource (at > readFile(ExecutionEnvironment.java:514) > (org.apache.flink.formats.avro.AvroInputFormat))(4/4) switched to FAILED > java.lang.RuntimeException: java.lang.NoSuchMethodException: > org.apache.avro.generic.GenericRecord.() > at > org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:353) > at > org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:369) > at org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901) > at > org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:212) > at > org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145) > at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233) > at > org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165) > at > org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:167) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.NoSuchMethodException: > org.apache.avro.generic.GenericRecord.() > at java.lang.Class.getConstructor0(Class.java:3082) > at java.lang.Class.getDeclaredConstructor(Class.java:2178) > at > org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347) > ... 11 more > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8186) AvroInputFormat regression: fails to deserialize GenericRecords on standalone cluster with hadoop27 compat
[ https://issues.apache.org/jira/browse/FLINK-8186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274561#comment-16274561 ] Timo Walther commented on FLINK-8186: - I'm sorry but I cannot reproduce your error. Are you using these binaries? http://people.apache.org/~aljoscha/flink-1.4.0-rc2/flink-1.4.0-bin-hadoop27-scala_2.11.tgz > AvroInputFormat regression: fails to deserialize GenericRecords on standalone > cluster with hadoop27 compat > -- > > Key: FLINK-8186 > URL: https://issues.apache.org/jira/browse/FLINK-8186 > Project: Flink > Issue Type: Bug >Affects Versions: 1.4.0 >Reporter: Sebastian Klemke >Priority: Minor > Attachments: GenericRecordCount.java, pom.xml > > > The following job runs fine on a Flink 1.3.2 cluster, but fails on a Flink > 1.4.0 RC2 standalone cluster, "hadoop27" flavour: > {code} > public class GenericRecordCount { > public static void main(String[] args) throws Exception { > String input = ParameterTool.fromArgs(args).getRequired("input"); > ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); > long count = env.readFile(new AvroInputFormat<>(new Path(input), > GenericRecord.class), input) > .count(); > System.out.printf("Counted %d records\n", count); > } > } > {code} > Runs fine in LocalExecutionEnvironment and also on no-hadoop flavour > standalone cluster, though. Exception thrown in Flink 1.4.0 hadoop27: > {code} > 12/01/2017 13:22:09 DataSource (at > readFile(ExecutionEnvironment.java:514) > (org.apache.flink.formats.avro.AvroInputFormat))(4/4) switched to FAILED > java.lang.RuntimeException: java.lang.NoSuchMethodException: > org.apache.avro.generic.GenericRecord.() > at > org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:353) > at > org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:369) > at org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901) > at > org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:212) > at > org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145) > at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233) > at > org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165) > at > org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:167) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.NoSuchMethodException: > org.apache.avro.generic.GenericRecord.() > at java.lang.Class.getConstructor0(Class.java:3082) > at java.lang.Class.getDeclaredConstructor(Class.java:2178) > at > org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347) > ... 11 more > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-8186) AvroInputFormat regression: fails to deserialize GenericRecords on standalone cluster with hadoop27 compat
[ https://issues.apache.org/jira/browse/FLINK-8186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sebastian Klemke updated FLINK-8186: Summary: AvroInputFormat regression: fails to deserialize GenericRecords on standalone cluster with hadoop27 compat (was: AvroInputFormat regression: fails to deserialize GenericRecords on cluster with hadoop27 compat) > AvroInputFormat regression: fails to deserialize GenericRecords on standalone > cluster with hadoop27 compat > -- > > Key: FLINK-8186 > URL: https://issues.apache.org/jira/browse/FLINK-8186 > Project: Flink > Issue Type: Bug >Affects Versions: 1.4.0 >Reporter: Sebastian Klemke >Priority: Minor > Attachments: GenericRecordCount.java, pom.xml > > > The following job runs fine on a Flink 1.3.2 cluster, but fails on a Flink > 1.4.0 RC2 standalone cluster, "hadoop27" flavour: > {code} > public class GenericRecordCount { > public static void main(String[] args) throws Exception { > String input = ParameterTool.fromArgs(args).getRequired("input"); > ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); > long count = env.readFile(new AvroInputFormat<>(new Path(input), > GenericRecord.class), input) > .count(); > System.out.printf("Counted %d records\n", count); > } > } > {code} > Runs fine in LocalExecutionEnvironment and also on no-hadoop flavour > standalone cluster, though. Exception thrown in Flink 1.4.0 hadoop27: > {code} > 12/01/2017 13:22:09 DataSource (at > readFile(ExecutionEnvironment.java:514) > (org.apache.flink.formats.avro.AvroInputFormat))(4/4) switched to FAILED > java.lang.RuntimeException: java.lang.NoSuchMethodException: > org.apache.avro.generic.GenericRecord.() > at > org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:353) > at > org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:369) > at org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901) > at > org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:212) > at > org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145) > at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233) > at > org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165) > at > org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:167) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.NoSuchMethodException: > org.apache.avro.generic.GenericRecord.() > at java.lang.Class.getConstructor0(Class.java:3082) > at java.lang.Class.getDeclaredConstructor(Class.java:2178) > at > org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347) > ... 11 more > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7574) Remove unused dependencies from flink-clients
[ https://issues.apache.org/jira/browse/FLINK-7574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274516#comment-16274516 ] ASF GitHub Bot commented on FLINK-7574: --- Github user yew1eb commented on the issue: https://github.com/apache/flink/pull/5076 @zentol , thanks for your suggestions, I will be very careful when cleaning up the dependencies. I will start with the top-level modules (not used by other modules), in the following order: > ├── flink-clients > ├── flink-scala-shell > ├── flink-end-to-end-tests > ├── flink-fs-tests > ├── flink-mesos > ├── flink-yarn-tests > ├── flink-yarn > ├── flink-formats > │ ├── flink-avro > ├── flink-java8 > ├── flink-runtime-web > ├── flink-examples > │ ├── flink-examples-batch > │ ├── flink-examples-streaming > │ ├── flink-examples-table > ├── flink-libraries > │ ├── flink-cep > │ ├── flink-cep-scala > │ ├── flink-gelly > │ ├── flink-gelly-examples > │ ├── flink-gelly-scala > │ ├── flink-ml > │ ├── flink-python > │ ├── flink-table > ├── flink-metrics > │ ├── flink-metrics-core > │ ├── flink-metrics-datadog > │ ├── flink-metrics-dropwizard > │ ├── flink-metrics-ganglia > │ ├── flink-metrics-graphite > │ ├── flink-metrics-jmx > │ ├── flink-metrics-prometheus > │ ├── flink-metrics-slf4j > │ ├── flink-metrics-statsd > ├── flink-contrib > │ ├── flink-connector-wikiedits > │ ├── flink-statebackend-rocksdb > │ ├── flink-storm > │ ├── flink-storm-examples > │ ├── flink-streaming-contrib > │ ├── flink-tweet-inputformat > ├── flink-filesystems > │ ├── flink-hadoop-fs > │ ├── flink-mapr-fs > │ ├── flink-s3-fs-hadoop > │ ├── flink-s3-fs-presto > ├── flink-connectors > │ ├── flink-connector-cassandra > │ ├── flink-connector-elasticsearch > │ ├── flink-connector-elasticsearch-base > │ ├── flink-connector-elasticsearch2 > │ ├── flink-connector-elasticsearch5 > │ ├── flink-connector-filesystem > │ ├── flink-connector-kafka-0.10 > │ ├── flink-connector-kafka-0.11 > │ ├── flink-connector-kafka-0.8 > │ ├── flink-connector-kafka-0.9 > │ ├── flink-connector-kafka-base > │ ├── flink-connector-kinesis > │ ├── flink-connector-nifi > │ ├── flink-connector-rabbitmq > │ ├── flink-connector-twitter > │ ├── flink-hadoop-compatibility > │ ├── flink-hbase > │ ├── flink-hcatalog > │ ├── flink-jdbc > │ ├── flink-orc > ├── flink-optimizer > ├── flink-queryable-state > ├── flink-test-utils-parent > │ ├── flink-test-utils > │ ├── flink-test-utils-junit > ├── flink-tests > ├── flink-scala > ├── flink-streaming-java > ├── flink-streaming-scala > ├── flink-runtime > ├── flink-java > ├── flink-core > Remove unused dependencies from flink-clients > - > > Key: FLINK-7574 > URL: https://issues.apache.org/jira/browse/FLINK-7574 > Project: Flink > Issue Type: Sub-task > Components: Build System >Affects Versions: 1.3.2 > Environment: Apache Maven 3.3.9, Java version: 1.8.0_144 >Reporter: Hai Zhou UTC+8 >Assignee: Hai Zhou UTC+8 > > [INFO] --- maven-dependency-plugin:2.10:analyze (default-cli) @ > flink-clients_2.11 --- > [WARNING] Used undeclared dependencies found: > [WARNING]org.scala-lang:scala-library:jar:2.11.11:compile > [WARNING]com.data-artisans:flakka-actor_2.11:jar:2.3-custom:compile > [WARNING] Unused declared dependencies found: > [WARNING]org.hamcrest:hamcrest-all:jar:1.3:test > [WARNING]org.apache.flink:force-shading:jar:1.4-SNAPSHOT:compile > [WARNING]org.powermock:powermock-module-junit4:jar:1.6.5:test > [WARNING]com.google.code.findbugs:jsr305:jar:1.3.9:compile > [WARNING]log4j:log4j:jar:1.2.17:test > 
[WARNING]org.powermock:powermock-api-mockito:jar:1.6.5:test > [WARNING]org.slf4j:slf4j-log4j12:jar:1.7.7:test -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5076: [FLINK-7574][build] POM Cleanup flink-clients
Github user yew1eb commented on the issue: https://github.com/apache/flink/pull/5076 @zentol , thanks for your suggestions, I will be very careful when cleaning up the dependencies. I will start with the top-level modules (not used by other modules), in the following order: > ├── flink-clients > ├── flink-scala-shell > ├── flink-end-to-end-tests > ├── flink-fs-tests > ├── flink-mesos > ├── flink-yarn-tests > ├── flink-yarn > ├── flink-formats > │ ├── flink-avro > ├── flink-java8 > ├── flink-runtime-web > ├── flink-examples > │ ├── flink-examples-batch > │ ├── flink-examples-streaming > │ ├── flink-examples-table > ├── flink-libraries > │ ├── flink-cep > │ ├── flink-cep-scala > │ ├── flink-gelly > │ ├── flink-gelly-examples > │ ├── flink-gelly-scala > │ ├── flink-ml > │ ├── flink-python > │ ├── flink-table > ├── flink-metrics > │ ├── flink-metrics-core > │ ├── flink-metrics-datadog > │ ├── flink-metrics-dropwizard > │ ├── flink-metrics-ganglia > │ ├── flink-metrics-graphite > │ ├── flink-metrics-jmx > │ ├── flink-metrics-prometheus > │ ├── flink-metrics-slf4j > │ ├── flink-metrics-statsd > ├── flink-contrib > │ ├── flink-connector-wikiedits > │ ├── flink-statebackend-rocksdb > │ ├── flink-storm > │ ├── flink-storm-examples > │ ├── flink-streaming-contrib > │ ├── flink-tweet-inputformat > ├── flink-filesystems > │ ├── flink-hadoop-fs > │ ├── flink-mapr-fs > │ ├── flink-s3-fs-hadoop > │ ├── flink-s3-fs-presto > ├── flink-connectors > │ ├── flink-connector-cassandra > │ ├── flink-connector-elasticsearch > │ ├── flink-connector-elasticsearch-base > │ ├── flink-connector-elasticsearch2 > │ ├── flink-connector-elasticsearch5 > │ ├── flink-connector-filesystem > │ ├── flink-connector-kafka-0.10 > │ ├── flink-connector-kafka-0.11 > │ ├── flink-connector-kafka-0.8 > │ ├── flink-connector-kafka-0.9 > │ ├── flink-connector-kafka-base > │ ├── flink-connector-kinesis > │ ├── flink-connector-nifi > │ ├── flink-connector-rabbitmq > │ ├── flink-connector-twitter > │ ├── flink-hadoop-compatibility > │ ├── flink-hbase > │ ├── flink-hcatalog > │ ├── flink-jdbc > │ ├── flink-orc > ├── flink-optimizer > ├── flink-queryable-state > ├── flink-test-utils-parent > │ ├── flink-test-utils > │ ├── flink-test-utils-junit > ├── flink-tests > ├── flink-scala > ├── flink-streaming-java > ├── flink-streaming-scala > ├── flink-runtime > ├── flink-java > ├── flink-core ---
[jira] [Created] (FLINK-8187) Web client does not print errors
Timo Walther created FLINK-8187: --- Summary: Web client does not print errors Key: FLINK-8187 URL: https://issues.apache.org/jira/browse/FLINK-8187 Project: Flink Issue Type: Bug Components: Webfrontend Reporter: Timo Walther When submitting a jar with no defined Main class, the web client does not respond anymore, instead of printing the REST error: {code} java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: Could not run the jar. at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:90) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.flink.util.FlinkException: Could not run the jar. ... 9 more Caused by: org.apache.flink.client.program.ProgramInvocationException: Neither a 'Main-Class', nor a 'program-class' entry was found in the jar file. at org.apache.flink.client.program.PackagedProgram.getEntryPointClassNameFromJar(PackagedProgram.java:592) at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:188) at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:147) at org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:72) at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:69) ... 8 more {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8186) AvroInputFormat regression: fails to deserialize GenericRecords on cluster with hadoop27 compat
[ https://issues.apache.org/jira/browse/FLINK-8186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274503#comment-16274503 ] Sebastian Klemke commented on FLINK-8186: - Invocation: {code} flink-1.4.0/bin/jobmanager.sh start cluster flink-1.4.0/bin/taskmanager.sh start flink-1.4.0/bin/taskmanager.sh start flink-1.4.0/bin/taskmanager.sh start flink-1.4.0/bin/taskmanager.sh start flink-1.4.0/bin/flink run -p 4 -c de.nerdheim.flink.GenericRecordCount target/generic-record-count-1.0-SNAPSHOT.jar --input {code} > AvroInputFormat regression: fails to deserialize GenericRecords on cluster > with hadoop27 compat > --- > > Key: FLINK-8186 > URL: https://issues.apache.org/jira/browse/FLINK-8186 > Project: Flink > Issue Type: Bug >Affects Versions: 1.4.0 >Reporter: Sebastian Klemke >Priority: Minor > Attachments: GenericRecordCount.java, pom.xml > > > The following job runs fine on a Flink 1.3.2 cluster, but fails on a Flink > 1.4.0 RC2 standalone cluster, "hadoop27" flavour: > {code} > public class GenericRecordCount { > public static void main(String[] args) throws Exception { > String input = ParameterTool.fromArgs(args).getRequired("input"); > ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); > long count = env.readFile(new AvroInputFormat<>(new Path(input), > GenericRecord.class), input) > .count(); > System.out.printf("Counted %d records\n", count); > } > } > {code} > Runs fine in LocalExecutionEnvironment and also on no-hadoop flavour > standalone cluster, though. Exception thrown in Flink 1.4.0 hadoop27: > {code} > 12/01/2017 13:22:09 DataSource (at > readFile(ExecutionEnvironment.java:514) > (org.apache.flink.formats.avro.AvroInputFormat))(4/4) switched to FAILED > java.lang.RuntimeException: java.lang.NoSuchMethodException: > org.apache.avro.generic.GenericRecord.() > at > org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:353) > at > org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:369) > at org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901) > at > org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:212) > at > org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145) > at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233) > at > org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165) > at > org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:167) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.NoSuchMethodException: > org.apache.avro.generic.GenericRecord.() > at java.lang.Class.getConstructor0(Class.java:3082) > at java.lang.Class.getDeclaredConstructor(Class.java:2178) > at > org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347) > ... 11 more > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8181) Kafka producer's FlinkFixedPartitioner returns different partitions when a target topic is rescaled
[ https://issues.apache.org/jira/browse/FLINK-8181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274500#comment-16274500 ] ASF GitHub Bot commented on FLINK-8181: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5108#discussion_r154368266 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java --- @@ -68,6 +78,13 @@ public int partition(T record, byte[] key, byte[] value, String targetTopic, int partitions != null && partitions.length > 0, "Partitions of the target topic is empty."); - return partitions[parallelInstanceId % partitions.length]; + if (topicToFixedPartition.containsKey(targetTopic)) { --- End diff -- @aljoscha yes, the semantics is a bit odd / needs some clarification before we move on. I've been having a go at implementing state checkpointing for the `FlinkFixedPartitioner` today, and for example one unclear case I bumped into was the following: Subtask 1 writes to partition X for "some-topic" Subtask 2 writes to partition Y for "some-topic" On restore and say the sink is rescaled to DOP of 1, should the single subtask continue writing to partition X or Y for "some-topic"? Regarding the default Kafka behaviour: It's hash partitioning on the attached key for the records. I've also thought about using that as the default instead of the fixed partitioner; see the relevant discussion here: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkKafkaProducerXX-td16951.html > Kafka producer's FlinkFixedPartitioner returns different partitions when a > target topic is rescaled > --- > > Key: FLINK-8181 > URL: https://issues.apache.org/jira/browse/FLINK-8181 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.4.0, 1.5.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.4.0, 1.5.0 > > > On fixing FLINK-6288 and migrating the original {{FlinkFixedPartitioner}} to > the new partitioning API (commit 9ed9b68397b51bfd2b0f6e532212a82f771641bd), > the {{FlinkFixedPartitioner}} no longer returns identical target partitions > once a target topic is rescaled. > This results in a behavioral regression when the {{FlinkFixedPartitioner}} is > used. > The {{FlinkFixedPartitionerTest}} should also be strengthened to cover the > target topic rescaling case. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5108: [FLINK-8181] [kafka] Make FlinkFixedPartitioner in...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5108#discussion_r154368266 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java --- @@ -68,6 +78,13 @@ public int partition(T record, byte[] key, byte[] value, String targetTopic, int partitions != null && partitions.length > 0, "Partitions of the target topic is empty."); - return partitions[parallelInstanceId % partitions.length]; + if (topicToFixedPartition.containsKey(targetTopic)) { --- End diff -- @aljoscha yes, the semantics is a bit odd / needs some clarification before we move on. I've been having a go at implementing state checkpointing for the `FlinkFixedPartitioner` today, and for example one unclear case I bumped into was the following: Subtask 1 writes to partition X for "some-topic" Subtask 2 writes to partition Y for "some-topic" On restore and say the sink is rescaled to DOP of 1, should the single subtask continue writing to partition X or Y for "some-topic"? Regarding the default Kafka behaviour: It's hash partitioning on the attached key for the records. I've also thought about using that as the default instead of the fixed partitioner; see the relevant discussion here: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkKafkaProducerXX-td16951.html ---
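To make the regression concrete, here is a minimal, self-contained rendering of the modulo mapping quoted in the diff (class and method names are hypothetical):
{code}
public class FixedPartitionerRescaleDemo {

    // The mapping quoted in the diff above.
    static int partition(int parallelInstanceId, int[] partitions) {
        return partitions[parallelInstanceId % partitions.length];
    }

    public static void main(String[] args) {
        int subtask = 3;
        int[] before = {0, 1};    // target topic with 2 partitions
        int[] after = {0, 1, 2};  // same topic rescaled to 3 partitions
        // 3 % 2 == 1 but 3 % 3 == 0: the same subtask silently switches
        // its target partition once the topic is rescaled.
        System.out.println(partition(subtask, before)); // prints 1
        System.out.println(partition(subtask, after));  // prints 0
    }
}
{code}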
[jira] [Updated] (FLINK-8186) AvroInputFormat regression: fails to deserialize GenericRecords on cluster with hadoop27 compat
[ https://issues.apache.org/jira/browse/FLINK-8186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sebastian Klemke updated FLINK-8186: Attachment: pom.xml GenericRecordCount.java > AvroInputFormat regression: fails to deserialize GenericRecords on cluster > with hadoop27 compat > --- > > Key: FLINK-8186 > URL: https://issues.apache.org/jira/browse/FLINK-8186 > Project: Flink > Issue Type: Bug >Affects Versions: 1.4.0 >Reporter: Sebastian Klemke >Priority: Minor > Attachments: GenericRecordCount.java, pom.xml > > > The following job runs fine on a Flink 1.3.2 cluster, but fails on a Flink > 1.4.0 RC2 standalone cluster, "hadoop27" flavour: > {code} > public class GenericRecordCount { > public static void main(String[] args) throws Exception { > String input = ParameterTool.fromArgs(args).getRequired("input"); > ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); > long count = env.readFile(new AvroInputFormat<>(new Path(input), > GenericRecord.class), input) > .count(); > System.out.printf("Counted %d records\n", count); > } > } > {code} > Runs fine in LocalExecutionEnvironment and also on no-hadoop flavour > standalone cluster, though. Exception thrown in Flink 1.4.0 hadoop27: > {code} > 12/01/2017 13:22:09 DataSource (at > readFile(ExecutionEnvironment.java:514) > (org.apache.flink.formats.avro.AvroInputFormat))(4/4) switched to FAILED > java.lang.RuntimeException: java.lang.NoSuchMethodException: > org.apache.avro.generic.GenericRecord.() > at > org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:353) > at > org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:369) > at org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901) > at > org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:212) > at > org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145) > at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233) > at > org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165) > at > org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:167) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.NoSuchMethodException: > org.apache.avro.generic.GenericRecord.() > at java.lang.Class.getConstructor0(Class.java:3082) > at java.lang.Class.getDeclaredConstructor(Class.java:2178) > at > org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347) > ... 11 more > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8186) AvroInputFormat regression: fails to deserialize GenericRecords on cluster with hadoop27 compat
[ https://issues.apache.org/jira/browse/FLINK-8186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274497#comment-16274497 ] Sebastian Klemke commented on FLINK-8186: - Hey [~twalthr], I used the flink-quickstart-java and added flink-clients_2.11 and flink-avro as dependencies. It runs on YARN, though. So maybe it's a documentation issue? Does it make sense to use hadoop27 flavour for a standalone cluster? I'll attach pom.xml + java source. > AvroInputFormat regression: fails to deserialize GenericRecords on cluster > with hadoop27 compat > --- > > Key: FLINK-8186 > URL: https://issues.apache.org/jira/browse/FLINK-8186 > Project: Flink > Issue Type: Bug >Affects Versions: 1.4.0 >Reporter: Sebastian Klemke >Priority: Minor > > The following job runs fine on a Flink 1.3.2 cluster, but fails on a Flink > 1.4.0 RC2 standalone cluster, "hadoop27" flavour: > {code} > public class GenericRecordCount { > public static void main(String[] args) throws Exception { > String input = ParameterTool.fromArgs(args).getRequired("input"); > ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); > long count = env.readFile(new AvroInputFormat<>(new Path(input), > GenericRecord.class), input) > .count(); > System.out.printf("Counted %d records\n", count); > } > } > {code} > Runs fine in LocalExecutionEnvironment and also on no-hadoop flavour > standalone cluster, though. Exception thrown in Flink 1.4.0 hadoop27: > {code} > 12/01/2017 13:22:09 DataSource (at > readFile(ExecutionEnvironment.java:514) > (org.apache.flink.formats.avro.AvroInputFormat))(4/4) switched to FAILED > java.lang.RuntimeException: java.lang.NoSuchMethodException: > org.apache.avro.generic.GenericRecord.() > at > org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:353) > at > org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:369) > at org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901) > at > org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:212) > at > org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145) > at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233) > at > org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165) > at > org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:167) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.NoSuchMethodException: > org.apache.avro.generic.GenericRecord.() > at java.lang.Class.getConstructor0(Class.java:3082) > at java.lang.Class.getDeclaredConstructor(Class.java:2178) > at > org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347) > ... 11 more > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8181) Kafka producer's FlinkFixedPartitioner returns different partitions when a target topic is rescaled
[ https://issues.apache.org/jira/browse/FLINK-8181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274482#comment-16274482 ] ASF GitHub Bot commented on FLINK-8181: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5108#discussion_r154363656 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java --- @@ -68,6 +78,13 @@ public int partition(T record, byte[] key, byte[] value, String targetTopic, int partitions != null && partitions.length > 0, "Partitions of the target topic is empty."); - return partitions[parallelInstanceId % partitions.length]; + if (topicToFixedPartition.containsKey(targetTopic)) { --- End diff -- It might be that the "fixed partitioner" is hard to implement or that its semantics are somewhat strange. Not sure though. What's the default Kafka behaviour? Maybe we should also have that as the default behaviour. > Kafka producer's FlinkFixedPartitioner returns different partitions when a > target topic is rescaled > --- > > Key: FLINK-8181 > URL: https://issues.apache.org/jira/browse/FLINK-8181 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.4.0, 1.5.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.4.0, 1.5.0 > > > On fixing FLINK-6288 and migrating the original {{FlinkFixedPartitioner}} to > the new partitioning API (commit 9ed9b68397b51bfd2b0f6e532212a82f771641bd), > the {{FlinkFixedPartitioner}} no longer returns identical target partitions > once a target topic is rescaled. > This results in a behavioral regression when the {{FlinkFixedPartitioner}} is > used. > The {{FlinkFixedPartitionerTest}} should also be strengthened to cover the > target topic rescaling case. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5108: [FLINK-8181] [kafka] Make FlinkFixedPartitioner in...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5108#discussion_r154363656 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java --- @@ -68,6 +78,13 @@ public int partition(T record, byte[] key, byte[] value, String targetTopic, int partitions != null && partitions.length > 0, "Partitions of the target topic is empty."); - return partitions[parallelInstanceId % partitions.length]; + if (topicToFixedPartition.containsKey(targetTopic)) { --- End diff -- It might be that the "fixed partitioner" is hard to implement or that its semantics are somewhat strange. Not sure though. What's the default Kafka behaviour? Maybe we should also have that as the default behaviour. ---
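For context on the default Kafka behaviour discussed here: with a key attached, Kafka's default partitioner hashes the key bytes (murmur2) modulo the partition count. A simplified illustration of the idea, deliberately using {{Arrays.hashCode}} rather than Kafka's actual murmur2 implementation:
{code}
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class KeyHashPartitioningSketch {

    // Simplified stand-in for key-based hash partitioning; Kafka's
    // DefaultPartitioner uses murmur2 over the key bytes instead.
    static int partitionForKey(byte[] key, int numPartitions) {
        return (Arrays.hashCode(key) & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        // A given key always maps to the same partition for a fixed
        // partition count, independent of which subtask produced it.
        System.out.println(partitionForKey(key, 4));
        System.out.println(partitionForKey(key, 4)); // same value again
    }
}
{code}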
[jira] [Commented] (FLINK-8181) Kafka producer's FlinkFixedPartitioner returns different partitions when a target topic is rescaled
[ https://issues.apache.org/jira/browse/FLINK-8181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274480#comment-16274480 ] ASF GitHub Bot commented on FLINK-8181: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5108#discussion_r154363505 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java --- @@ -68,6 +78,13 @@ public int partition(T record, byte[] key, byte[] value, String targetTopic, int partitions != null && partitions.length > 0, "Partitions of the target topic is empty."); - return partitions[parallelInstanceId % partitions.length]; + if (topicToFixedPartition.containsKey(targetTopic)) { --- End diff -- Maybe yes, but I don't think we can do it this quickly now. Adding stateful stuff has big implications and we should make sure to understand them all. > Kafka producer's FlinkFixedPartitioner returns different partitions when a > target topic is rescaled > --- > > Key: FLINK-8181 > URL: https://issues.apache.org/jira/browse/FLINK-8181 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.4.0, 1.5.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.4.0, 1.5.0 > > > On fixing FLINK-6288 and migrating the original {{FlinkFixedPartitioner}} to > the new partitioning API (commit 9ed9b68397b51bfd2b0f6e532212a82f771641bd), > the {{FlinkFixedPartitioner}} no longer returns identical target partitions > once a target topic is rescaled. > This results in a behavioral regression when the {{FlinkFixedPartitioner}} is > used. > The {{FlinkFixedPartitionerTest}} should also be strengthened to cover the > target topic rescaling case. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5108: [FLINK-8181] [kafka] Make FlinkFixedPartitioner in...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5108#discussion_r154363505 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java --- @@ -68,6 +78,13 @@ public int partition(T record, byte[] key, byte[] value, String targetTopic, int partitions != null && partitions.length > 0, "Partitions of the target topic is empty."); - return partitions[parallelInstanceId % partitions.length]; + if (topicToFixedPartition.containsKey(targetTopic)) { --- End diff -- Maybe yes, but I don't think we can do it this quickly now. Adding stateful stuff has big implications and we should make sure to understand them all. ---
[jira] [Commented] (FLINK-8176) Dispatcher does not start SubmittedJobGraphStore
[ https://issues.apache.org/jira/browse/FLINK-8176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274468#comment-16274468 ] ASF GitHub Bot commented on FLINK-8176: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5107#discussion_r154359352 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java --- @@ -117,84 +135,78 @@ public void testJobSubmission() throws Exception { heartbeatServices, mock(MetricRegistryImpl.class), fatalErrorHandler, - jobManagerRunner, - jobId); + mockJobManagerRunner, + TEST_JOB_ID); - try { - dispatcher.start(); + dispatcher.start(); + } - CompletableFuture leaderFuture = dispatcherLeaderElectionService.isLeader(UUID.randomUUID()); + @After + public void tearDown() throws Exception { + try { + fatalErrorHandler.rethrowError(); + } finally { + RpcUtils.terminateRpcEndpoint(dispatcher, timeout); + } + } - // wait for the leader to be elected - leaderFuture.get(); + /** +* Tests that we can submit a job to the Dispatcher which then spawns a +* new JobManagerRunner. +*/ + @Test + public void testJobSubmission() throws Exception { + CompletableFuture leaderFuture = dispatcherLeaderElectionService.isLeader(UUID.randomUUID()); - DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); + // wait for the leader to be elected + leaderFuture.get(); - CompletableFuture acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, timeout); + DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); - acknowledgeFuture.get(); + CompletableFuture acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, timeout); - verify(jobManagerRunner, Mockito.timeout(timeout.toMilliseconds())).start(); + acknowledgeFuture.get(); - // check that no error has occurred - fatalErrorHandler.rethrowError(); - } finally { - RpcUtils.terminateRpcEndpoint(dispatcher, timeout); - } + verify(mockJobManagerRunner, Mockito.timeout(timeout.toMilliseconds())).start(); } /** * Tests that the dispatcher takes part in the leader election. */ @Test public void testLeaderElection() throws Exception { - TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler(); - TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); - UUID expectedLeaderSessionId = UUID.randomUUID(); - CompletableFuture leaderSessionIdFuture = new CompletableFuture<>(); - SubmittedJobGraphStore mockSubmittedJobGraphStore = mock(SubmittedJobGraphStore.class); - TestingLeaderElectionService testingLeaderElectionService = new TestingLeaderElectionService() { - @Override - public void confirmLeaderSessionID(UUID leaderSessionId) { - super.confirmLeaderSessionID(leaderSessionId); - leaderSessionIdFuture.complete(leaderSessionId); - } - }; - - haServices.setSubmittedJobGraphStore(mockSubmittedJobGraphStore); - haServices.setDispatcherLeaderElectionService(testingLeaderElectionService); - HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L); - final JobID jobId = new JobID(); - - final TestingDispatcher dispatcher = new TestingDispatcher( - rpcService, - Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(), - new Configuration(), - haServices, - mock(ResourceManagerGateway.class), - mock(BlobServer.class), - heartbeatServices, - mock(MetricRegistryImpl.class), - fatalErrorHandler, - mock(JobManagerRunner.class), - jobId); - try { - dispatcher.sta
[jira] [Commented] (FLINK-8176) Dispatcher does not start SubmittedJobGraphStore
[ https://issues.apache.org/jira/browse/FLINK-8176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274469#comment-16274469 ] ASF GitHub Bot commented on FLINK-8176: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5107#discussion_r154357254 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java --- @@ -534,6 +536,40 @@ public void handleError(final Exception exception) { onFatalError(new DispatcherException("Received an error from the LeaderElectionService.", exception)); } + //-- + // SubmittedJobGraphListener + //-- + + @Override + public void onAddedJobGraph(final JobID jobId) { + runAsync(() -> { + final SubmittedJobGraph submittedJobGraph; + try { + submittedJobGraph = submittedJobGraphStore.recoverJobGraph(jobId); + } catch (final Exception e) { + log.error("Could not submit job {}.", jobId, e); --- End diff -- Maybe "could not recover job". > Dispatcher does not start SubmittedJobGraphStore > > > Key: FLINK-8176 > URL: https://issues.apache.org/jira/browse/FLINK-8176 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, Job-Submission, YARN >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Gary Yao > Labels: flip-6 > Fix For: 1.5.0 > > > {{Dispatcher}} never calls start on its {{SubmittedJobGraphStore}} instance. > Hence, when a Job is submitted (YARN session mode with HA enabled), an > {{IllegalStateException}} is thrown: > {noformat} > java.lang.IllegalStateException: Not running. Forgot to call start()? > at > org.apache.flink.util.Preconditions.checkState(Preconditions.java:195) > at > org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.verifyIsRunning(ZooKeeperSubmittedJobGraphStore.java:411) > at > org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.putJobGraph(ZooKeeperSubmittedJobGraphStore.java:222) > at > org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:202) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:207) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:151) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$0(AkkaRpcActor.java:129) > at > akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544) > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) > at akka.actor.ActorCell.invoke(ActorCell.scala:495) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) > at akka.dispatch.Mailbox.run(Mailbox.scala:224) > at akka.dispatch.Mailbox.exec(Mailbox.scala:234) > at > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > {noformat} > *Expected Behavior* > In {{start()}} method, the submittedJobGraphStore should be started as so: > {code} > 
submittedJobGraphStore.start(this); > {code} > To enable this, the {{Dispatcher}} must implement the interface > {{SubmittedJobGraphStore.SubmittedJobGraphListener}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
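To make the expected behavior concrete, here is a minimal, self-contained sketch of the lifecycle bug and its fix. All types below are simplified stand-ins for Flink's {{Dispatcher}}, {{SubmittedJobGraphStore}} and {{SubmittedJobGraphListener}}, not the actual classes:

{code}
// Simplified stand-ins modelled on the issue above; not Flink's real classes.
interface SubmittedJobGraphListener {
    void onAddedJobGraph(String jobId);
    void onRemovedJobGraph(String jobId);
}

class SubmittedJobGraphStore {
    private SubmittedJobGraphListener listener;
    private boolean running;

    void start(SubmittedJobGraphListener listener) {
        // registering the listener also marks the store as running
        this.listener = listener;
        this.running = true;
    }

    void putJobGraph(String jobId) {
        if (!running) {
            // the exact failure from the stack trace above
            throw new IllegalStateException("Not running. Forgot to call start()?");
        }
        // ... persist the job graph (e.g. to ZooKeeper) ...
    }
}

class Dispatcher implements SubmittedJobGraphListener {
    private final SubmittedJobGraphStore submittedJobGraphStore = new SubmittedJobGraphStore();

    void start() {
        // The fix: start the store and register the dispatcher as its listener.
        // Without this call, the first submitJob() throws the IllegalStateException above.
        submittedJobGraphStore.start(this);
    }

    void submitJob(String jobId) {
        submittedJobGraphStore.putJobGraph(jobId);
    }

    @Override
    public void onAddedJobGraph(String jobId) {
        // recover the job graph and spawn a JobManagerRunner for it
    }

    @Override
    public void onRemovedJobGraph(String jobId) {
        // stop the corresponding job if it is still running locally
    }
}
{code}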
[GitHub] flink pull request #5107: [FLINK-8176][flip6] Start SubmittedJobGraphStore i...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5107#discussion_r154357254 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java --- @@ -534,6 +536,40 @@ public void handleError(final Exception exception) { onFatalError(new DispatcherException("Received an error from the LeaderElectionService.", exception)); } + //-- + // SubmittedJobGraphListener + //-- + + @Override + public void onAddedJobGraph(final JobID jobId) { + runAsync(() -> { + final SubmittedJobGraph submittedJobGraph; + try { + submittedJobGraph = submittedJobGraphStore.recoverJobGraph(jobId); + } catch (final Exception e) { + log.error("Could not submit job {}.", jobId, e); --- End diff -- Maybe "could not recover job". ---
[GitHub] flink pull request #5107: [FLINK-8176][flip6] Start SubmittedJobGraphStore i...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5107#discussion_r154359352 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java --- @@ -117,84 +135,78 @@ public void testJobSubmission() throws Exception { heartbeatServices, mock(MetricRegistryImpl.class), fatalErrorHandler, - jobManagerRunner, - jobId); + mockJobManagerRunner, + TEST_JOB_ID); - try { - dispatcher.start(); + dispatcher.start(); + } - CompletableFuture<UUID> leaderFuture = dispatcherLeaderElectionService.isLeader(UUID.randomUUID()); + @After + public void tearDown() throws Exception { + try { + fatalErrorHandler.rethrowError(); + } finally { + RpcUtils.terminateRpcEndpoint(dispatcher, timeout); + } + } - // wait for the leader to be elected - leaderFuture.get(); + /** +* Tests that we can submit a job to the Dispatcher which then spawns a +* new JobManagerRunner. +*/ + @Test + public void testJobSubmission() throws Exception { + CompletableFuture<UUID> leaderFuture = dispatcherLeaderElectionService.isLeader(UUID.randomUUID()); - DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); + // wait for the leader to be elected + leaderFuture.get(); - CompletableFuture<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, timeout); + DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); - acknowledgeFuture.get(); + CompletableFuture<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, timeout); - verify(jobManagerRunner, Mockito.timeout(timeout.toMilliseconds())).start(); + acknowledgeFuture.get(); - // check that no error has occurred - fatalErrorHandler.rethrowError(); - } finally { - RpcUtils.terminateRpcEndpoint(dispatcher, timeout); - } + verify(mockJobManagerRunner, Mockito.timeout(timeout.toMilliseconds())).start(); } /** * Tests that the dispatcher takes part in the leader election. */ @Test public void testLeaderElection() throws Exception { - TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler(); - TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); - UUID expectedLeaderSessionId = UUID.randomUUID(); - CompletableFuture<UUID> leaderSessionIdFuture = new CompletableFuture<>(); - SubmittedJobGraphStore mockSubmittedJobGraphStore = mock(SubmittedJobGraphStore.class); - TestingLeaderElectionService testingLeaderElectionService = new TestingLeaderElectionService() { - @Override - public void confirmLeaderSessionID(UUID leaderSessionId) { - super.confirmLeaderSessionID(leaderSessionId); - leaderSessionIdFuture.complete(leaderSessionId); - } - }; - - haServices.setSubmittedJobGraphStore(mockSubmittedJobGraphStore); - haServices.setDispatcherLeaderElectionService(testingLeaderElectionService); - HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L); - final JobID jobId = new JobID(); - - final TestingDispatcher dispatcher = new TestingDispatcher( - rpcService, - Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(), - new Configuration(), - haServices, - mock(ResourceManagerGateway.class), - mock(BlobServer.class), - heartbeatServices, - mock(MetricRegistryImpl.class), - fatalErrorHandler, - mock(JobManagerRunner.class), - jobId); - try { - dispatcher.start(); + assertNull(dispatcherLeaderElectionService.getConfirmationFuture()); - assertFalse(leaderSessionIdFuture.isDone()); + dispatcherLeaderElectionService.isLeader(expectedLeaderSessionId);
[jira] [Commented] (FLINK-8173) InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
[ https://issues.apache.org/jira/browse/FLINK-8173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274465#comment-16274465 ] ASF GitHub Bot commented on FLINK-8173: --- GitHub user twalthr opened a pull request: https://github.com/apache/flink/pull/5111 [FLINK-8173] [table] Fix input unboxing and support Avro Utf8 ## What is the purpose of the change Fixes a code generation issue that caused problems with Avro types. It adds tests for passing and working with Avro types. ## Brief change log Added explicit result type casts in code generator. ## Verifying this change - AvroTypesITCase added ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): yes - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable You can merge this pull request into a Git repository by running: $ git pull https://github.com/twalthr/flink FLINK-8173 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5111.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5111 commit 6b9f26a6b21f19530d4870104c0cd084a3a01cda Author: twalthr Date: 2017-12-01T14:37:23Z [FLINK-8173] [table] Fix input unboxing and support Avro Utf8 > InvalidProgramException: Table program cannot be compiled. This is a bug. > Please file an issue. 
> --- > > Key: FLINK-8173 > URL: https://issues.apache.org/jira/browse/FLINK-8173 > Project: Flink > Issue Type: Bug >Reporter: Tao Xia >Assignee: Timo Walther > > It is a stream of Avro objects, simply select a String field and trying to > print out > val query = "SELECT nd_key FROM table1" > val result = tableEnv.sql(query) > tableEnv.toAppendStream[org.apache.avro.util.Utf8](result).print() > 11/29/2017 16:07:36 Source: Custom Source -> from: (accepted_cohort_id, > admin_id, after_submission, amount_paid, anonymous_id, application_id, > atom_key, bd_group_key, biz_geo, braavos_purchase_id, category, cohort_id, > concept_key, concept_rank, context, context_campaign, context_experiment, > coupon_code, course_key, course_rank, cta_destination, cta_location, > cta_message, cta_type, currency, decision_group_id, device_browser, > device_os, device_os_version, device_type, duration, evaluation_id, > event_type, fin_geo, in_collaboration_with, lab_id, lab_rank, label, > lesson_key, lesson_rank, locale, max_pause_duration, message, message_id, > module_key, module_rank, nd_key, nd_unit_id, nd_unit_rank, new_cohort_id, > notification_id, num_concepts_completed, num_interactions, > num_lessons_completed, old_cohort_id, part_key, part_rank, pause_duration, > pause_reason, payment_plan, payment_provider, points_earned, points_possible, > price, price_sheet, product_key, product_type, provider_charge_id, > provider_refund_id, quiz_type, referrer, refund_amount, requested_cohort_id, > results, scholarship_group_key, search_term, skill_level, subscription_id, > suspension_length, suspension_reason, technology, timestamp, total_concepts, > total_lessons, total_time_sec, type, unenroll_reason, user_id, user_locale, > user_response, variant, version, workspace_id, workspace_session, > workspace_type) -> select: (nd_key) -> to: Utf8 -> Sink: Unnamed(5/8) > switched to FAILED > org.apache.flink.api.common.InvalidProgramException: Table program cannot be > compiled. This is a bug. Please file an issue. > at > org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) > at > org.apache.flink.table.runtime.CRowOutputMapRunner.compile(CRowOutputMapRunner.scala:33) > at > org.apache.flink.table.runtime.CRowOutputMapRunner.open(CRowOutputMapRunner.scala:48) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376) > at > org.apache.flink.streaming.runtime.tasks.StreamTas
[GitHub] flink pull request #5111: [FLINK-8173] [table] Fix input unboxing and suppor...
GitHub user twalthr opened a pull request: https://github.com/apache/flink/pull/5111 [FLINK-8173] [table] Fix input unboxing and support Avro Utf8 ## What is the purpose of the change Fixes a code generation issue that caused problems with Avro types. It adds tests for passing and working with Avro types. ## Brief change log Added explicit result type casts in code generator. ## Verifying this change - AvroTypesITCase added ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): yes - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable You can merge this pull request into a Git repository by running: $ git pull https://github.com/twalthr/flink FLINK-8173 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5111.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5111 commit 6b9f26a6b21f19530d4870104c0cd084a3a01cda Author: twalthr Date: 2017-12-01T14:37:23Z [FLINK-8173] [table] Fix input unboxing and support Avro Utf8 ---
[jira] [Commented] (FLINK-8186) AvroInputFormat regression: fails to deserialize GenericRecords on cluster with hadoop27 compat
[ https://issues.apache.org/jira/browse/FLINK-8186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274452#comment-16274452 ] Timo Walther commented on FLINK-8186: - [~packet] we moved Avro out of the Flink core into a dedicated {{flink-avro}} module in order to reduce our default dependencies. Did you add {{flink-avro}} to your dependencies? I just checked that {{org.apache.avro.generic.GenericRecord}} is included in the {{flink-dist}} jar that is placed in the {{/lib}} folder. I will try to reproduce your issue. > AvroInputFormat regression: fails to deserialize GenericRecords on cluster > with hadoop27 compat > --- > > Key: FLINK-8186 > URL: https://issues.apache.org/jira/browse/FLINK-8186 > Project: Flink > Issue Type: Bug >Affects Versions: 1.4.0 >Reporter: Sebastian Klemke >Priority: Minor > > The following job runs fine on a Flink 1.3.2 cluster, but fails on a Flink > 1.4.0 RC2 standalone cluster, "hadoop27" flavour: > {code} > public class GenericRecordCount { > public static void main(String[] args) throws Exception { > String input = ParameterTool.fromArgs(args).getRequired("input"); > ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); > long count = env.readFile(new AvroInputFormat<>(new Path(input), > GenericRecord.class), input) > .count(); > System.out.printf("Counted %d records\n", count); > } > } > {code} > Runs fine in LocalExecutionEnvironment and also on no-hadoop flavour > standalone cluster, though. Exception thrown in Flink 1.4.0 hadoop27: > {code} > 12/01/2017 13:22:09 DataSource (at > readFile(ExecutionEnvironment.java:514) > (org.apache.flink.formats.avro.AvroInputFormat))(4/4) switched to FAILED > java.lang.RuntimeException: java.lang.NoSuchMethodException: > org.apache.avro.generic.GenericRecord.<init>() > at > org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:353) > at > org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:369) > at org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901) > at > org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:212) > at > org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145) > at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233) > at > org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165) > at > org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:167) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.NoSuchMethodException: > org.apache.avro.generic.GenericRecord.<init>() > at java.lang.Class.getConstructor0(Class.java:3082) > at java.lang.Class.getDeclaredConstructor(Class.java:2178) > at > org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347) > ... 11 more > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
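For readers hitting the same error: the module Timo refers to is declared like this in a Maven build (shown here for the affected 1.4.0 release; whether it additionally needs to be bundled into the user jar or placed on the cluster classpath depends on the deployment, so treat this only as a starting point):

{code}
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro</artifactId>
    <version>1.4.0</version>
</dependency>
{code}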
[GitHub] flink pull request #5110: [hotfix][docs] Fix typo in Checkpointing doc
GitHub user chuanlei opened a pull request: https://github.com/apache/flink/pull/5110 [hotfix][docs] Fix typo in Checkpointing doc You can merge this pull request into a Git repository by running: $ git pull https://github.com/chuanlei/flink fix-typo-in-checkpointing-doc Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5110.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5110 commit 5e3dfcee57d20566ba79e1aeeaafea12e02a9b61 Author: nichuanlei Date: 2017-12-01T14:06:21Z fix typo in checkpointing doc ---
[jira] [Commented] (FLINK-8186) AvroInputFormat regression: fails to deserialize GenericRecords on cluster with hadoop27 compat
[ https://issues.apache.org/jira/browse/FLINK-8186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274435#comment-16274435 ] Sebastian Klemke commented on FLINK-8186: - Test program runs with Flink 1.4 hadoop27 on YARN. Is no-hadoop flavour recommended for standalone? > AvroInputFormat regression: fails to deserialize GenericRecords on cluster > with hadoop27 compat > --- > > Key: FLINK-8186 > URL: https://issues.apache.org/jira/browse/FLINK-8186 > Project: Flink > Issue Type: Bug >Affects Versions: 1.4.0 >Reporter: Sebastian Klemke >Priority: Minor > > The following job runs fine on a Flink 1.3.2 cluster, but fails on a Flink > 1.4.0 RC2 standalone cluster, "hadoop27" flavour: > {code} > public class GenericRecordCount { > public static void main(String[] args) throws Exception { > String input = ParameterTool.fromArgs(args).getRequired("input"); > ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); > long count = env.readFile(new AvroInputFormat<>(new Path(input), > GenericRecord.class), input) > .count(); > System.out.printf("Counted %d records\n", count); > } > } > {code} > Runs fine in LocalExecutionEnvironment and also on no-hadoop flavour > standalone cluster, though. Exception thrown in Flink 1.4.0 hadoop27: > {code} > 12/01/2017 13:22:09 DataSource (at > readFile(ExecutionEnvironment.java:514) > (org.apache.flink.formats.avro.AvroInputFormat))(4/4) switched to FAILED > java.lang.RuntimeException: java.lang.NoSuchMethodException: > org.apache.avro.generic.GenericRecord.<init>() > at > org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:353) > at > org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:369) > at org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901) > at > org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:212) > at > org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145) > at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233) > at > org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165) > at > org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:167) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.NoSuchMethodException: > org.apache.avro.generic.GenericRecord.<init>() > at java.lang.Class.getConstructor0(Class.java:3082) > at java.lang.Class.getDeclaredConstructor(Class.java:2178) > at > org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347) > ... 11 more > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-8186) AvroInputFormat regression: fails to deserialize GenericRecords on cluster with hadoop27 compat
[ https://issues.apache.org/jira/browse/FLINK-8186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sebastian Klemke updated FLINK-8186: Priority: Minor (was: Major) > AvroInputFormat regression: fails to deserialize GenericRecords on cluster > with hadoop27 compat > --- > > Key: FLINK-8186 > URL: https://issues.apache.org/jira/browse/FLINK-8186 > Project: Flink > Issue Type: Bug >Affects Versions: 1.4.0 >Reporter: Sebastian Klemke >Priority: Minor > > The following job runs fine on a Flink 1.3.2 cluster, but fails on a Flink > 1.4.0 RC2 standalone cluster, "hadoop27" flavour: > {code} > public class GenericRecordCount { > public static void main(String[] args) throws Exception { > String input = ParameterTool.fromArgs(args).getRequired("input"); > ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); > long count = env.readFile(new AvroInputFormat<>(new Path(input), > GenericRecord.class), input) > .count(); > System.out.printf("Counted %d records\n", count); > } > } > {code} > Runs fine in LocalExecutionEnvironment and also on no-hadoop flavour > standalone cluster, though. Exception thrown in Flink 1.4.0 hadoop27: > {code} > 12/01/2017 13:22:09 DataSource (at > readFile(ExecutionEnvironment.java:514) > (org.apache.flink.formats.avro.AvroInputFormat))(4/4) switched to FAILED > java.lang.RuntimeException: java.lang.NoSuchMethodException: > org.apache.avro.generic.GenericRecord.<init>() > at > org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:353) > at > org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:369) > at org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901) > at > org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:212) > at > org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145) > at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233) > at > org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165) > at > org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:167) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.NoSuchMethodException: > org.apache.avro.generic.GenericRecord.<init>() > at java.lang.Class.getConstructor0(Class.java:3082) > at java.lang.Class.getDeclaredConstructor(Class.java:2178) > at > org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347) > ... 11 more > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-5506) Java 8 - CommunityDetection.java:158 - java.lang.NullPointerException
[ https://issues.apache.org/jira/browse/FLINK-5506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan reassigned FLINK-5506: - Assignee: Greg Hogan > Java 8 - CommunityDetection.java:158 - java.lang.NullPointerException > - > > Key: FLINK-5506 > URL: https://issues.apache.org/jira/browse/FLINK-5506 > Project: Flink > Issue Type: Bug > Components: Gelly >Affects Versions: 1.1.4, 1.3.2, 1.4.1 >Reporter: Miguel E. Coimbra >Assignee: Greg Hogan > Labels: easyfix, newbie > Original Estimate: 2h > Remaining Estimate: 2h > > Reporting this here as per Vasia's advice. > I am having the following problem while trying out the > org.apache.flink.graph.library.CommunityDetection algorithm of the Gelly API > (Java). > Specs: JDK 1.8.0_102 x64 > Apache Flink: 1.1.4 > Suppose I have a very small (I tried an example with 38 vertices as well) > dataset stored in a tab-separated file 3-vertex.tsv: > {code} > #id1 id2 score > 0 1 0 > 0 2 0 > 0 3 0 > {code} > This is just a central vertex with 3 neighbors (disconnected between > themselves). > I am loading the dataset and executing the algorithm with the following code: > {code} > // Load the data from the .tsv file. > final DataSet<Tuple3<Long, Long, Double>> edgeTuples = > env.readCsvFile(inputPath) > .fieldDelimiter("\t") // fields are separated by tabs > .ignoreComments("#") // comments start with "#" > .types(Long.class, Long.class, Double.class); > // Generate a graph and add reverse edges (undirected). > final Graph<Long, Long, Double> graph = Graph.fromTupleDataSet(edgeTuples, > new MapFunction<Long, Long>() { > private static final long serialVersionUID = 8713516577419451509L; > public Long map(Long value) { > return value; > } > }, > env).getUndirected(); > // CommunityDetection parameters. > final double hopAttenuationDelta = 0.5d; > final int iterationCount = 10; > // Prepare and trigger the execution. > DataSet<Vertex<Long, Long>> vs = graph.run(new > org.apache.flink.graph.library.CommunityDetection(iterationCount, > hopAttenuationDelta)).getVertices(); > vs.print(); > {code} > Running this code throws the following exception (check the bold line): > {code} > org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:805) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:751) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:751) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.NullPointerException > at > org.apache.flink.graph.library.CommunityDetection$VertexLabelUpdater.updateVertex(CommunityDetection.java:158) > at > org.apache.flink.graph.spargel.ScatterGatherIteration$GatherUdfSimpleVV.coGroup(ScatterGatherIteration.java:389) > at > org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:218) > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486) > at > org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146) > at > org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107) > at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642) > at java.lang.Thread.run(Thread.java:745) > {code} > After a further look, I set a breakpoint (Eclipse IDE debugging) at the line > in bold: > org.apache.flink.graph.library.CommunityDetection.java (source code accessed > automatically by Maven) > // find the highest score of maxScoreLabel > double highestScore = labelsWithHighestScore.get(maxScoreLabel); > - maxSc
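The line flagged above is a classic auto-unboxing trap: when maxScoreLabel is absent from the map, Map.get() returns null, and assigning that to a primitive double throws exactly the NullPointerException reported at CommunityDetection.java:158. A minimal reproduction with the usual guard (this illustrates the failure mode only; it is not necessarily the fix Gelly ultimately applied):

{code}
import java.util.HashMap;
import java.util.Map;

public class UnboxingNpe {
    public static void main(String[] args) {
        Map<Long, Double> labelsWithHighestScore = new HashMap<>();
        long maxScoreLabel = 42L;

        // Throws NullPointerException: get() returns null for an absent key,
        // and null cannot be unboxed into a primitive double.
        // double highestScore = labelsWithHighestScore.get(maxScoreLabel);

        // The usual guard: supply a default when the key is absent (Java 8+).
        double highestScore =
            labelsWithHighestScore.getOrDefault(maxScoreLabel, Double.NEGATIVE_INFINITY);
        System.out.println(highestScore);
    }
}
{code}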
[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274408#comment-16274408 ] ASF GitHub Bot commented on FLINK-7468: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r154344942 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java --- @@ -174,19 +175,21 @@ public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws T // - - static class InfiniteSubpartitionView implements ResultSubpartitionView { + static class InfiniteSubpartitionView extends ResultSubpartitionView { private final BufferProvider bufferProvider; private final CountDownLatch sync; public InfiniteSubpartitionView(BufferProvider bufferProvider, CountDownLatch sync) { + super(mock(ResultSubpartition.class)); + this.bufferProvider = checkNotNull(bufferProvider); this.sync = checkNotNull(sync); } @Override - public Buffer getNextBuffer() throws IOException, InterruptedException { + protected Buffer getNextBufferInternal() throws IOException, InterruptedException { --- End diff -- `@Nullable` > Implement sender backlog logic for credit-based > --- > > Key: FLINK-7468 > URL: https://issues.apache.org/jira/browse/FLINK-7468 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > Receivers should know how many buffers are available on the sender side (the > backlog). The receivers use this information to decide how to distribute > floating buffers. > The {{ResultSubpartition}} maintains the backlog which only indicates the > number of buffers in this subpartition, not including the number of events. > The backlog is increased for adding buffer to this subpartition, and > decreased for polling buffer from it. > The backlog is attached in {{BufferResponse}} by sender as an absolute value > after the buffer being transferred. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
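The accounting described in the issue is small enough to sketch. A self-contained model (simplified stand-in types, not Flink's actual ResultSubpartition) showing the two invariants: the backlog counts buffers only, not events, and the value shipped with each buffer is the absolute backlog after that buffer was removed:

{code}
import java.util.ArrayDeque;

class Subpartition {
    static final class Entry {
        final boolean isBuffer; // false for events, which do not count
        Entry(boolean isBuffer) { this.isBuffer = isBuffer; }
    }

    static final class PolledEntry {
        final Entry entry;
        final int backlog; // attached to the BufferResponse as an absolute value
        PolledEntry(Entry entry, int backlog) { this.entry = entry; this.backlog = backlog; }
    }

    private final ArrayDeque<Entry> queue = new ArrayDeque<>();
    private int buffersInBacklog;

    synchronized void add(Entry entry) {
        queue.add(entry);
        if (entry.isBuffer) {
            buffersInBacklog++; // increased when a buffer is added
        }
    }

    synchronized PolledEntry poll() {
        Entry entry = queue.poll();
        if (entry == null) {
            return null;
        }
        if (entry.isBuffer) {
            buffersInBacklog--; // decreased when a buffer is polled
        }
        return new PolledEntry(entry, buffersInBacklog);
    }
}
{code}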
[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274404#comment-16274404 ] ASF GitHub Bot commented on FLINK-7468: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r154344924 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java --- @@ -114,7 +116,7 @@ public void onNotification() { } @Override - public Buffer getNextBuffer() throws IOException, InterruptedException { + protected Buffer getNextBufferInternal() throws IOException, InterruptedException { --- End diff -- `@Nullable` > Implement sender backlog logic for credit-based > --- > > Key: FLINK-7468 > URL: https://issues.apache.org/jira/browse/FLINK-7468 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > Receivers should know how many buffers are available on the sender side (the > backlog). The receivers use this information to decide how to distribute > floating buffers. > The {{ResultSubpartition}} maintains the backlog which only indicates the > number of buffers in this subpartition, not including the number of events. > The backlog is increased for adding buffer to this subpartition, and > decreased for polling buffer from it. > The backlog is attached in {{BufferResponse}} by sender as an absolute value > after the buffer being transferred. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274407#comment-16274407 ] ASF GitHub Bot commented on FLINK-7468: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r154343587 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java --- @@ -22,32 +22,57 @@ import java.io.IOException; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * A view to consume a {@link ResultSubpartition} instance. */ -public interface ResultSubpartitionView { +public abstract class ResultSubpartitionView { + + /** The parent subpartition this view belongs to. */ + private final ResultSubpartition parent; + + public ResultSubpartitionView(ResultSubpartition parent) { + this.parent = checkNotNull(parent); + } + + /** +* Returns the next {@link Buffer} instance of this queue iterator and also +* decreases the related statistics. +*/ + public Buffer getNextBuffer() throws IOException, InterruptedException { --- End diff -- `@Nullable` and (I'm not sure whether @pnowojski agrees) maybe make this method `final` (any subclass should only override `getNextBufferInternal`)? > Implement sender backlog logic for credit-based > --- > > Key: FLINK-7468 > URL: https://issues.apache.org/jira/browse/FLINK-7468 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > Receivers should know how many buffers are available on the sender side (the > backlog). The receivers use this information to decide how to distribute > floating buffers. > The {{ResultSubpartition}} maintains the backlog which only indicates the > number of buffers in this subpartition, not including the number of events. > The backlog is increased for adding buffer to this subpartition, and > decreased for polling buffer from it. > The backlog is attached in {{BufferResponse}} by sender as an absolute value > after the buffer being transferred. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
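Taken together, the review comments describe the template-method pattern: a final getNextBuffer() that centralizes the statistics bookkeeping, delegating to a @Nullable hook that subclasses implement. A sketch of that shape (simplified stand-in types, assuming the reviewer's final-method suggestion is adopted):

{code}
import javax.annotation.Nullable;

class Buffer {}

interface Subpartition {
    void decreaseStatistics(Buffer buffer);
}

abstract class SubpartitionView {
    private final Subpartition parent;

    SubpartitionView(Subpartition parent) {
        this.parent = parent;
    }

    /** Final so that every implementation goes through the shared statistics update. */
    @Nullable
    public final Buffer getNextBuffer() throws Exception {
        Buffer buffer = getNextBufferInternal();
        if (buffer != null) {
            parent.decreaseStatistics(buffer);
        }
        return buffer;
    }

    /** May return null, e.g. when the producer is currently slower than the consumer. */
    @Nullable
    protected abstract Buffer getNextBufferInternal() throws Exception;
}
{code}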
[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274406#comment-16274406 ] ASF GitHub Bot commented on FLINK-7468: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r154343656 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java --- @@ -22,32 +22,57 @@ import java.io.IOException; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * A view to consume a {@link ResultSubpartition} instance. */ -public interface ResultSubpartitionView { +public abstract class ResultSubpartitionView { + + /** The parent subpartition this view belongs to. */ + private final ResultSubpartition parent; + + public ResultSubpartitionView(ResultSubpartition parent) { + this.parent = checkNotNull(parent); + } + + /** +* Returns the next {@link Buffer} instance of this queue iterator and also +* decreases the related statistics. +*/ + public Buffer getNextBuffer() throws IOException, InterruptedException { + Buffer buffer = getNextBufferInternal(); + if (buffer != null) { + parent.decreaseStatistics(buffer); + } + return buffer; + } + + public int getBuffersInBacklog() { + return parent.getBuffersInBacklog(); + } /** -* Returns the next {@link Buffer} instance of this queue iterator. -* -* If there is currently no instance available, it will return null. +* The internal method used by {@link ResultSubpartitionView#getNextBuffer()} +* to return the next {@link Buffer} instance of this queue iterator. +* +* If there is currently no instance available, it will return null. * This might happen for example when a pipelined queue producer is slower * than the consumer or a spilled queue needs to read in more data. -* -* Important: The consumer has to make sure that each +* +* Important: The consumer has to make sure that each * buffer instance will eventually be recycled with {@link Buffer#recycle()} * after it has been consumed. */ - Buffer getNextBuffer() throws IOException, InterruptedException; - - void notifyBuffersAvailable(long buffers) throws IOException; + protected abstract Buffer getNextBufferInternal() throws IOException, InterruptedException; --- End diff -- `@Nullable` > Implement sender backlog logic for credit-based > --- > > Key: FLINK-7468 > URL: https://issues.apache.org/jira/browse/FLINK-7468 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > Receivers should know how many buffers are available on the sender side (the > backlog). The receivers use this information to decide how to distribute > floating buffers. > The {{ResultSubpartition}} maintains the backlog which only indicates the > number of buffers in this subpartition, not including the number of events. > The backlog is increased for adding buffer to this subpartition, and > decreased for polling buffer from it. > The backlog is attached in {{BufferResponse}} by sender as an absolute value > after the buffer being transferred. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274409#comment-16274409 ] ASF GitHub Bot commented on FLINK-7468: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r154344889 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java --- @@ -133,7 +135,7 @@ int releaseMemory() throws IOException { } @Override - public Buffer getNextBuffer() throws IOException, InterruptedException { + protected Buffer getNextBufferInternal() throws IOException, InterruptedException { --- End diff -- `@Nullable` > Implement sender backlog logic for credit-based > --- > > Key: FLINK-7468 > URL: https://issues.apache.org/jira/browse/FLINK-7468 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > Receivers should know how many buffers are available on the sender side (the > backlog). The receivers use this information to decide how to distribute > floating buffers. > The {{ResultSubpartition}} maintains the backlog which only indicates the > number of buffers in this subpartition, not including the number of events. > The backlog is increased for adding buffer to this subpartition, and > decreased for polling buffer from it. > The backlog is attached in {{BufferResponse}} by sender as an absolute value > after the buffer being transferred. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274405#comment-16274405 ] ASF GitHub Bot commented on FLINK-7468: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r154343510 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java --- @@ -39,13 +39,15 @@ private final AtomicBoolean isReleased; PipelinedSubpartitionView(PipelinedSubpartition parent, BufferAvailabilityListener listener) { + super(parent); + this.parent = checkNotNull(parent); this.availabilityListener = checkNotNull(listener); this.isReleased = new AtomicBoolean(); } @Override - public Buffer getNextBuffer() { + protected Buffer getNextBufferInternal() { --- End diff -- Actually, a lot of the methods along these calls should probably be marked `@Nullable`. Since you touched the `getNextBufferInternal()`, can you at least mark this (and maybe some calls along the call stack if I say pretty please?). You can do so in a separate `[hotfix]` commit to keep this separate > Implement sender backlog logic for credit-based > --- > > Key: FLINK-7468 > URL: https://issues.apache.org/jira/browse/FLINK-7468 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > Receivers should know how many buffers are available on the sender side (the > backlog). The receivers use this information to decide how to distribute > floating buffers. > The {{ResultSubpartition}} maintains the backlog which only indicates the > number of buffers in this subpartition, not including the number of events. > The backlog is increased for adding buffer to this subpartition, and > decreased for polling buffer from it. > The backlog is attached in {{BufferResponse}} by sender as an absolute value > after the buffer being transferred. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r154343510 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java --- @@ -39,13 +39,15 @@ private final AtomicBoolean isReleased; PipelinedSubpartitionView(PipelinedSubpartition parent, BufferAvailabilityListener listener) { + super(parent); + this.parent = checkNotNull(parent); this.availabilityListener = checkNotNull(listener); this.isReleased = new AtomicBoolean(); } @Override - public Buffer getNextBuffer() { + protected Buffer getNextBufferInternal() { --- End diff -- Actually, a lot of the methods along these calls should probably be marked `@Nullable`. Since you touched the `getNextBufferInternal()`, can you at least mark this (and maybe some calls along the call stack if I say pretty please?). You can do so in a separate `[hotfix]` commit to keep this separate ---
[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r154344889 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java --- @@ -133,7 +135,7 @@ int releaseMemory() throws IOException { } @Override - public Buffer getNextBuffer() throws IOException, InterruptedException { + protected Buffer getNextBufferInternal() throws IOException, InterruptedException { --- End diff -- `@Nullable` ---
[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r154344924 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java --- @@ -114,7 +116,7 @@ public void onNotification() { } @Override - public Buffer getNextBuffer() throws IOException, InterruptedException { + protected Buffer getNextBufferInternal() throws IOException, InterruptedException { --- End diff -- `@Nullable` ---
[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r154344942 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java --- @@ -174,19 +175,21 @@ public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws T // - - static class InfiniteSubpartitionView implements ResultSubpartitionView { + static class InfiniteSubpartitionView extends ResultSubpartitionView { private final BufferProvider bufferProvider; private final CountDownLatch sync; public InfiniteSubpartitionView(BufferProvider bufferProvider, CountDownLatch sync) { + super(mock(ResultSubpartition.class)); + this.bufferProvider = checkNotNull(bufferProvider); this.sync = checkNotNull(sync); } @Override - public Buffer getNextBuffer() throws IOException, InterruptedException { + protected Buffer getNextBufferInternal() throws IOException, InterruptedException { --- End diff -- `@Nullable` ---
[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r154343587 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java --- @@ -22,32 +22,57 @@ import java.io.IOException; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * A view to consume a {@link ResultSubpartition} instance. */ -public interface ResultSubpartitionView { +public abstract class ResultSubpartitionView { + + /** The parent subpartition this view belongs to. */ + private final ResultSubpartition parent; + + public ResultSubpartitionView(ResultSubpartition parent) { + this.parent = checkNotNull(parent); + } + + /** +* Returns the next {@link Buffer} instance of this queue iterator and also +* decreases the related statistics. +*/ + public Buffer getNextBuffer() throws IOException, InterruptedException { --- End diff -- `@Nullable` and (I'm not sure whether @pnowojski agrees) maybe make this method `final` (any subclass should only override `getNextBufferInternal`)? ---
[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r154343656 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java --- @@ -22,32 +22,57 @@ import java.io.IOException; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * A view to consume a {@link ResultSubpartition} instance. */ -public interface ResultSubpartitionView { +public abstract class ResultSubpartitionView { + + /** The parent subpartition this view belongs to. */ + private final ResultSubpartition parent; + + public ResultSubpartitionView(ResultSubpartition parent) { + this.parent = checkNotNull(parent); + } + + /** +* Returns the next {@link Buffer} instance of this queue iterator and also +* decreases the related statistics. +*/ + public Buffer getNextBuffer() throws IOException, InterruptedException { + Buffer buffer = getNextBufferInternal(); + if (buffer != null) { + parent.decreaseStatistics(buffer); + } + return buffer; + } + + public int getBuffersInBacklog() { + return parent.getBuffersInBacklog(); + } /** -* Returns the next {@link Buffer} instance of this queue iterator. -* -* If there is currently no instance available, it will return null. +* The internal method used by {@link ResultSubpartitionView#getNextBuffer()} +* to return the next {@link Buffer} instance of this queue iterator. +* +* If there is currently no instance available, it will return null. * This might happen for example when a pipelined queue producer is slower * than the consumer or a spilled queue needs to read in more data. -* -* Important: The consumer has to make sure that each +* +* Important: The consumer has to make sure that each * buffer instance will eventually be recycled with {@link Buffer#recycle()} * after it has been consumed. */ - Buffer getNextBuffer() throws IOException, InterruptedException; - - void notifyBuffersAvailable(long buffers) throws IOException; + protected abstract Buffer getNextBufferInternal() throws IOException, InterruptedException; --- End diff -- `@Nullable` ---
[jira] [Created] (FLINK-8186) AvroInputFormat regression: fails to deserialize GenericRecords on cluster with hadoop27 compat
Sebastian Klemke created FLINK-8186: --- Summary: AvroInputFormat regression: fails to deserialize GenericRecords on cluster with hadoop27 compat Key: FLINK-8186 URL: https://issues.apache.org/jira/browse/FLINK-8186 Project: Flink Issue Type: Bug Affects Versions: 1.4.0 Reporter: Sebastian Klemke The following job runs fine on a Flink 1.3.2 cluster, but fails on a Flink 1.4.0 RC2 standalone cluster, "hadoop27" flavour: {code} public class GenericRecordCount { public static void main(String[] args) throws Exception { String input = ParameterTool.fromArgs(args).getRequired("input"); ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); long count = env.readFile(new AvroInputFormat<>(new Path(input), GenericRecord.class), input) .count(); System.out.printf("Counted %d records\n", count); } } {code} Runs fine in LocalExecutionEnvironment and also on no-hadoop flavour standalone cluster, though. Exception thrown in Flink 1.4.0 hadoop27: {code} 12/01/2017 13:22:09 DataSource (at readFile(ExecutionEnvironment.java:514) (org.apache.flink.formats.avro.AvroInputFormat))(4/4) switched to FAILED java.lang.RuntimeException: java.lang.NoSuchMethodException: org.apache.avro.generic.GenericRecord.<init>() at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:353) at org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:369) at org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901) at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:212) at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145) at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233) at org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165) at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:167) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.NoSuchMethodException: org.apache.avro.generic.GenericRecord.<init>() at java.lang.Class.getConstructor0(Class.java:3082) at java.lang.Class.getDeclaredConstructor(Class.java:2178) at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347) ... 11 more {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274364#comment-16274364 ] ASF GitHub Bot commented on FLINK-7416: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r154315390 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java --- @@ -144,20 +147,19 @@ public void testReceiveBuffer() throws Exception { inputGate.setBufferPool(bufferPool); inputGate.assignExclusiveSegments(networkBufferPool, 2); - final int backlog = 2; - final BufferResponse bufferResponse = createBufferResponse( - inputChannel.requestBuffer(), 0, inputChannel.getInputChannelId(), backlog); - final CreditBasedClientHandler handler = new CreditBasedClientHandler(); handler.addInputChannel(inputChannel); + final int backlog = 2; + final BufferResponse bufferResponse = createBufferResponse( + TestBufferFactory.createBuffer(32), 0, inputChannel.getInputChannelId(), backlog); --- End diff -- good catch not getting the buffer from the channel here > Implement Netty receiver outgoing pipeline for credit-based > --- > > Key: FLINK-7416 > URL: https://issues.apache.org/jira/browse/FLINK-7416 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > The related works are : > * We define a new message called {{AddCredit}} to notify the incremental > credit during data shuffle. > * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the > channel is enqueued in the pipeline. > * Whenever the channel becomes writable, it takes the next {{InputChannel}} > and sends its unannounced credit. The credit is reset to zero after each sent. > * That way, messages are sent as often as the network has capacity and > contain as much credit as available for the channel at that point in time. > Otherwise, it would only add latency to the announcements and not increase > throughput. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
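The enqueue-on-transition rule in the bullets above is the core of the design and compact enough to sketch. A self-contained model (simplified names and types, not the actual Flink/Netty handler): a channel is enqueued only when its unannounced credit rises from zero, and each AddCredit message drains all credit accumulated so far.

{code}
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicInteger;

class CreditAnnouncer {
    static final class InputChannel {
        final AtomicInteger unannouncedCredit = new AtomicInteger();
    }

    private final ArrayDeque<InputChannel> channelsWithCredit = new ArrayDeque<>();
    private boolean channelWritable = true; // mirrors Netty's channel.isWritable()

    /** Called when buffers are recycled and credit accrues for a channel. */
    void notifyCreditAvailable(InputChannel channel, int delta) {
        // Enqueue only on the 0 -> positive transition, so each channel
        // appears at most once in the announcement queue.
        if (channel.unannouncedCredit.getAndAdd(delta) == 0) {
            channelsWithCredit.add(channel);
            drainIfWritable();
        }
    }

    /** Called from the writability-changed event of the network channel. */
    void channelWritabilityChanged(boolean writable) {
        this.channelWritable = writable;
        drainIfWritable();
    }

    private void drainIfWritable() {
        // Send as often as the network has capacity; each message carries the
        // full credit available for its channel at that point in time.
        while (channelWritable && !channelsWithCredit.isEmpty()) {
            InputChannel channel = channelsWithCredit.poll();
            int credit = channel.unannouncedCredit.getAndSet(0);
            if (credit > 0) {
                sendAddCredit(channel, credit);
            }
        }
    }

    private void sendAddCredit(InputChannel channel, int credit) {
        // In the real pipeline this writes an AddCredit(credit) message to Netty.
        System.out.println("AddCredit " + credit);
    }
}
{code}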
[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274375#comment-16274375 ] ASF GitHub Bot commented on FLINK-7416: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r154322032 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java --- @@ -244,54 +237,89 @@ public void testCancelBeforeActive() throws Exception { } /** -* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline, and -* {@link AddCredit} message is sent to the producer. +* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline for notifying credits, +* and verifies the behaviour of credit notification by triggering channel's writability changed. */ @Test public void testNotifyCreditAvailable() throws Exception { + final SingleInputGate inputGate = createSingleInputGate(); + final RemoteInputChannel inputChannel1 = spy(createRemoteInputChannel(inputGate)); + final RemoteInputChannel inputChannel2 = spy(createRemoteInputChannel(inputGate)); final CreditBasedClientHandler handler = new CreditBasedClientHandler(); - final EmbeddedChannel channel = new EmbeddedChannel(handler); + final EmbeddedChannel channel = spy(new EmbeddedChannel(handler)); - final RemoteInputChannel inputChannel = createRemoteInputChannel(mock(SingleInputGate.class)); + // Increase the credits to enqueue the input channels + inputChannel1.increaseCredit(1); + inputChannel2.increaseCredit(1); + handler.notifyCreditAvailable(inputChannel1); + handler.notifyCreditAvailable(inputChannel2); - // Enqueue the input channel - handler.notifyCreditAvailable(inputChannel); + channel.runPendingTasks(); + + // The two input channels should notify credits via writable channel + assertTrue(channel.isWritable()); + assertEquals(channel.readOutbound().getClass(), AddCredit.class); + verify(inputChannel1, times(1)).getAndResetCredit(); + verify(inputChannel2, times(1)).getAndResetCredit(); + + final int highWaterMark = channel.config().getWriteBufferHighWaterMark(); + // Set the writer index to the high water mark to ensure that all bytes are written + // to the wire although the buffer is "empty". + channel.write(Unpooled.buffer(highWaterMark).writerIndex(highWaterMark)); + + // Enqueue the input channel on the condition of un-writable channel + inputChannel1.increaseCredit(1); + handler.notifyCreditAvailable(inputChannel1); channel.runPendingTasks(); - // Read the enqueued msg - Object msg1 = channel.readOutbound(); + // The input channel will not notify credits via un-writable channel + assertFalse(channel.isWritable()); + verify(inputChannel1, times(1)).getAndResetCredit(); - // Should notify credit - assertEquals(msg1.getClass(), AddCredit.class); + // Flush the buffer to make the channel writable again + channel.flush(); + + // The input channel should notify credits via triggering channel's writability changed event + assertTrue(channel.isWritable()); + assertEquals(channel.readOutbound().getClass(), AddCredit.class); + verify(inputChannel1, times(2)).getAndResetCredit(); } /** * Verifies that {@link RemoteInputChannel} is enqueued in the pipeline, but {@link AddCredit} -* message is not sent actually after this input channel is released. +* message is not sent actually when this input channel is released. 
*/ @Test public void testNotifyCreditAvailableAfterReleased() throws Exception { + final SingleInputGate inputGate = createSingleInputGate(); + final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); + inputGate.setInputChannel(inputChannel.getPartitionId().getPartitionId(), inputChannel); + final CreditBasedClientHandler handler = new CreditBasedClientHandler(); final EmbeddedChannel channel = new EmbeddedChannel(handler); - final RemoteInputChannel inputChannel = createRemoteInputChannel(mock(SingleInputGate.class)); - - // Enqueue the input channel then rele
[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274368#comment-16274368 ] ASF GitHub Bot commented on FLINK-7416: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r154321304 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java --- @@ -244,54 +237,89 @@ public void testCancelBeforeActive() throws Exception { } /** -* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline, and -* {@link AddCredit} message is sent to the producer. +* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline for notifying credits, +* and verifies the behaviour of credit notification by triggering channel's writability changed. */ @Test public void testNotifyCreditAvailable() throws Exception { + final SingleInputGate inputGate = createSingleInputGate(); + final RemoteInputChannel inputChannel1 = spy(createRemoteInputChannel(inputGate)); + final RemoteInputChannel inputChannel2 = spy(createRemoteInputChannel(inputGate)); final CreditBasedClientHandler handler = new CreditBasedClientHandler(); - final EmbeddedChannel channel = new EmbeddedChannel(handler); + final EmbeddedChannel channel = spy(new EmbeddedChannel(handler)); - final RemoteInputChannel inputChannel = createRemoteInputChannel(mock(SingleInputGate.class)); + // Increase the credits to enqueue the input channels + inputChannel1.increaseCredit(1); + inputChannel2.increaseCredit(1); + handler.notifyCreditAvailable(inputChannel1); + handler.notifyCreditAvailable(inputChannel2); - // Enqueue the input channel - handler.notifyCreditAvailable(inputChannel); + channel.runPendingTasks(); + + // The two input channels should notify credits via writable channel + assertTrue(channel.isWritable()); + assertEquals(channel.readOutbound().getClass(), AddCredit.class); + verify(inputChannel1, times(1)).getAndResetCredit(); + verify(inputChannel2, times(1)).getAndResetCredit(); + + final int highWaterMark = channel.config().getWriteBufferHighWaterMark(); + // Set the writer index to the high water mark to ensure that all bytes are written + // to the wire although the buffer is "empty". + channel.write(Unpooled.buffer(highWaterMark).writerIndex(highWaterMark)); + + // Enqueue the input channel on the condition of un-writable channel + inputChannel1.increaseCredit(1); + handler.notifyCreditAvailable(inputChannel1); channel.runPendingTasks(); - // Read the enqueued msg - Object msg1 = channel.readOutbound(); + // The input channel will not notify credits via un-writable channel + assertFalse(channel.isWritable()); + verify(inputChannel1, times(1)).getAndResetCredit(); --- End diff -- same as above > Implement Netty receiver outgoing pipeline for credit-based > --- > > Key: FLINK-7416 > URL: https://issues.apache.org/jira/browse/FLINK-7416 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > The related works are : > * We define a new message called {{AddCredit}} to notify the incremental > credit during data shuffle. > * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the > channel is enqueued in the pipeline. > * Whenever the channel becomes writable, it takes the next {{InputChannel}} > and sends its unannounced credit. The credit is reset to zero after each sent. 
> * That way, messages are sent as often as the network has capacity and > contain as much credit as available for the channel at that point in time. > Otherwise, it would only add latency to the announcements and not increase > throughput. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
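The bullet list above is a terse but complete description of the announcement protocol. A minimal sketch of the enqueue-on-zero / reset-on-send accounting it describes, using illustrative names only (CreditAnnouncer and its members are not Flink classes; the real code runs on the Netty event loop and lives in flink-runtime's io.network packages):

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

class CreditAnnouncer {

    static class Channel {
        // Credit accumulated since the last AddCredit announcement.
        final AtomicInteger unannouncedCredit = new AtomicInteger();
    }

    // Channels waiting to announce credit, in FIFO order.
    private final Queue<Channel> announceQueue = new ArrayDeque<>();

    // Called when a buffer is recycled and credit becomes available.
    void onCreditAdded(Channel ch, int credit) {
        // Enqueue only on the zero-to-positive transition so that each
        // channel appears at most once in the pipeline.
        if (ch.unannouncedCredit.getAndAdd(credit) == 0) {
            announceQueue.add(ch);
        }
    }

    // Called whenever the Netty channel becomes writable again.
    void onChannelWritable() {
        Channel ch = announceQueue.poll();
        if (ch != null) {
            // Send all credit accumulated so far and reset it to zero, so
            // each message carries as much credit as is available right now.
            sendAddCredit(ch, ch.unannouncedCredit.getAndSet(0));
        }
    }

    private void sendAddCredit(Channel ch, int credit) {
        // In Flink this writes an AddCredit message into the Netty pipeline.
    }
}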
[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274373#comment-16274373 ] ASF GitHub Bot commented on FLINK-7416: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r154337233 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -223,11 +229,13 @@ void notifySubpartitionConsumed() { /** * Releases all exclusive and floating buffers, closes the partition request client. */ + @VisibleForTesting @Override - void releaseAllResources() throws IOException { + public void releaseAllResources() throws IOException { --- End diff -- `@VisibleForTesting` or `public` is not needed anymore since you adapted the tests. Please make it package-private. > Implement Netty receiver outgoing pipeline for credit-based > --- > > Key: FLINK-7416 > URL: https://issues.apache.org/jira/browse/FLINK-7416 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > The related works are : > * We define a new message called {{AddCredit}} to notify the incremental > credit during data shuffle. > * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the > channel is enqueued in the pipeline. > * Whenever the channel becomes writable, it takes the next {{InputChannel}} > and sends its unannounced credit. The credit is reset to zero after each sent. > * That way, messages are sent as often as the network has capacity and > contain as much credit as available for the channel at that point in time. > Otherwise, it would only add latency to the announcements and not increase > throughput. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274377#comment-16274377 ] ASF GitHub Bot commented on FLINK-7416: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r154321327 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java --- @@ -244,54 +237,89 @@ public void testCancelBeforeActive() throws Exception { } /** -* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline, and -* {@link AddCredit} message is sent to the producer. +* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline for notifying credits, +* and verifies the behaviour of credit notification by triggering channel's writability changed. */ @Test public void testNotifyCreditAvailable() throws Exception { + final SingleInputGate inputGate = createSingleInputGate(); + final RemoteInputChannel inputChannel1 = spy(createRemoteInputChannel(inputGate)); + final RemoteInputChannel inputChannel2 = spy(createRemoteInputChannel(inputGate)); final CreditBasedClientHandler handler = new CreditBasedClientHandler(); - final EmbeddedChannel channel = new EmbeddedChannel(handler); + final EmbeddedChannel channel = spy(new EmbeddedChannel(handler)); - final RemoteInputChannel inputChannel = createRemoteInputChannel(mock(SingleInputGate.class)); + // Increase the credits to enqueue the input channels + inputChannel1.increaseCredit(1); + inputChannel2.increaseCredit(1); + handler.notifyCreditAvailable(inputChannel1); + handler.notifyCreditAvailable(inputChannel2); - // Enqueue the input channel - handler.notifyCreditAvailable(inputChannel); + channel.runPendingTasks(); + + // The two input channels should notify credits via writable channel + assertTrue(channel.isWritable()); + assertEquals(channel.readOutbound().getClass(), AddCredit.class); + verify(inputChannel1, times(1)).getAndResetCredit(); + verify(inputChannel2, times(1)).getAndResetCredit(); + + final int highWaterMark = channel.config().getWriteBufferHighWaterMark(); + // Set the writer index to the high water mark to ensure that all bytes are written + // to the wire although the buffer is "empty". 
+ channel.write(Unpooled.buffer(highWaterMark).writerIndex(highWaterMark)); + + // Enqueue the input channel on the condition of un-writable channel + inputChannel1.increaseCredit(1); + handler.notifyCreditAvailable(inputChannel1); channel.runPendingTasks(); - // Read the enqueued msg - Object msg1 = channel.readOutbound(); + // The input channel will not notify credits via un-writable channel + assertFalse(channel.isWritable()); + verify(inputChannel1, times(1)).getAndResetCredit(); - // Should notify credit - assertEquals(msg1.getClass(), AddCredit.class); + // Flush the buffer to make the channel writable again + channel.flush(); + + // The input channel should notify credits via triggering channel's writability changed event + assertTrue(channel.isWritable()); + assertEquals(channel.readOutbound().getClass(), AddCredit.class); + verify(inputChannel1, times(2)).getAndResetCredit(); --- End diff -- also here (`AddCredit` as well as verifying the unannounced credit) > Implement Netty receiver outgoing pipeline for credit-based > --- > > Key: FLINK-7416 > URL: https://issues.apache.org/jira/browse/FLINK-7416 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > The related works are : > * We define a new message called {{AddCredit}} to notify the incremental > credit during data shuffle. > * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the > channel is enqueued in the pipeline. > * Whenever the channel becomes writable, it takes the next {{InputChannel}} > and sends its unannounced credit. The credit is reset to zero after each sent. > * That way, messages are
sent as often as the network has capacity and > contain as much credit as available for the channel at that point in time. > Otherwise, it would only add latency to the announcements and not increase > throughput. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274369#comment-16274369 ] ASF GitHub Bot commented on FLINK-7416: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r154319502 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java --- @@ -108,17 +117,75 @@ public void testReceiveEmptyBuffer() throws Exception { final Buffer emptyBuffer = TestBufferFactory.createBuffer(); emptyBuffer.setSize(0); + final int backlog = 2; final BufferResponse receivedBuffer = createBufferResponse( - emptyBuffer, 0, inputChannel.getInputChannelId()); + emptyBuffer, 0, inputChannel.getInputChannelId(), backlog); - final PartitionRequestClientHandler client = new PartitionRequestClientHandler(); + final CreditBasedClientHandler client = new CreditBasedClientHandler(); client.addInputChannel(inputChannel); // Read the empty buffer client.channelRead(mock(ChannelHandlerContext.class), receivedBuffer); // This should not throw an exception verify(inputChannel, never()).onError(any(Throwable.class)); + verify(inputChannel, times(1)).onEmptyBuffer(0, backlog); + } + + /** +* Verifies that {@link RemoteInputChannel#onBuffer(Buffer, int, int)} is called when a +* {@link BufferResponse} is received. +*/ + @Test + public void testReceiveBuffer() throws Exception { + final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32); + final SingleInputGate inputGate = createSingleInputGate(); + final RemoteInputChannel inputChannel = spy(createRemoteInputChannel(inputGate)); + inputGate.setInputChannel(inputChannel.getPartitionId().getPartitionId(), inputChannel); + try { + final BufferPool bufferPool = networkBufferPool.createBufferPool(8, 8); + inputGate.setBufferPool(bufferPool); + inputGate.assignExclusiveSegments(networkBufferPool, 2); + + final CreditBasedClientHandler handler = new CreditBasedClientHandler(); + handler.addInputChannel(inputChannel); + + final int backlog = 2; + final BufferResponse bufferResponse = createBufferResponse( + TestBufferFactory.createBuffer(32), 0, inputChannel.getInputChannelId(), backlog); + handler.channelRead(mock(ChannelHandlerContext.class), bufferResponse); + + verify(inputChannel, times(1)).onBuffer(any(Buffer.class), anyInt(), anyInt()); + verify(inputChannel, times(1)).onSenderBacklog(backlog); --- End diff -- If you used `RemoteInputChannel#getNumberOfRequiredBuffers()` here to check the backlog's value instead, we could make the `onSenderBacklog()` method package-private again and remove the `@VisibleForTesting` > Implement Netty receiver outgoing pipeline for credit-based > --- > > Key: FLINK-7416 > URL: https://issues.apache.org/jira/browse/FLINK-7416 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > The related works are : > * We define a new message called {{AddCredit}} to notify the incremental > credit during data shuffle. > * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the > channel is enqueued in the pipeline. > * Whenever the channel becomes writable, it takes the next {{InputChannel}} > and sends its unannounced credit. The credit is reset to zero after each sent. 
> * That way, messages are sent as often as the network has capacity and > contain as much credit as available for the channel at that point in time. > Otherwise, it would only add latency to the announcements and not increase > throughput. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
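A hedged sketch of the state-based check suggested here, reusing the helpers already present in PartitionRequestClientHandlerTest (createSingleInputGate, createRemoteInputChannel, createBufferResponse); the expected value assumes the required buffers are computed as the sender backlog plus the initially assigned exclusive buffers, which is an assumption, not a confirmed formula:

@Test
public void testReceiveBufferUpdatesRequiredBuffers() throws Exception {
    final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
    final SingleInputGate inputGate = createSingleInputGate();
    final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
    inputGate.setInputChannel(inputChannel.getPartitionId().getPartitionId(), inputChannel);
    inputGate.setBufferPool(networkBufferPool.createBufferPool(8, 8));
    inputGate.assignExclusiveSegments(networkBufferPool, 2);

    final CreditBasedClientHandler handler = new CreditBasedClientHandler();
    handler.addInputChannel(inputChannel);

    final int backlog = 2;
    final BufferResponse bufferResponse = createBufferResponse(
        TestBufferFactory.createBuffer(32), 0, inputChannel.getInputChannelId(), backlog);
    handler.channelRead(mock(ChannelHandlerContext.class), bufferResponse);

    // State-based assertion: no spy and no @VisibleForTesting needed, so
    // onSenderBacklog() can stay package-private.
    assertEquals(backlog + 2, inputChannel.getNumberOfRequiredBuffers());
}

Asserting on the resulting state rather than on recorded Mockito interactions also exercises the real read path.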
[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274372#comment-16274372 ] ASF GitHub Bot commented on FLINK-7416: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r154320189 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java --- @@ -244,54 +237,89 @@ public void testCancelBeforeActive() throws Exception { } /** -* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline, and -* {@link AddCredit} message is sent to the producer. +* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline for notifying credits, +* and verifies the behaviour of credit notification by triggering channel's writability changed. */ @Test public void testNotifyCreditAvailable() throws Exception { + final SingleInputGate inputGate = createSingleInputGate(); + final RemoteInputChannel inputChannel1 = spy(createRemoteInputChannel(inputGate)); + final RemoteInputChannel inputChannel2 = spy(createRemoteInputChannel(inputGate)); final CreditBasedClientHandler handler = new CreditBasedClientHandler(); - final EmbeddedChannel channel = new EmbeddedChannel(handler); + final EmbeddedChannel channel = spy(new EmbeddedChannel(handler)); - final RemoteInputChannel inputChannel = createRemoteInputChannel(mock(SingleInputGate.class)); + // Increase the credits to enqueue the input channels + inputChannel1.increaseCredit(1); + inputChannel2.increaseCredit(1); + handler.notifyCreditAvailable(inputChannel1); + handler.notifyCreditAvailable(inputChannel2); --- End diff -- Please use a `BufferResponse` (as above, with an appropriate backlog) to trigger the same result without a new method only used by tests - this will also test more code paths (as suggested). > Implement Netty receiver outgoing pipeline for credit-based > --- > > Key: FLINK-7416 > URL: https://issues.apache.org/jira/browse/FLINK-7416 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > The related works are : > * We define a new message called {{AddCredit}} to notify the incremental > credit during data shuffle. > * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the > channel is enqueued in the pipeline. > * Whenever the channel becomes writable, it takes the next {{InputChannel}} > and sends its unannounced credit. The credit is reset to zero after each sent. > * That way, messages are sent as often as the network has capacity and > contain as much credit as available for the channel at that point in time. > Otherwise, it would only add latency to the announcements and not increase > throughput. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
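A sketch of what driving the credit through the read path might look like inside testNotifyCreditAvailable, assuming the input channel has been registered with the handler and assigned buffers as in testReceiveBuffer above; whether a single BufferResponse raises the unannounced credit by exactly one depends on the floating-buffer accounting, so the sketch only relies on the credit becoming positive:

final int backlog = 2;
final BufferResponse response = createBufferResponse(
    TestBufferFactory.createBuffer(32), 0, inputChannel1.getInputChannelId(), backlog);
handler.channelRead(mock(ChannelHandlerContext.class), response);

// Processing the buffer and its backlog raises the unannounced credit and
// calls notifyCreditAvailable() internally, so the channel is enqueued
// without any test-only mutator such as increaseCredit().
channel.runPendingTasks();
assertEquals(AddCredit.class, channel.readOutbound().getClass());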
[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r154318950 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -280,94 +280,120 @@ public String toString() { // /** -* Enqueue this input channel in the pipeline for sending unannounced credits to producer. +* Enqueue this input channel in the pipeline for notifying the producer of unannounced credit. */ void notifyCreditAvailable() { - //TODO in next PR + checkState(partitionRequestClient != null, "Tried to send task event to producer before requesting a queue."); + + // We should skip the notification if this channel is already released. + if (!isReleased.get()) { + partitionRequestClient.notifyCreditAvailable(this); + } } /** -* Exclusive buffer is recycled to this input channel directly and it may trigger notify -* credit to producer. +* Exclusive buffer is recycled to this input channel directly and it may trigger return extra +* floating buffer and notify increased credit to the producer. * * @param segment The exclusive segment of this channel. */ @Override public void recycle(MemorySegment segment) { - synchronized (availableBuffers) { - // Important: the isReleased check should be inside the synchronized block. - // that way the segment can also be returned to global pool after added into - // the available queue during releasing all resources. + int numAddedBuffers; + + synchronized (bufferQueue) { + // Important: check the isReleased state inside synchronized block, so there is no + // race condition when recycle and releaseAllResources running in parallel. if (isReleased.get()) { try { - inputGate.returnExclusiveSegments(Arrays.asList(segment)); + inputGate.returnExclusiveSegments(Collections.singletonList(segment)); return; } catch (Throwable t) { ExceptionUtils.rethrow(t); } } - availableBuffers.add(new Buffer(segment, this)); + numAddedBuffers = bufferQueue.addExclusiveBuffer(new Buffer(segment, this), numRequiredBuffers); } - if (unannouncedCredit.getAndAdd(1) == 0) { + if (numAddedBuffers > 0 && unannouncedCredit.getAndAdd(1) == 0) { notifyCreditAvailable(); } } public int getNumberOfAvailableBuffers() { - synchronized (availableBuffers) { - return availableBuffers.size(); + synchronized (bufferQueue) { + return bufferQueue.getAvailableBufferSize(); } } + @VisibleForTesting + public int getNumberOfRequiredBuffers() { --- End diff -- this could be package-private (and then remove `@VisibleForTesting`) ---
[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r154338813 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java --- @@ -244,54 +237,89 @@ public void testCancelBeforeActive() throws Exception { } /** -* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline, and -* {@link AddCredit} message is sent to the producer. +* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline for notifying credits, +* and verifies the behaviour of credit notification by triggering channel's writability changed. */ @Test public void testNotifyCreditAvailable() throws Exception { + final SingleInputGate inputGate = createSingleInputGate(); + final RemoteInputChannel inputChannel1 = spy(createRemoteInputChannel(inputGate)); + final RemoteInputChannel inputChannel2 = spy(createRemoteInputChannel(inputGate)); final CreditBasedClientHandler handler = new CreditBasedClientHandler(); - final EmbeddedChannel channel = new EmbeddedChannel(handler); + final EmbeddedChannel channel = spy(new EmbeddedChannel(handler)); - final RemoteInputChannel inputChannel = createRemoteInputChannel(mock(SingleInputGate.class)); + // Increase the credits to enqueue the input channels + inputChannel1.increaseCredit(1); + inputChannel2.increaseCredit(1); + handler.notifyCreditAvailable(inputChannel1); + handler.notifyCreditAvailable(inputChannel2); - // Enqueue the input channel - handler.notifyCreditAvailable(inputChannel); + channel.runPendingTasks(); + + // The two input channels should notify credits via writable channel + assertTrue(channel.isWritable()); + assertEquals(channel.readOutbound().getClass(), AddCredit.class); + verify(inputChannel1, times(1)).getAndResetCredit(); + verify(inputChannel2, times(1)).getAndResetCredit(); + + final int highWaterMark = channel.config().getWriteBufferHighWaterMark(); + // Set the writer index to the high water mark to ensure that all bytes are written + // to the wire although the buffer is "empty". + channel.write(Unpooled.buffer(highWaterMark).writerIndex(highWaterMark)); + + // Enqueue the input channel on the condition of un-writable channel + inputChannel1.increaseCredit(1); + handler.notifyCreditAvailable(inputChannel1); channel.runPendingTasks(); - // Read the enqueued msg - Object msg1 = channel.readOutbound(); + // The input channel will not notify credits via un-writable channel + assertFalse(channel.isWritable()); + verify(inputChannel1, times(1)).getAndResetCredit(); - // Should notify credit - assertEquals(msg1.getClass(), AddCredit.class); + // Flush the buffer to make the channel writable again + channel.flush(); + + // The input channel should notify credits via triggering channel's writability changed event + assertTrue(channel.isWritable()); + assertEquals(channel.readOutbound().getClass(), AddCredit.class); --- End diff -- actually, here we receive the buffer that was blocking the channel first, and then the `AddCredit` message - both should be checked appropriately (see above) ---
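A sketch of the ordered checks asked for here, reusing the names from the quoted test: the blocking write was a plain Unpooled ByteBuf, so that is what readOutbound() must return first; the credit field accessed on AddCredit in the last line is an assumption about the message's shape:

// Flush the buffer to make the channel writable again.
channel.flush();
assertTrue(channel.isWritable());

// First comes the buffer that was blocking the channel...
Object first = channel.readOutbound();
assertTrue(first instanceof ByteBuf);

// ...and only then the AddCredit announcement with the accumulated credit.
Object second = channel.readOutbound();
assertEquals(AddCredit.class, second.getClass());
assertEquals(1, ((AddCredit) second).credit);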
[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274378#comment-16274378 ] ASF GitHub Bot commented on FLINK-7416: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r154338491 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java --- @@ -244,54 +237,89 @@ public void testCancelBeforeActive() throws Exception { } /** -* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline, and -* {@link AddCredit} message is sent to the producer. +* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline for notifying credits, +* and verifies the behaviour of credit notification by triggering channel's writability changed. */ @Test public void testNotifyCreditAvailable() throws Exception { + final SingleInputGate inputGate = createSingleInputGate(); + final RemoteInputChannel inputChannel1 = spy(createRemoteInputChannel(inputGate)); + final RemoteInputChannel inputChannel2 = spy(createRemoteInputChannel(inputGate)); final CreditBasedClientHandler handler = new CreditBasedClientHandler(); - final EmbeddedChannel channel = new EmbeddedChannel(handler); + final EmbeddedChannel channel = spy(new EmbeddedChannel(handler)); - final RemoteInputChannel inputChannel = createRemoteInputChannel(mock(SingleInputGate.class)); + // Increase the credits to enqueue the input channels + inputChannel1.increaseCredit(1); + inputChannel2.increaseCredit(1); + handler.notifyCreditAvailable(inputChannel1); + handler.notifyCreditAvailable(inputChannel2); - // Enqueue the input channel - handler.notifyCreditAvailable(inputChannel); + channel.runPendingTasks(); + + // The two input channels should notify credits via writable channel + assertTrue(channel.isWritable()); + assertEquals(channel.readOutbound().getClass(), AddCredit.class); + verify(inputChannel1, times(1)).getAndResetCredit(); + verify(inputChannel2, times(1)).getAndResetCredit(); + + final int highWaterMark = channel.config().getWriteBufferHighWaterMark(); + // Set the writer index to the high water mark to ensure that all bytes are written + // to the wire although the buffer is "empty". + channel.write(Unpooled.buffer(highWaterMark).writerIndex(highWaterMark)); + + // Enqueue the input channel on the condition of un-writable channel + inputChannel1.increaseCredit(1); + handler.notifyCreditAvailable(inputChannel1); channel.runPendingTasks(); - // Read the enqueued msg - Object msg1 = channel.readOutbound(); + // The input channel will not notify credits via un-writable channel + assertFalse(channel.isWritable()); + verify(inputChannel1, times(1)).getAndResetCredit(); - // Should notify credit - assertEquals(msg1.getClass(), AddCredit.class); + // Flush the buffer to make the channel writable again + channel.flush(); + + // The input channel should notify credits via triggering channel's writability changed event + assertTrue(channel.isWritable()); + assertEquals(channel.readOutbound().getClass(), AddCredit.class); + verify(inputChannel1, times(2)).getAndResetCredit(); } /** * Verifies that {@link RemoteInputChannel} is enqueued in the pipeline, but {@link AddCredit} -* message is not sent actually after this input channel is released. +* message is not sent actually when this input channel is released. 
*/ @Test public void testNotifyCreditAvailableAfterReleased() throws Exception { + final SingleInputGate inputGate = createSingleInputGate(); + final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); + inputGate.setInputChannel(inputChannel.getPartitionId().getPartitionId(), inputChannel); + final CreditBasedClientHandler handler = new CreditBasedClientHandler(); final EmbeddedChannel channel = new EmbeddedChannel(handler); - final RemoteInputChannel inputChannel = createRemoteInputChannel(mock(SingleInputGate.class)); - - // Enqueue the input channel then release it
[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r154320189 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java --- @@ -244,54 +237,89 @@ public void testCancelBeforeActive() throws Exception { } /** -* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline, and -* {@link AddCredit} message is sent to the producer. +* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline for notifying credits, +* and verifies the behaviour of credit notification by triggering channel's writability changed. */ @Test public void testNotifyCreditAvailable() throws Exception { + final SingleInputGate inputGate = createSingleInputGate(); + final RemoteInputChannel inputChannel1 = spy(createRemoteInputChannel(inputGate)); + final RemoteInputChannel inputChannel2 = spy(createRemoteInputChannel(inputGate)); final CreditBasedClientHandler handler = new CreditBasedClientHandler(); - final EmbeddedChannel channel = new EmbeddedChannel(handler); + final EmbeddedChannel channel = spy(new EmbeddedChannel(handler)); - final RemoteInputChannel inputChannel = createRemoteInputChannel(mock(SingleInputGate.class)); + // Increase the credits to enqueue the input channels + inputChannel1.increaseCredit(1); + inputChannel2.increaseCredit(1); + handler.notifyCreditAvailable(inputChannel1); + handler.notifyCreditAvailable(inputChannel2); --- End diff -- Please use a `BufferResponse` (as above, with an appropriate backlog) to trigger the same result without a new method only used by tests - this will also test more code paths (as suggested). ---
[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r154321304 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java --- @@ -244,54 +237,89 @@ public void testCancelBeforeActive() throws Exception { } /** -* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline, and -* {@link AddCredit} message is sent to the producer. +* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline for notifying credits, +* and verifies the behaviour of credit notification by triggering channel's writability changed. */ @Test public void testNotifyCreditAvailable() throws Exception { + final SingleInputGate inputGate = createSingleInputGate(); + final RemoteInputChannel inputChannel1 = spy(createRemoteInputChannel(inputGate)); + final RemoteInputChannel inputChannel2 = spy(createRemoteInputChannel(inputGate)); final CreditBasedClientHandler handler = new CreditBasedClientHandler(); - final EmbeddedChannel channel = new EmbeddedChannel(handler); + final EmbeddedChannel channel = spy(new EmbeddedChannel(handler)); - final RemoteInputChannel inputChannel = createRemoteInputChannel(mock(SingleInputGate.class)); + // Increase the credits to enqueue the input channels + inputChannel1.increaseCredit(1); + inputChannel2.increaseCredit(1); + handler.notifyCreditAvailable(inputChannel1); + handler.notifyCreditAvailable(inputChannel2); - // Enqueue the input channel - handler.notifyCreditAvailable(inputChannel); + channel.runPendingTasks(); + + // The two input channels should notify credits via writable channel + assertTrue(channel.isWritable()); + assertEquals(channel.readOutbound().getClass(), AddCredit.class); + verify(inputChannel1, times(1)).getAndResetCredit(); + verify(inputChannel2, times(1)).getAndResetCredit(); + + final int highWaterMark = channel.config().getWriteBufferHighWaterMark(); + // Set the writer index to the high water mark to ensure that all bytes are written + // to the wire although the buffer is "empty". + channel.write(Unpooled.buffer(highWaterMark).writerIndex(highWaterMark)); + + // Enqueue the input channel on the condition of un-writable channel + inputChannel1.increaseCredit(1); + handler.notifyCreditAvailable(inputChannel1); channel.runPendingTasks(); - // Read the enqueued msg - Object msg1 = channel.readOutbound(); + // The input channel will not notify credits via un-writable channel + assertFalse(channel.isWritable()); + verify(inputChannel1, times(1)).getAndResetCredit(); --- End diff -- same as above ---
[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274379#comment-16274379 ] ASF GitHub Bot commented on FLINK-7416: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r154338813 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java --- @@ -244,54 +237,89 @@ public void testCancelBeforeActive() throws Exception { } /** -* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline, and -* {@link AddCredit} message is sent to the producer. +* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline for notifying credits, +* and verifies the behaviour of credit notification by triggering channel's writability changed. */ @Test public void testNotifyCreditAvailable() throws Exception { + final SingleInputGate inputGate = createSingleInputGate(); + final RemoteInputChannel inputChannel1 = spy(createRemoteInputChannel(inputGate)); + final RemoteInputChannel inputChannel2 = spy(createRemoteInputChannel(inputGate)); final CreditBasedClientHandler handler = new CreditBasedClientHandler(); - final EmbeddedChannel channel = new EmbeddedChannel(handler); + final EmbeddedChannel channel = spy(new EmbeddedChannel(handler)); - final RemoteInputChannel inputChannel = createRemoteInputChannel(mock(SingleInputGate.class)); + // Increase the credits to enqueue the input channels + inputChannel1.increaseCredit(1); + inputChannel2.increaseCredit(1); + handler.notifyCreditAvailable(inputChannel1); + handler.notifyCreditAvailable(inputChannel2); - // Enqueue the input channel - handler.notifyCreditAvailable(inputChannel); + channel.runPendingTasks(); + + // The two input channels should notify credits via writable channel + assertTrue(channel.isWritable()); + assertEquals(channel.readOutbound().getClass(), AddCredit.class); + verify(inputChannel1, times(1)).getAndResetCredit(); + verify(inputChannel2, times(1)).getAndResetCredit(); + + final int highWaterMark = channel.config().getWriteBufferHighWaterMark(); + // Set the writer index to the high water mark to ensure that all bytes are written + // to the wire although the buffer is "empty". 
+ channel.write(Unpooled.buffer(highWaterMark).writerIndex(highWaterMark)); + + // Enqueue the input channel on the condition of un-writable channel + inputChannel1.increaseCredit(1); + handler.notifyCreditAvailable(inputChannel1); channel.runPendingTasks(); - // Read the enqueued msg - Object msg1 = channel.readOutbound(); + // The input channel will not notify credits via un-writable channel + assertFalse(channel.isWritable()); + verify(inputChannel1, times(1)).getAndResetCredit(); - // Should notify credit - assertEquals(msg1.getClass(), AddCredit.class); + // Flush the buffer to make the channel writable again + channel.flush(); + + // The input channel should notify credits via triggering channel's writability changed event + assertTrue(channel.isWritable()); + assertEquals(channel.readOutbound().getClass(), AddCredit.class); --- End diff -- actually, here we receive the buffer that was blocking the channel first, and then the `AddCredit` message - both should be checked appropriately (see above) > Implement Netty receiver outgoing pipeline for credit-based > --- > > Key: FLINK-7416 > URL: https://issues.apache.org/jira/browse/FLINK-7416 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > The related works are : > * We define a new message called {{AddCredit}} to notify the incremental > credit during data shuffle. > * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the > channel is enqueued in the pipeline. > * Whenever the channel becomes writable, it takes the next {{InputChannel}} > and sends its unannounced credit. The credit is reset to zero after each sent. > *
That way, messages are sent as often as the network has capacity and > contain as much credit as available for the channel at that point in time. > Otherwise, it would only add latency to the announcements and not increase > throughput. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274367#comment-16274367 ] ASF GitHub Bot commented on FLINK-7416: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r154318828 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -378,6 +381,11 @@ public void notifyBufferDestroyed() { // Nothing to do actually. } + @VisibleForTesting + public void increaseCredit(int credit) { --- End diff -- I think we can live without this method that should never be called outside the tests (since it modifies state): creating a `BufferResponse` with an appropriate backlog should yield the same result in the tests > Implement Netty receiver outgoing pipeline for credit-based > --- > > Key: FLINK-7416 > URL: https://issues.apache.org/jira/browse/FLINK-7416 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > The related works are : > * We define a new message called {{AddCredit}} to notify the incremental > credit during data shuffle. > * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the > channel is enqueued in the pipeline. > * Whenever the channel becomes writable, it takes the next {{InputChannel}} > and sends its unannounced credit. The credit is reset to zero after each sent. > * That way, messages are sent as often as the network has capacity and > contain as much credit as available for the channel at that point in time. > Otherwise, it would only add latency to the announcements and not increase > throughput. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274366#comment-16274366 ] ASF GitHub Bot commented on FLINK-7416: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r154321125 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java --- @@ -244,54 +237,89 @@ public void testCancelBeforeActive() throws Exception { } /** -* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline, and -* {@link AddCredit} message is sent to the producer. +* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline for notifying credits, +* and verifies the behaviour of credit notification by triggering channel's writability changed. */ @Test public void testNotifyCreditAvailable() throws Exception { + final SingleInputGate inputGate = createSingleInputGate(); + final RemoteInputChannel inputChannel1 = spy(createRemoteInputChannel(inputGate)); + final RemoteInputChannel inputChannel2 = spy(createRemoteInputChannel(inputGate)); final CreditBasedClientHandler handler = new CreditBasedClientHandler(); - final EmbeddedChannel channel = new EmbeddedChannel(handler); + final EmbeddedChannel channel = spy(new EmbeddedChannel(handler)); - final RemoteInputChannel inputChannel = createRemoteInputChannel(mock(SingleInputGate.class)); + // Increase the credits to enqueue the input channels + inputChannel1.increaseCredit(1); + inputChannel2.increaseCredit(1); + handler.notifyCreditAvailable(inputChannel1); + handler.notifyCreditAvailable(inputChannel2); - // Enqueue the input channel - handler.notifyCreditAvailable(inputChannel); + channel.runPendingTasks(); + + // The two input channels should notify credits via writable channel + assertTrue(channel.isWritable()); + assertEquals(channel.readOutbound().getClass(), AddCredit.class); + verify(inputChannel1, times(1)).getAndResetCredit(); + verify(inputChannel2, times(1)).getAndResetCredit(); --- End diff -- Instead of verifying the method being called, we could add a (test-visible) `getUnannouncedCredit()` method and check the actual credit (since this new method would only read the state, it is ok to have it). > Implement Netty receiver outgoing pipeline for credit-based > --- > > Key: FLINK-7416 > URL: https://issues.apache.org/jira/browse/FLINK-7416 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > The related works are : > * We define a new message called {{AddCredit}} to notify the incremental > credit during data shuffle. > * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the > channel is enqueued in the pipeline. > * Whenever the channel becomes writable, it takes the next {{InputChannel}} > and sends its unannounced credit. The credit is reset to zero after each sent. > * That way, messages are sent as often as the network has capacity and > contain as much credit as available for the channel at that point in time. > Otherwise, it would only add latency to the announcements and not increase > throughput. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
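A sketch of the suggested read-only accessor on RemoteInputChannel (the unannouncedCredit field already exists in the quoted diff) together with the assertions that would replace the Mockito verifications:

@VisibleForTesting
int getUnannouncedCredit() {
    return unannouncedCredit.get();
}

// In the test, after the pending tasks have run and the AddCredit
// messages have been written out:
assertEquals(AddCredit.class, channel.readOutbound().getClass());
assertEquals(0, inputChannel1.getUnannouncedCredit());
assertEquals(0, inputChannel2.getUnannouncedCredit());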
[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274376#comment-16274376 ] ASF GitHub Bot commented on FLINK-7416: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r154318950 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -280,94 +280,120 @@ public String toString() { // /** -* Enqueue this input channel in the pipeline for sending unannounced credits to producer. +* Enqueue this input channel in the pipeline for notifying the producer of unannounced credit. */ void notifyCreditAvailable() { - //TODO in next PR + checkState(partitionRequestClient != null, "Tried to send task event to producer before requesting a queue."); + + // We should skip the notification if this channel is already released. + if (!isReleased.get()) { + partitionRequestClient.notifyCreditAvailable(this); + } } /** -* Exclusive buffer is recycled to this input channel directly and it may trigger notify -* credit to producer. +* Exclusive buffer is recycled to this input channel directly and it may trigger return extra +* floating buffer and notify increased credit to the producer. * * @param segment The exclusive segment of this channel. */ @Override public void recycle(MemorySegment segment) { - synchronized (availableBuffers) { - // Important: the isReleased check should be inside the synchronized block. - // that way the segment can also be returned to global pool after added into - // the available queue during releasing all resources. + int numAddedBuffers; + + synchronized (bufferQueue) { + // Important: check the isReleased state inside synchronized block, so there is no + // race condition when recycle and releaseAllResources running in parallel. if (isReleased.get()) { try { - inputGate.returnExclusiveSegments(Arrays.asList(segment)); + inputGate.returnExclusiveSegments(Collections.singletonList(segment)); return; } catch (Throwable t) { ExceptionUtils.rethrow(t); } } - availableBuffers.add(new Buffer(segment, this)); + numAddedBuffers = bufferQueue.addExclusiveBuffer(new Buffer(segment, this), numRequiredBuffers); } - if (unannouncedCredit.getAndAdd(1) == 0) { + if (numAddedBuffers > 0 && unannouncedCredit.getAndAdd(1) == 0) { notifyCreditAvailable(); } } public int getNumberOfAvailableBuffers() { - synchronized (availableBuffers) { - return availableBuffers.size(); + synchronized (bufferQueue) { + return bufferQueue.getAvailableBufferSize(); } } + @VisibleForTesting + public int getNumberOfRequiredBuffers() { --- End diff -- this could be package-private (and then remove `@VisibleForTesting`) > Implement Netty receiver outgoing pipeline for credit-based > --- > > Key: FLINK-7416 > URL: https://issues.apache.org/jira/browse/FLINK-7416 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > The related works are : > * We define a new message called {{AddCredit}} to notify the incremental > credit during data shuffle. > * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the > channel is enqueued in the pipeline. > * Whenever the channel becomes writable, it takes the next {{InputChannel}} > and sends its unannounced credit. The credit is reset to zero after each sent. 
> * That way, messages are sent as often as the network has capacity and > contain as much credit as available for the channel at that point in time. > Otherwise, it would only add latency to the announcements and not increase > throughput. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r154338695 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java --- @@ -244,54 +237,89 @@ public void testCancelBeforeActive() throws Exception { } /** -* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline, and -* {@link AddCredit} message is sent to the producer. +* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline for notifying credits, +* and verifies the behaviour of credit notification by triggering channel's writability changed. */ @Test public void testNotifyCreditAvailable() throws Exception { + final SingleInputGate inputGate = createSingleInputGate(); + final RemoteInputChannel inputChannel1 = spy(createRemoteInputChannel(inputGate)); + final RemoteInputChannel inputChannel2 = spy(createRemoteInputChannel(inputGate)); final CreditBasedClientHandler handler = new CreditBasedClientHandler(); - final EmbeddedChannel channel = new EmbeddedChannel(handler); + final EmbeddedChannel channel = spy(new EmbeddedChannel(handler)); - final RemoteInputChannel inputChannel = createRemoteInputChannel(mock(SingleInputGate.class)); + // Increase the credits to enqueue the input channels + inputChannel1.increaseCredit(1); + inputChannel2.increaseCredit(1); + handler.notifyCreditAvailable(inputChannel1); + handler.notifyCreditAvailable(inputChannel2); - // Enqueue the input channel - handler.notifyCreditAvailable(inputChannel); + channel.runPendingTasks(); + + // The two input channels should notify credits via writable channel + assertTrue(channel.isWritable()); + assertEquals(channel.readOutbound().getClass(), AddCredit.class); + verify(inputChannel1, times(1)).getAndResetCredit(); + verify(inputChannel2, times(1)).getAndResetCredit(); + + final int highWaterMark = channel.config().getWriteBufferHighWaterMark(); + // Set the writer index to the high water mark to ensure that all bytes are written + // to the wire although the buffer is "empty". + channel.write(Unpooled.buffer(highWaterMark).writerIndex(highWaterMark)); + + // Enqueue the input channel on the condition of un-writable channel + inputChannel1.increaseCredit(1); + handler.notifyCreditAvailable(inputChannel1); channel.runPendingTasks(); - // Read the enqueued msg - Object msg1 = channel.readOutbound(); + // The input channel will not notify credits via un-writable channel + assertFalse(channel.isWritable()); + verify(inputChannel1, times(1)).getAndResetCredit(); --- End diff -- also add `assertNull(channel.readOutbound());` ---
[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r154315390 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java --- @@ -144,20 +147,19 @@ public void testReceiveBuffer() throws Exception { inputGate.setBufferPool(bufferPool); inputGate.assignExclusiveSegments(networkBufferPool, 2); - final int backlog = 2; - final BufferResponse bufferResponse = createBufferResponse( - inputChannel.requestBuffer(), 0, inputChannel.getInputChannelId(), backlog); - final CreditBasedClientHandler handler = new CreditBasedClientHandler(); handler.addInputChannel(inputChannel); + final int backlog = 2; + final BufferResponse bufferResponse = createBufferResponse( + TestBufferFactory.createBuffer(32), 0, inputChannel.getInputChannelId(), backlog); --- End diff -- good catch not getting the buffer from the channel here ---
[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274371#comment-16274371 ] ASF GitHub Bot commented on FLINK-7416: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r154338695 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java --- @@ -244,54 +237,89 @@ public void testCancelBeforeActive() throws Exception { } /** -* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline, and -* {@link AddCredit} message is sent to the producer. +* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline for notifying credits, +* and verifies the behaviour of credit notification by triggering channel's writability changed. */ @Test public void testNotifyCreditAvailable() throws Exception { + final SingleInputGate inputGate = createSingleInputGate(); + final RemoteInputChannel inputChannel1 = spy(createRemoteInputChannel(inputGate)); + final RemoteInputChannel inputChannel2 = spy(createRemoteInputChannel(inputGate)); final CreditBasedClientHandler handler = new CreditBasedClientHandler(); - final EmbeddedChannel channel = new EmbeddedChannel(handler); + final EmbeddedChannel channel = spy(new EmbeddedChannel(handler)); - final RemoteInputChannel inputChannel = createRemoteInputChannel(mock(SingleInputGate.class)); + // Increase the credits to enqueue the input channels + inputChannel1.increaseCredit(1); + inputChannel2.increaseCredit(1); + handler.notifyCreditAvailable(inputChannel1); + handler.notifyCreditAvailable(inputChannel2); - // Enqueue the input channel - handler.notifyCreditAvailable(inputChannel); + channel.runPendingTasks(); + + // The two input channels should notify credits via writable channel + assertTrue(channel.isWritable()); + assertEquals(channel.readOutbound().getClass(), AddCredit.class); + verify(inputChannel1, times(1)).getAndResetCredit(); + verify(inputChannel2, times(1)).getAndResetCredit(); + + final int highWaterMark = channel.config().getWriteBufferHighWaterMark(); + // Set the writer index to the high water mark to ensure that all bytes are written + // to the wire although the buffer is "empty". + channel.write(Unpooled.buffer(highWaterMark).writerIndex(highWaterMark)); + + // Enqueue the input channel on the condition of un-writable channel + inputChannel1.increaseCredit(1); + handler.notifyCreditAvailable(inputChannel1); channel.runPendingTasks(); - // Read the enqueued msg - Object msg1 = channel.readOutbound(); + // The input channel will not notify credits via un-writable channel + assertFalse(channel.isWritable()); + verify(inputChannel1, times(1)).getAndResetCredit(); --- End diff -- also add `assertNull(channel.readOutbound());` > Implement Netty receiver outgoing pipeline for credit-based > --- > > Key: FLINK-7416 > URL: https://issues.apache.org/jira/browse/FLINK-7416 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > The related works are : > * We define a new message called {{AddCredit}} to notify the incremental > credit during data shuffle. > * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the > channel is enqueued in the pipeline. > * Whenever the channel becomes writable, it takes the next {{InputChannel}} > and sends its unannounced credit. 
The credit is reset to zero after each sent. > * That way, messages are sent as often as the network has capacity and > contain as much credit as available for the channel at that point in time. > Otherwise, it would only add latency to the announcements and not increase > throughput. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274374#comment-16274374 ] ASF GitHub Bot commented on FLINK-7416: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r154336638 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java --- @@ -167,6 +167,13 @@ public void operationComplete(ChannelFuture future) throws Exception { }); } + public void notifyCreditAvailable(RemoteInputChannel inputChannel) { --- End diff -- this may be package-private > Implement Netty receiver outgoing pipeline for credit-based > --- > > Key: FLINK-7416 > URL: https://issues.apache.org/jira/browse/FLINK-7416 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > The related works are : > * We define a new message called {{AddCredit}} to notify the incremental > credit during data shuffle. > * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the > channel is enqueued in the pipeline. > * Whenever the channel becomes writable, it takes the next {{InputChannel}} > and sends its unannounced credit. The credit is reset to zero after each sent. > * That way, messages are sent as often as the network has capacity and > contain as much credit as available for the channel at that point in time. > Otherwise, it would only add latency to the announcements and not increase > throughput. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r154321327 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java --- @@ -244,54 +237,89 @@ public void testCancelBeforeActive() throws Exception { } /** -* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline, and -* {@link AddCredit} message is sent to the producer. +* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline for notifying credits, +* and verifies the behaviour of credit notification by triggering channel's writability changed. */ @Test public void testNotifyCreditAvailable() throws Exception { + final SingleInputGate inputGate = createSingleInputGate(); + final RemoteInputChannel inputChannel1 = spy(createRemoteInputChannel(inputGate)); + final RemoteInputChannel inputChannel2 = spy(createRemoteInputChannel(inputGate)); final CreditBasedClientHandler handler = new CreditBasedClientHandler(); - final EmbeddedChannel channel = new EmbeddedChannel(handler); + final EmbeddedChannel channel = spy(new EmbeddedChannel(handler)); - final RemoteInputChannel inputChannel = createRemoteInputChannel(mock(SingleInputGate.class)); + // Increase the credits to enqueue the input channels + inputChannel1.increaseCredit(1); + inputChannel2.increaseCredit(1); + handler.notifyCreditAvailable(inputChannel1); + handler.notifyCreditAvailable(inputChannel2); - // Enqueue the input channel - handler.notifyCreditAvailable(inputChannel); + channel.runPendingTasks(); + + // The two input channels should notify credits via writable channel + assertTrue(channel.isWritable()); + assertEquals(channel.readOutbound().getClass(), AddCredit.class); + verify(inputChannel1, times(1)).getAndResetCredit(); + verify(inputChannel2, times(1)).getAndResetCredit(); + + final int highWaterMark = channel.config().getWriteBufferHighWaterMark(); + // Set the writer index to the high water mark to ensure that all bytes are written + // to the wire although the buffer is "empty". + channel.write(Unpooled.buffer(highWaterMark).writerIndex(highWaterMark)); + + // Enqueue the input channel on the condition of un-writable channel + inputChannel1.increaseCredit(1); + handler.notifyCreditAvailable(inputChannel1); channel.runPendingTasks(); - // Read the enqueued msg - Object msg1 = channel.readOutbound(); + // The input channel will not notify credits via un-writable channel + assertFalse(channel.isWritable()); + verify(inputChannel1, times(1)).getAndResetCredit(); - // Should notify credit - assertEquals(msg1.getClass(), AddCredit.class); + // Flush the buffer to make the channel writable again + channel.flush(); + + // The input channel should notify credits via triggering channel's writability changed event + assertTrue(channel.isWritable()); + assertEquals(channel.readOutbound().getClass(), AddCredit.class); + verify(inputChannel1, times(2)).getAndResetCredit(); --- End diff -- also here (`AddCredit` as well as verifying the unannounced credit) ---
[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274365#comment-16274365 ] ASF GitHub Bot commented on FLINK-7416: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r154316689 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java --- @@ -170,29 +172,20 @@ public void testReceiveBuffer() throws Exception { */ @Test public void testThrowExceptionForNoAvailableBuffer() throws Exception { - final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32); final SingleInputGate inputGate = createSingleInputGate(); final RemoteInputChannel inputChannel = spy(createRemoteInputChannel(inputGate)); - inputGate.setInputChannel(inputChannel.getPartitionId().getPartitionId(), inputChannel); - try { - inputGate.assignExclusiveSegments(networkBufferPool, 1); - - final BufferResponse bufferResponse = createBufferResponse( - inputChannel.requestBuffer(), 0, inputChannel.getInputChannelId(), 2); - final CreditBasedClientHandler handler = new CreditBasedClientHandler(); - handler.addInputChannel(inputChannel); + final CreditBasedClientHandler handler = new CreditBasedClientHandler(); + handler.addInputChannel(inputChannel); - handler.channelRead(mock(ChannelHandlerContext.class), bufferResponse); + assertEquals("There should be no buffers available in the channel.", + 0, inputChannel.getNumberOfAvailableBuffers()); - verify(inputChannel, times(1)).onError(any(IllegalStateException.class)); - } finally { - // Release all the buffer resources - inputChannel.releaseAllResources(); + final BufferResponse bufferResponse = createBufferResponse( + TestBufferFactory.createBuffer(), 0, inputChannel.getInputChannelId(), 2); + handler.channelRead(mock(ChannelHandlerContext.class), bufferResponse); - networkBufferPool.destroyAllBufferPools(); - networkBufferPool.destroy(); - } + verify(inputChannel, times(1)).onError(any(IllegalStateException.class)); --- End diff -- nice test simplification > Implement Netty receiver outgoing pipeline for credit-based > --- > > Key: FLINK-7416 > URL: https://issues.apache.org/jira/browse/FLINK-7416 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > The related works are : > * We define a new message called {{AddCredit}} to notify the incremental > credit during data shuffle. > * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the > channel is enqueued in the pipeline. > * Whenever the channel becomes writable, it takes the next {{InputChannel}} > and sends its unannounced credit. The credit is reset to zero after each sent. > * That way, messages are sent as often as the network has capacity and > contain as much credit as available for the channel at that point in time. > Otherwise, it would only add latency to the announcements and not increase > throughput. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r154321125 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java --- @@ -244,54 +237,89 @@ public void testCancelBeforeActive() throws Exception { } /** -* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline, and -* {@link AddCredit} message is sent to the producer. +* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline for notifying credits, +* and verifies the behaviour of credit notification by triggering channel's writability changed. */ @Test public void testNotifyCreditAvailable() throws Exception { + final SingleInputGate inputGate = createSingleInputGate(); + final RemoteInputChannel inputChannel1 = spy(createRemoteInputChannel(inputGate)); + final RemoteInputChannel inputChannel2 = spy(createRemoteInputChannel(inputGate)); final CreditBasedClientHandler handler = new CreditBasedClientHandler(); - final EmbeddedChannel channel = new EmbeddedChannel(handler); + final EmbeddedChannel channel = spy(new EmbeddedChannel(handler)); - final RemoteInputChannel inputChannel = createRemoteInputChannel(mock(SingleInputGate.class)); + // Increase the credits to enqueue the input channels + inputChannel1.increaseCredit(1); + inputChannel2.increaseCredit(1); + handler.notifyCreditAvailable(inputChannel1); + handler.notifyCreditAvailable(inputChannel2); - // Enqueue the input channel - handler.notifyCreditAvailable(inputChannel); + channel.runPendingTasks(); + + // The two input channels should notify credits via writable channel + assertTrue(channel.isWritable()); + assertEquals(channel.readOutbound().getClass(), AddCredit.class); + verify(inputChannel1, times(1)).getAndResetCredit(); + verify(inputChannel2, times(1)).getAndResetCredit(); --- End diff -- Instead of verifying the method being called, we could add a (test-visible) `getUnannouncedCredit()` method and check the actual credit (since this new method would only read the state, it is ok to have it). ---