[jira] [Created] (FLINK-2895) Duplicate immutable object creation
Greg Hogan created FLINK-2895: - Summary: Duplicate immutable object creation Key: FLINK-2895 URL: https://issues.apache.org/jira/browse/FLINK-2895 Project: Flink Issue Type: Improvement Components: Distributed Runtime Affects Versions: 0.10 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Minor Inverse of FLINK-2724. When object reuse is disabled, a few operators create and pass objects locally. In the case of immutable objects these will be discarded by the {{TypeSerializer}} when deserializing. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2897) Use distinct initial indices for OutputEmitter round-robin
Greg Hogan created FLINK-2897: - Summary: Use distinct initial indices for OutputEmitter round-robin Key: FLINK-2897 URL: https://issues.apache.org/jira/browse/FLINK-2897 Project: Flink Issue Type: Improvement Components: Distributed Runtime Affects Versions: 0.10 Reporter: Greg Hogan Assignee: Greg Hogan Currently, when performing a round-robin partitioning each task will sequentially partition starting with partition "1". This is fine in the usual case where the number of partitioned objects greatly exceeds the number of channels. However, in the case where the number of objects is relatively few (each, perhaps, requiring a large computation or access to an external system) it would be much better to begin partitioning at distinct indices (the task index). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
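The staggering described in FLINK-2897 can be sketched as follows; the class name and shape are illustrative, not Flink's actual {{OutputEmitter}} API:

```java
// Hypothetical sketch of round-robin channel selection that begins at the
// task's own index instead of always starting at channel 0, so that tasks
// emitting only a few records still spread them across distinct channels.
public class StaggeredRoundRobin {
    private final int numberOfChannels;
    private int nextChannel;

    public StaggeredRoundRobin(int taskIndex, int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
        // start at this task's index so tasks fan out to distinct channels first
        this.nextChannel = taskIndex % numberOfChannels;
    }

    public int selectChannel() {
        int channel = nextChannel;
        nextChannel = (nextChannel + 1) % numberOfChannels;
        return channel;
    }
}
```

With four channels, task 2 emits to channels 2, 3, 0, 1, ... while task 0 emits to 0, 1, 2, 3, ..., so the first few records from different tasks land on different channels.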
[jira] [Created] (FLINK-2898) Invert Travis CI build order
Greg Hogan created FLINK-2898: - Summary: Invert Travis CI build order Key: FLINK-2898 URL: https://issues.apache.org/jira/browse/FLINK-2898 Project: Flink Issue Type: Improvement Components: Build System Affects Versions: 0.10 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Trivial The Travis CI builds generally perform fastest to slowest. When running additional, concurrent Travis CI builds it would be preferable to have the slowest tasks begin first. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2902) Web interface sort tasks newest first
Greg Hogan created FLINK-2902: - Summary: Web interface sort tasks newest first Key: FLINK-2902 URL: https://issues.apache.org/jira/browse/FLINK-2902 Project: Flink Issue Type: Improvement Components: Webfrontend Affects Versions: 0.10 Reporter: Greg Hogan Priority: Minor Sort completed jobs in reverse order so the most recently finished (?) are at the top of the list. With a long list of completed jobs the user must scroll down to view recently completed jobs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2903) Web interface numeric localization
Greg Hogan created FLINK-2903: - Summary: Web interface numeric localization Key: FLINK-2903 URL: https://issues.apache.org/jira/browse/FLINK-2903 Project: Flink Issue Type: Improvement Components: Webfrontend Affects Versions: 0.10 Reporter: Greg Hogan Priority: Minor It would be nice to localize numbers in the web interface as 10+ digits is difficult to parse without separators. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2904) Web interface truncated task counts
Greg Hogan created FLINK-2904: - Summary: Web interface truncated task counts Key: FLINK-2904 URL: https://issues.apache.org/jira/browse/FLINK-2904 Project: Flink Issue Type: Improvement Components: Webfrontend Affects Versions: 0.10 Reporter: Greg Hogan Priority: Minor Task counts have only three digits visible as the color square needs to dynamically expand. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2907) Bloom filter for Join
Greg Hogan created FLINK-2907: - Summary: Bloom filter for Join Key: FLINK-2907 URL: https://issues.apache.org/jira/browse/FLINK-2907 Project: Flink Issue Type: New Feature Components: Java API, Scala API Affects Versions: 1.0 Reporter: Greg Hogan Assignee: Greg Hogan A bloom filter can be a chainable operation for probe side Join elements. An element not matched by the bloom filter will not be serialized, shipped, deserialized, and processed. Generating the bloom filter is a chainable operation over hash side elements. The bloom filter created on each TaskManager must be the same size to allow combining by xor. The most efficient means to distribute the bloom filter is to assign each TaskManager an equal partition that it will receive from all other TaskManagers. This will be broadcast once all local elements (by hashing) and remote partitions (by xor) have been processed into that part of the bloom filter. An example with numbers: triangle listing/counting joining 2B edges on 149B two-paths resulting in 21B triangles (this is using the optimal algorithm). At 8 bits per element the bloom filter will have a false-positive rate of ~2% and require a 2 GB bloom filter (stored once and shared per TaskManager). Each TaskManager both sends and receives data equivalent to the size of the bloom filter (minus the local partition, the size of which trends towards zero as the number of TaskManagers increases). The number of matched elements is 21B (true positive) + ~0.02*(149B-21B) = 23.5B, a reduction of 84% or 1.5 TB (at 12 bytes per element). With 4 TaskManagers only 12 GB of bloom filter would be transmitted, a savings of 99.2%. Key issues are determining the size of the bloom filter (dependent on the count of hash side elements, the available memory segments, and the error rate) and whether this can be integrated with Join or must be a separate operator. 
This also depends on dynamic memory allocation as spilling to disk would perform the serialization, write, read, and deserialization we are looking to avoid. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
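The ~2% figure above can be checked with the standard bloom filter estimate p = (1 - e^(-kn/m))^k, taking the optimal hash count k = (m/n) · ln 2. This is a back-of-the-envelope sketch, not proposed implementation code:

```java
// Back-of-the-envelope bloom filter sizing used in the description above:
// with m/n bits per element and the optimal number of hash functions
// k = (m/n) * ln 2, the false-positive rate is p = (1 - e^(-k*n/m))^k.
public class BloomSizing {
    public static double falsePositiveRate(double bitsPerElement) {
        double k = bitsPerElement * Math.log(2); // optimal hash count
        return Math.pow(1 - Math.exp(-k / bitsPerElement), k);
    }
}
```

At 8 bits per element this evaluates to roughly 0.021, matching the ~2% quoted above; doubling to 16 bits per element drops the rate by more than an order of magnitude.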
[jira] [Created] (FLINK-2908) Web interface redraw web plan when browser resized
Greg Hogan created FLINK-2908: - Summary: Web interface redraw web plan when browser resized Key: FLINK-2908 URL: https://issues.apache.org/jira/browse/FLINK-2908 Project: Flink Issue Type: Improvement Components: Webfrontend Affects Versions: 0.10 Reporter: Greg Hogan Priority: Trivial The job plan graph does not resize when the user expands the browser window (only a change in width matters). To reproduce: 1) open the plan tab of a running or completed job in a non-maximized browser window (not full width), 2) maximize the browser window. Workaround: refresh the web page. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2909) Gelly Graph Generators
Greg Hogan created FLINK-2909: - Summary: Gelly Graph Generators Key: FLINK-2909 URL: https://issues.apache.org/jira/browse/FLINK-2909 Project: Flink Issue Type: New Feature Components: Gelly Affects Versions: 1.0 Reporter: Greg Hogan Assignee: Greg Hogan Include a selection of graph generators in Gelly. Generated graphs will be useful for performing scalability, stress, and regression testing as well as benchmarking and comparing algorithms, for both Flink users and developers. Generated data is infinitely scalable yet described by a few simple parameters and can often substitute for user data or sharing large files when reporting issues. There are multiple categories of graphs as documented by [NetworkX|https://networkx.github.io/documentation/latest/reference/generators.html] and elsewhere. Graphs may be well-defined, e.g. the [Chvátal graph|https://en.wikipedia.org/wiki/Chv%C3%A1tal_graph]. These may be sufficiently small to populate locally. Graphs may be scalable, e.g. complete and star graphs. These should use Flink's distributed parallelism. Graphs may be stochastic, e.g. [RMat graphs|http://snap.stanford.edu/class/cs224w-readings/chakrabarti04rmat.pdf]. A key consideration is that the graphs should source randomness from a seedable PRNG and generate the same Graph regardless of parallelism. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
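One way to satisfy the parallelism-independence requirement is to derive each element's randomness purely from the pair (seed, element index), so the output does not depend on how the index range is split across tasks. The class below is a hypothetical sketch, not a proposed Gelly API:

```java
import java.util.Random;

// Sketch of parallelism-independent randomness: each element's random bits are
// derived from a global seed combined with the element's global index, so the
// generated graph is identical however the index range is partitioned.
public class SeededElementRandom {
    public static long randomBitsFor(long seed, long elementIndex) {
        // a fresh PRNG per element, keyed only by (seed, index); the mixing
        // constant is illustrative (golden-ratio multiplier)
        return new Random(seed ^ (elementIndex * 0x9E3779B97F4A7C15L)).nextLong();
    }
}
```

Two tasks that both ask for element 7 of seed 42 obtain identical bits, no matter which task ranges they were assigned.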
[jira] [Created] (FLINK-3160) Aggregate operator statistics by TaskManager
Greg Hogan created FLINK-3160: - Summary: Aggregate operator statistics by TaskManager Key: FLINK-3160 URL: https://issues.apache.org/jira/browse/FLINK-3160 Project: Flink Issue Type: Improvement Components: Webfrontend Affects Versions: 1.0.0 Reporter: Greg Hogan The web client job info page presents a table of the following per task statistics: start time, end time, duration, bytes received, records received, bytes sent, records sent, attempt, host, status. Flink supports clusters with thousands of slots, and a job with high parallelism renders the job info page unwieldy and difficult to analyze in real-time. It would be helpful to optionally or automatically aggregate statistics by TaskManager. These rows could then be expanded to reveal the current per task statistics. Start time, end time, duration, and attempt are not applicable to a TaskManager since new tasks for repeated attempts may be started. Bytes received, records received, bytes sent, and records sent are summed. Any throughput metrics can be averaged over the total task time or time window. Status could reference the number of running tasks on the TaskManager or an idle state. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3161) Externalize cluster start-up and tear-down when available
Greg Hogan created FLINK-3161: - Summary: Externalize cluster start-up and tear-down when available Key: FLINK-3161 URL: https://issues.apache.org/jira/browse/FLINK-3161 Project: Flink Issue Type: Improvement Components: Start-Stop Scripts Affects Versions: 1.0.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Minor I have been using pdsh, pdcp, and rpdcp to both distribute compiled Flink and to start and stop the TaskManagers. The current shell script initializes TaskManagers one-at-a-time. This is trivial to background but would be unthrottled. From pdsh's archived homepage: "uses a sliding window of threads to execute remote commands, conserving socket resources while allowing some connections to timeout if needed". What other tools could be supported when available? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
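A pdsh-style sliding window can be approximated in plain Java with a fixed-size thread pool; the launcher below is a hypothetical sketch, where the submitted task would wrap the actual ssh/start command:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch of a pdsh-style "sliding window": start many TaskManagers
// concurrently but throttle to a fixed number of in-flight connections.
public class ThrottledLauncher {
    public static List<String> launchAll(List<String> hosts, int window) {
        // fixed pool size = the sliding window of concurrent start-up commands
        ExecutorService pool = Executors.newFixedThreadPool(window);
        List<Future<String>> futures = new ArrayList<>();
        for (String host : hosts) {
            // the real command would be an ssh / taskmanager.sh start invocation
            futures.add(pool.submit(() -> "started " + host));
        }
        List<String> results = new ArrayList<>();
        try {
            for (Future<String> f : futures) {
                results.add(f.get());
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
        return results;
    }
}
```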
[jira] [Created] (FLINK-3162) Configure number of TaskManager slots as ratio of available processors
Greg Hogan created FLINK-3162: - Summary: Configure number of TaskManager slots as ratio of available processors Key: FLINK-3162 URL: https://issues.apache.org/jira/browse/FLINK-3162 Project: Flink Issue Type: Improvement Components: Distributed Runtime Affects Versions: 1.0.0 Reporter: Greg Hogan Priority: Minor The number of TaskManager slots is currently only configurable by explicitly setting {{taskmanager.numberOfTaskSlots}}. Make this configurable by a ratio of the number of available processors (for example, "2", for hyperthreading). This can work in the same way as {{taskmanager.memory.size}} and {{taskmanager.memory.fraction}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
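A sketch of the proposed ratio-based computation, analogous to {{taskmanager.memory.fraction}}; the method name and rounding policy are assumptions, not an existing Flink API:

```java
// Hypothetical slot-count derivation: multiply the available processors by a
// configured ratio (e.g. 0.5 to leave headroom, 2.0 for hyperthreading),
// rounding to the nearest integer and never going below one slot.
public class SlotCalc {
    public static int slotsFromRatio(int availableProcessors, double ratio) {
        return Math.max(1, (int) Math.round(availableProcessors * ratio));
    }
}
```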
[jira] [Created] (FLINK-3163) Configure Flink for NUMA systems
Greg Hogan created FLINK-3163: - Summary: Configure Flink for NUMA systems Key: FLINK-3163 URL: https://issues.apache.org/jira/browse/FLINK-3163 Project: Flink Issue Type: Improvement Components: Start-Stop Scripts Affects Versions: 1.0.0 Reporter: Greg Hogan Assignee: Greg Hogan On NUMA systems Flink can be pinned to a single physical processor ("node") using {{numactl --membind=$node --cpunodebind=$node}}. Commonly available NUMA systems include the largest AWS and Google Compute instances. For example, on an AWS c4.8xlarge system with 36 hyperthreads the user could configure a single TaskManager with 36 slots or have Flink create two TaskManagers, each bound to one of the NUMA nodes and each with 18 slots. There may be some extra overhead in transferring network buffers between TaskManagers on the same system, though the fraction of data shuffled in this manner decreases with the size of the cluster. The performance improvement from only accessing local memory looks to be significant though difficult to benchmark. The JobManagers may fit into NUMA nodes rather than requiring full systems. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3164) Spread out scheduling strategy
Greg Hogan created FLINK-3164: - Summary: Spread out scheduling strategy Key: FLINK-3164 URL: https://issues.apache.org/jira/browse/FLINK-3164 Project: Flink Issue Type: Improvement Components: Distributed Runtime, Java API, Scala API Affects Versions: 1.0.0 Reporter: Greg Hogan The size of a Flink cluster is bounded by the amount of memory allocated for network buffers. The all-to-all distribution of data during a network shuffle means that doubling the number of TaskManager slots quadruples the required number of network buffers. A Flink job can be configured to execute operators with lower parallelism, which reduces the number of network buffers used across the cluster. Since the Flink scheduler clusters tasks, the number of network buffers to be configured cannot be reduced. For example, if each TaskManager has 32 slots and the cluster has 32 TaskManagers the maximum parallelism can be set to 1024. If the preceding operator has a parallelism of 32 then the TaskManager fan-out is between 1*1024 (tasks evenly distributed) and 32*1024 (executed on a single TaskManager). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3206) Heap size for non-pre-allocated off-heap memory
Greg Hogan created FLINK-3206: - Summary: Heap size for non-pre-allocated off-heap memory Key: FLINK-3206 URL: https://issues.apache.org/jira/browse/FLINK-3206 Project: Flink Issue Type: Bug Components: Start-Stop Scripts Affects Versions: 1.0.0 Reporter: Greg Hogan In {{taskmanager.sh}} the heap size is adjusted for off-heap memory only with pre-allocation set to true. In {{TaskManager.scala}} the computation is reversed to compute the {{directMemorySize}}. The effect is that the JVM heap settings are too high and the assumed size of direct memory is also too high.
{noformat}
taskmanager.memory.fraction: 0.9
taskmanager.memory.off-heap: true
taskmanager.heap.mb: 18000
{noformat}
With {{taskmanager.memory.preallocate: false}}
{noformat}
13:44:29,015 INFO org.apache.flink.runtime.taskmanager.TaskManager - -Xms18000M
13:44:29,015 INFO org.apache.flink.runtime.taskmanager.TaskManager - -Xmx18000M
13:44:30,591 INFO org.apache.flink.runtime.taskmanager.TaskManager - Limiting managed memory to 0.9 of the maximum memory size (161999 MB), memory will be allocated lazily.
{noformat}
With {{taskmanager.memory.preallocate: true}}
{noformat}
13:53:44,127 INFO org.apache.flink.runtime.taskmanager.TaskManager - -Xms1800M
13:53:44,127 INFO org.apache.flink.runtime.taskmanager.TaskManager - -Xmx1800M
13:53:45,743 INFO org.apache.flink.runtime.taskmanager.TaskManager - Using 0.9 of the maximum memory size for managed off-heap memory (15524 MB).
{noformat}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
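The arithmetic the two code paths should agree on can be sketched as follows (illustrative only, not Flink code): with the configuration above, an 18000 MB configured heap and fraction 0.9 should yield a 1800 MB JVM heap whenever off-heap memory is enabled, regardless of pre-allocation.

```java
// Sketch of consistent heap sizing for off-heap managed memory: the JVM heap
// is what remains after carving out the managed fraction, independent of
// whether the managed memory is pre-allocated.
public class OffHeapSizing {
    public static long jvmHeapMb(long configuredHeapMb, double managedFraction, boolean offHeap) {
        if (!offHeap) {
            // managed memory lives on the heap; the configured size is the heap size
            return configuredHeapMb;
        }
        long managedMb = (long) (configuredHeapMb * managedFraction);
        return configuredHeapMb - managedMb;
    }
}
```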
[jira] [Created] (FLINK-3218) Merging Hadoop configurations overrides user parameters
Greg Hogan created FLINK-3218: - Summary: Merging Hadoop configurations overrides user parameters Key: FLINK-3218 URL: https://issues.apache.org/jira/browse/FLINK-3218 Project: Flink Issue Type: Bug Components: Java API Affects Versions: 1.0.0 Reporter: Greg Hogan Assignee: Greg Hogan {{HadoopUtils.mergeHadoopConf}} loads the default Hadoop configuration over the user configuration. We should only copy parameters that do not already exist in the user configuration. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
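The proposed fix amounts to copying only absent keys; a minimal sketch with plain maps standing in for Hadoop's {{Configuration}} type:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the proposed merge: a default entry is copied into the user
// configuration only when the user has not already set that key, so user
// parameters are never overridden.
public class ConfMerge {
    public static Map<String, String> merge(Map<String, String> user, Map<String, String> defaults) {
        Map<String, String> merged = new HashMap<>(user);
        for (Map.Entry<String, String> e : defaults.entrySet()) {
            merged.putIfAbsent(e.getKey(), e.getValue());
        }
        return merged;
    }
}
```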
[jira] [Created] (FLINK-3219) Implement DataSet.count using a single operator
Greg Hogan created FLINK-3219: - Summary: Implement DataSet.count using a single operator Key: FLINK-3219 URL: https://issues.apache.org/jira/browse/FLINK-3219 Project: Flink Issue Type: Improvement Reporter: Greg Hogan Assignee: Greg Hogan Priority: Minor {{DataSet.count}} is currently implemented using a {{FlatMapFunction}} followed by a {{DiscardingOutputFormat}}. As noted by [~StephanEwen] in [FLINK-2716] this can be done with only a {{RichOutputFormat}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
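A minimal sketch of the idea: counting happens inside a single sink rather than a flat map followed by a discarding sink. The class below is illustrative and omits Flink's {{RichOutputFormat}} lifecycle and accumulator wiring:

```java
// Hypothetical counting sink: tallies records locally instead of emitting a
// per-element 1 through a FlatMapFunction and discarding the output. In the
// real implementation the count would be reported through an accumulator.
public class CountingSink<T> {
    private long count;

    public void writeRecord(T record) {
        count++; // the record itself is discarded
    }

    public long getCount() {
        return count;
    }
}
```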
[jira] [Created] (FLINK-3262) Remove fuzzy versioning from Bower dependencies
Greg Hogan created FLINK-3262: - Summary: Remove fuzzy versioning from Bower dependencies Key: FLINK-3262 URL: https://issues.apache.org/jira/browse/FLINK-3262 Project: Flink Issue Type: Improvement Components: Webfrontend Affects Versions: 1.0.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Trivial {{bower.json}} is currently defined with fuzzy versions, e.g. {{"bootstrap": "~3.3.5"}}, which silently pull in patch updates. When a user compiles the web frontend, new versions create changes in the compiled Javascript and CSS. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3263) Log task statistics on TaskManager
Greg Hogan created FLINK-3263: - Summary: Log task statistics on TaskManager Key: FLINK-3263 URL: https://issues.apache.org/jira/browse/FLINK-3263 Project: Flink Issue Type: Improvement Components: TaskManager Affects Versions: 1.0.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Minor Similar to how memory statistics can be written to the TaskManagers' log files by configuring {{taskmanager.debug.memory.startLogThread}} and {{taskmanager.debug.memory.logIntervalMs}}, it would be useful to have statistics written for each task within a job. One use case is to reconstruct progress to analyze why TaskManagers take different amounts of time to process the same quantity of data. I envision this being the same statistics which are displayed on the web frontend. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3277) Use Value types in Gelly API
Greg Hogan created FLINK-3277: - Summary: Use Value types in Gelly API Key: FLINK-3277 URL: https://issues.apache.org/jira/browse/FLINK-3277 Project: Flink Issue Type: Improvement Components: Gelly Affects Versions: 1.0.0 Reporter: Greg Hogan This would be a breaking change so the discussion needs to happen before the 1.0.0 release. I think it would benefit Flink to use {{Value}} types wherever possible. The {{Graph}} functions {{inDegrees}}, {{outDegrees}}, and {{getDegrees}} each return {{DataSet<Tuple2<K, Long>>}}. Using {{Long}} creates a new heap object for every serialization and deserialization. The mutable {{Value}} types do not suffer from this issue when object reuse is enabled. I lean towards a preference for conciseness in documentation and performance in examples and APIs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3279) Optionally disable DistinctOperator combiner
Greg Hogan created FLINK-3279: - Summary: Optionally disable DistinctOperator combiner Key: FLINK-3279 URL: https://issues.apache.org/jira/browse/FLINK-3279 Project: Flink Issue Type: New Feature Components: DataSet API Affects Versions: 1.0.0 Reporter: Greg Hogan Priority: Minor Calling {{DataSet.distinct()}} executes {{DistinctOperator.DistinctFunction}} which is a combinable {{RichGroupReduceFunction}}. Sometimes we know that there will be few duplicate records, and disabling the combiner would improve performance. I propose adding {{public DistinctOperator setCombinable(boolean combinable)}} to {{DistinctOperator}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3335) DataSourceTask object reuse when disabled
Greg Hogan created FLINK-3335: - Summary: DataSourceTask object reuse when disabled Key: FLINK-3335 URL: https://issues.apache.org/jira/browse/FLINK-3335 Project: Flink Issue Type: Bug Components: Distributed Runtime Affects Versions: 1.0.0 Reporter: Greg Hogan Assignee: Greg Hogan From {{DataSourceTask.invoke()}}:
{code}
if ((returned = format.nextRecord(serializer.createInstance())) != null) {
	output.collect(returned);
}
{code}
The returned value ({{returned}}) must be copied rather than creating and passing in a new instance. The {{InputFormat}} interface only permits the given object to be used and does not require a new object to be returned otherwise. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
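The fix can be sketched as follows; the stand-in source and copy mechanism below are illustrative, not Flink's {{InputFormat}}/{{TypeSerializer}} interfaces. The key point is that the caller collects a copy of whatever the format returns, since the format is allowed to hand back the same (possibly reused) object on every call:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Sketch of the fix: the task hands the format a reuse instance but collects a
// COPY of whatever the format returns, because the contract allows the format
// to return the same object (here the shared int[] stands in for the record).
public class CopyOnCollect {
    public static List<int[]> readAll(Iterator<Integer> source) {
        int[] reuse = new int[1];                   // the reuse instance given to the "format"
        List<int[]> collected = new ArrayList<>();
        while (source.hasNext()) {
            reuse[0] = source.next();               // the format fills and returns `reuse`
            collected.add(Arrays.copyOf(reuse, 1)); // collect a copy, not the reuse object
        }
        return collected;
    }
}
```

Without the copy, every collected element would alias the single reuse object and end up holding the last value read.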
[jira] [Created] (FLINK-3337) mvn test fails on flink-runtime because curator classes not found
Greg Hogan created FLINK-3337: - Summary: mvn test fails on flink-runtime because curator classes not found Key: FLINK-3337 URL: https://issues.apache.org/jira/browse/FLINK-3337 Project: Flink Issue Type: Bug Components: Distributed Runtime Affects Versions: 1.0.0 Reporter: Greg Hogan This has been reported before. I am running {{mvn test}} on an AWS c4.2xlarge, Flink HEAD (version 69f7f6d9...) and seeing the missing curator classes. For example,
{code}
testMultipleLeaders(org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionTest) Time elapsed: 1.042 sec <<< ERROR!
java.lang.NoClassDefFoundError: org/apache/curator/RetryPolicy
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionTest.testMultipleLeaders(ZooKeeperLeaderElectionTest.java:291)
{code}
{code}
Tests in error:
  JobManagerLeaderElectionTest.testLeaderElection:99->createJobManagerProps:166 NoClassDefFound
  JobManagerLeaderElectionTest.testLeaderReelection:130->createJobManagerProps:166 NoClassDefFound
  ZooKeeperLeaderElectionTest.testEphemeralZooKeeperNodes:444 NoClassDefFound or...
  ZooKeeperLeaderElectionTest.testExceptionForwarding:372 NoClassDefFound org/ap...
  ZooKeeperLeaderElectionTest.testMultipleLeaders:291 NoClassDefFound org/apache...
  ZooKeeperLeaderElectionTest.testZooKeeperLeaderElectionRetrieval:94 NoClassDefFound
  ZooKeeperLeaderElectionTest.testZooKeeperReelection:137 » NoClassDefFound org/...
  ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement:207 NoClassDefFound
  ZooKeeperLeaderRetrievalTest.testConnectingAddressRetrievalWithDelayedLeaderElection:96 NoClassDefFound
  ZooKeeperLeaderRetrievalTest.testTimeoutOfFindConnectingAddress:187 » NoClassDefFound
  ZooKeeperUtilTest.testZooKeeperEnsembleConnectStringConfiguration:40 NoClassDefFound
{code}
The issue is resolved when removing the curator excludes from {{flink-runtime/pom.xml}}:
{code}
org.apache.curator:curator-recipes
org.apache.curator:curator-client
org.apache.curator:curator-framework
{code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3340) Fix object juggling in reduce drivers
Greg Hogan created FLINK-3340: - Summary: Fix object juggling in reduce drivers Key: FLINK-3340 URL: https://issues.apache.org/jira/browse/FLINK-3340 Project: Flink Issue Type: Bug Components: Distributed Runtime Affects Versions: 1.0.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Critical {{ReduceDriver}}, {{ReduceCombineDriver}}, and {{ChainedAllReduceDriver}} are not properly tracking objects for reuse. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3382) Improve clarity of object reuse in ReusingMutableToRegularIteratorWrapper
Greg Hogan created FLINK-3382: - Summary: Improve clarity of object reuse in ReusingMutableToRegularIteratorWrapper Key: FLINK-3382 URL: https://issues.apache.org/jira/browse/FLINK-3382 Project: Flink Issue Type: Improvement Components: Distributed Runtime Affects Versions: 1.0.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Minor Object reuse in {{ReusingMutableToRegularIteratorWrapper.hasNext()}} can be clarified by creating a single object and storing the iterator's next value into the second reference. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3385) Fix outer join skipping unprobed partitions
Greg Hogan created FLINK-3385: - Summary: Fix outer join skipping unprobed partitions Key: FLINK-3385 URL: https://issues.apache.org/jira/browse/FLINK-3385 Project: Flink Issue Type: Bug Components: Distributed Runtime Reporter: Greg Hogan Priority: Critical Fix For: 1.0.0 {{MutableHashTable.nextRecord}} performs three steps for a build-side outer join:
{code}
public boolean nextRecord() throws IOException {
	if (buildSideOuterJoin) {
		return processProbeIter() || processUnmatchedBuildIter() || prepareNextPartition();
	} else {
		return processProbeIter() || prepareNextPartition();
	}
}
{code}
{{MutableHashTable.processUnmatchedBuildIter}} eventually calls through to {{MutableHashTable.moveToNextBucket}} which is unable to process spilled partitions:
{code}
if (p.isInMemory()) {
	...
} else {
	return false;
}
{code}
{{MutableHashTable.prepareNextPartition}} calls {{HashPartition.finalizeProbePhase}} which only spills the partition (to be read and processed in the next instantiation of {{MutableHashTable}}) if probe-side records were spilled. In an equi-join this is fine but with an outer join the unmatched build-side records must still be retained (though no further probing is necessary, so could this be short-circuited when loaded by the next {{MutableHashTable}}?).
{code}
if (isInMemory()) {
	...
} else if (this.probeSideRecordCounter == 0) {
	// partition is empty, no spilled buffers
	// return the memory buffer
	freeMemory.add(this.probeSideBuffer.getCurrentSegment());
	// delete the spill files
	this.probeSideChannel.close();
	this.buildSideChannel.deleteChannel();
	this.probeSideChannel.deleteChannel();
	return 0;
} else {
	// flush the last probe side buffer and register this partition as pending
	this.probeSideBuffer.close();
	this.probeSideChannel.close();
	spilledPartitions.add(this);
	return 1;
}
{code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3393) ExternalProcessRunner wait to finish copying error stream
Greg Hogan created FLINK-3393: - Summary: ExternalProcessRunner wait to finish copying error stream Key: FLINK-3393 URL: https://issues.apache.org/jira/browse/FLINK-3393 Project: Flink Issue Type: Bug Reporter: Greg Hogan Assignee: Greg Hogan Priority: Trivial Travis CI reported the following error, which could be caused by {{ExternalProcessRunner.run}} returning before the {{PipeForwarder}} has finished copying standard error. Resolution is for {{ExternalProcessRunner.run}} to join on the {{PipeForwarder}}.
{code}
Tests run: 4, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 1.414 sec <<< FAILURE! - in org.apache.flink.util.ExternalProcessRunnerTest
testFailing(org.apache.flink.util.ExternalProcessRunnerTest) Time elapsed: 0.405 sec <<< FAILURE!
java.lang.AssertionError: null
	at org.junit.Assert.fail(Assert.java:86)
	at org.junit.Assert.assertTrue(Assert.java:41)
	at org.junit.Assert.assertTrue(Assert.java:52)
	at org.apache.flink.util.ExternalProcessRunnerTest.testFailing(ExternalProcessRunnerTest.java:75)
{code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
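The proposed resolution, joining the forwarding thread before returning, can be sketched as follows; the names are illustrative, not the actual {{ExternalProcessRunner}}/{{PipeForwarder}} API:

```java
import java.io.InputStream;

// Sketch of the fix: the stderr-forwarding thread is joined before the caller
// returns, so the captured output is guaranteed to be complete.
public class StreamForwarder extends Thread {
    private final InputStream in;
    private final StringBuilder target;

    public StreamForwarder(InputStream in, StringBuilder target) {
        this.in = in;
        this.target = target;
    }

    @Override
    public void run() {
        try {
            int b;
            while ((b = in.read()) != -1) {
                target.append((char) b);
            }
        } catch (Exception ignored) {
            // a real implementation would surface this error
        }
    }

    public static String captureFully(InputStream in) {
        StringBuilder sb = new StringBuilder();
        StreamForwarder forwarder = new StreamForwarder(in, sb);
        forwarder.start();
        try {
            forwarder.join(); // the fix: wait for the copy to finish before returning
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sb.toString();
    }
}
```

Without the {{join()}}, the caller may read the buffer while the forwarder is still copying, which is exactly the race the test failure above suggests.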
[jira] [Created] (FLINK-3395) Polishing the web UI
Greg Hogan created FLINK-3395: - Summary: Polishing the web UI Key: FLINK-3395 URL: https://issues.apache.org/jira/browse/FLINK-3395 Project: Flink Issue Type: Improvement Components: Webfrontend Reporter: Greg Hogan Fix For: 1.0.0 On the job properties page one must select an operator from the plan. Elsewhere in the UI a list of operators is displayed and clicking the table or the plan will reveal the requested information. A list of operators could likewise be added to the timeline page. Also, when selecting an operator the subtask timelines zoom out to show "000" for milliseconds. It would be nice to back off to seconds, minutes, etc. The job exceptions page should display a "No exceptions" notification, as done elsewhere when there is nothing to display. In the job plan page, selecting accumulators, checkpoints, or back pressure deselects "Plan" from the upper tab, removing the underline and emphasis. The job plan is not redrawn when the browser window is resized. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3437) Fix UI router state for job plan
Greg Hogan created FLINK-3437: - Summary: Fix UI router state for job plan Key: FLINK-3437 URL: https://issues.apache.org/jira/browse/FLINK-3437 Project: Flink Issue Type: Sub-task Components: Webfrontend Affects Versions: 1.0.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Minor In the job plan page, selecting accumulators, checkpoints, or back pressure deselects "Plan" from the upper tab, removing the underline and emphasis. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3447) Package Gelly algorithms by framework
Greg Hogan created FLINK-3447: - Summary: Package Gelly algorithms by framework Key: FLINK-3447 URL: https://issues.apache.org/jira/browse/FLINK-3447 Project: Flink Issue Type: Improvement Components: Gelly Affects Versions: 1.0.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Minor Currently algorithms in the Gelly library are collected in the {{org.apache.flink.graph.library}} package. The gather-sum-apply class names are prefixed by "GSA". Gelly contains multiple frameworks as named in FLINK-3208. Since algorithms can be (and are) duplicated across the multiple frameworks, we can move the algorithms into subpackages by the name of the framework.
- vertex-centric model: {{org.apache.flink.graph.library.pregel}}
- scatter-gather model: {{org.apache.flink.graph.library.spargel}}
- gather-sum-apply model: {{org.apache.flink.graph.library.gsa}}
- native methods: {{org.apache.flink.graph.library.asm}}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3453) Fix TaskManager logs exception when sampling backpressure while task completes
Greg Hogan created FLINK-3453: - Summary: Fix TaskManager logs exception when sampling backpressure while task completes Key: FLINK-3453 URL: https://issues.apache.org/jira/browse/FLINK-3453 Project: Flink Issue Type: Bug Components: Distributed Runtime Affects Versions: 1.0.0 Reporter: Greg Hogan Priority: Minor Backpressure sampling is interrupted when a task completes. It may be best to create a new response class for this case.
{noformat}
java.lang.IllegalStateException: Cannot sample task 08f138723e8174e70f5e7ddc672f8954. Task was removed after 65 sample(s).
	at org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleStackTraceSampleMessage(TaskManager.scala:743)
	at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:277)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
	at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:119)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2016-02-19 09:36:52,773 ERROR org.apache.flink.runtime.webmonitor.BackPressureStatsTracker - Failed to gather stack trace sample.
java.lang.RuntimeException: Discarded
	at org.apache.flink.runtime.webmonitor.StackTraceSampleCoordinator$PendingStackTraceSample.discard(StackTraceSampleCoordinator.java:394)
	at org.apache.flink.runtime.webmonitor.StackTraceSampleCoordinator.cancelStackTraceSample(StackTraceSampleCoordinator.java:249)
	at org.apache.flink.runtime.webmonitor.StackTraceSampleCoordinator$StackTraceSampleCoordinatorActor.handleMessage(StackTraceSampleCoordinator.java:462)
	at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:97)
	at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
	at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: Cannot sample task 08f138723e8174e70f5e7ddc672f8954. Task was removed after 65 sample(s).
	at org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleStackTraceSampleMessage(TaskManager.scala:743)
	at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.apply
[jira] [Created] (FLINK-3454) Add test dependencies on packaged jars
Greg Hogan created FLINK-3454: - Summary: Add test dependencies on packaged jars Key: FLINK-3454 URL: https://issues.apache.org/jira/browse/FLINK-3454 Project: Flink Issue Type: Bug Components: Scala Shell, YARN Client Affects Versions: 1.0.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Minor The following test from flink-scala-shell requires that the flink-ml jar be compiled. {noformat} Tests run: 7, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 213.66 sec <<< FAILURE! - in org.apache.flink.api.scala.ScalaShellITCase testSubmissionOfExternalLibrary(org.apache.flink.api.scala.ScalaShellITCase) Time elapsed: 0.028 sec <<< FAILURE! java.lang.AssertionError: assertion failed at scala.Predef$.assert(Predef.scala:151) at org.apache.flink.api.scala.ScalaShellITCase.testSubmissionOfExternalLibrary(ScalaShellITCase.scala:169) {noformat} flink-yarn-tests depends on flink-dist: {noformat} Tests run: 7, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.971 sec <<< FAILURE! - in org.apache.flink.yarn.UtilsTest testUberjarLocator(org.apache.flink.yarn.UtilsTest) Time elapsed: 0.803 sec <<< FAILURE! java.lang.AssertionError: null at org.junit.Assert.fail(Assert.java:86) at org.junit.Assert.assertTrue(Assert.java:41) at org.junit.Assert.assertNotNull(Assert.java:621) at org.junit.Assert.assertNotNull(Assert.java:631) at org.apache.flink.yarn.UtilsTest.testUberjarLocator(UtilsTest.java:42) {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3467) Remove superfluous objects from DataSourceTask.invoke
Greg Hogan created FLINK-3467: - Summary: Remove superfluous objects from DataSourceTask.invoke Key: FLINK-3467 URL: https://issues.apache.org/jira/browse/FLINK-3467 Project: Flink Issue Type: Improvement Components: Distributed Runtime Affects Versions: 1.0.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Minor In the object reuse case we cycle between three objects. This is a vestige of an earlier bug fix, but does not appear to solve any issue. In the non-reusing case we should not be creating a new object to immediately be reused. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3469) Fix documentation for grouping keys
Greg Hogan created FLINK-3469: - Summary: Fix documentation for grouping keys Key: FLINK-3469 URL: https://issues.apache.org/jira/browse/FLINK-3469 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.0.0 Reporter: Greg Hogan Assignee: Greg Hogan Fix For: 1.0.0 The transformation documentation for "Reduce on DataSet Grouped by KeySelector Function" uses a field expression in the Java example. There are four ways to specify keys and only two have named examples in the documentation. Expand the documentation to cover all cases. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3589) Allow setting Operator parallelism to default value
Greg Hogan created FLINK-3589: - Summary: Allow setting Operator parallelism to default value Key: FLINK-3589 URL: https://issues.apache.org/jira/browse/FLINK-3589 Project: Flink Issue Type: Improvement Components: Java API Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Minor Users can override the parallelism for a single operator by calling {{Operator.setParallelism}}, which accepts a positive value. {{Operator}} uses {{-1}} to indicate default parallelism. It would be nice to name and accept this default value. This would enable user algorithms to allow configurable parallelism while still chaining operator methods. For example, currently: {code} private int parallelism; ... public void setParallelism(int parallelism) { this.parallelism = parallelism; } ... MapOperator<Edge<K, EV>, Edge<K, EV>> newEdges = edges .map(new MyMapFunction()) .name("My map function"); if (parallelism > 0) { newEdges.setParallelism(parallelism); } {code} Could be simplified to: {code} private int parallelism = Operator.DEFAULT_PARALLELISM; ... public void setParallelism(int parallelism) { this.parallelism = parallelism; } ... DataSet<Edge<K, EV>> newEdges = edges .map(new MyMapFunction()) .setParallelism(parallelism) .name("My map function"); {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3634) Fix documentation for DataSetUtils.zipWithUniqueId()
Greg Hogan created FLINK-3634: - Summary: Fix documentation for DataSetUtils.zipWithUniqueId() Key: FLINK-3634 URL: https://issues.apache.org/jira/browse/FLINK-3634 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Minor Fix For: 1.1.0, 1.0.1 Under FLINK-2590 the assignment and testing of unique IDs was improved, but the documentation appears to still reference the old implementation. With {{parallelism=1}} there is no difference between zipWithUniqueId and zipWithIndex. With greater parallelism the results of zipWithUniqueId depend on the partitioning. The documentation should demonstrate a possible result that differs from the incremental sequence of zipWithIndex while noting that results depend on the parallelism and partitioning. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3623) Adjust MurmurHash algorithm
Greg Hogan created FLINK-3623: - Summary: Adjust MurmurHash algorithm Key: FLINK-3623 URL: https://issues.apache.org/jira/browse/FLINK-3623 Project: Flink Issue Type: Improvement Components: Distributed Runtime Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Trivial Flink's MurmurHash implementation differs from the published algorithm. From Flink's MathUtils.java: {code} code *= 0xe6546b64; {code} The Murmur3_32 algorithm as described by [Wikipedia|https://en.wikipedia.org/wiki/MurmurHash]: {code} m ← 5 n ← 0xe6546b64 hash ← hash × m + n {code} and in Guava's Murmur3_32HashFunction.java: {code} h1 = h1 * 5 + 0xe6546b64; {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
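The difference between the two final mixing steps can be illustrated with a standalone sketch. This is not Flink's actual MathUtils code; the surrounding per-word mixing steps (the constants and rotations) are taken from the published Murmur3_32 description, and only the last line of each method reflects the discrepancy quoted above.

```java
// Sketch contrasting the published Murmur3_32 per-word mixing step with the
// variant quoted from MathUtils.java. Method names here are illustrative.
public class Murmur3MixSketch {
    // Published algorithm (also Guava): hash = hash * 5 + 0xe6546b64
    static int mixPublished(int hash, int k) {
        k *= 0xcc9e2d51;
        k = Integer.rotateLeft(k, 15);
        k *= 0x1b873593;
        hash ^= k;
        hash = Integer.rotateLeft(hash, 13);
        return hash * 5 + 0xe6546b64;
    }

    // Variant per the quoted line: hash *= 0xe6546b64 (no multiply-by-5-and-add)
    static int mixFlinkVariant(int hash, int k) {
        k *= 0xcc9e2d51;
        k = Integer.rotateLeft(k, 15);
        k *= 0x1b873593;
        hash ^= k;
        hash = Integer.rotateLeft(hash, 13);
        return hash * 0xe6546b64;
    }

    public static void main(String[] args) {
        int seed = 0x9747b28c;
        // The two mixing steps produce different hashes for the same input.
        System.out.println(mixPublished(seed, 42) != mixFlinkVariant(seed, 42));
    }
}
```

Note that for a zero hash state the multiplicative variant collapses to zero, whereas the published step still injects the additive constant.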
[jira] [Created] (FLINK-3625) Graph algorithms to permute graph labels and edges
Greg Hogan created FLINK-3625: - Summary: Graph algorithms to permute graph labels and edges Key: FLINK-3625 URL: https://issues.apache.org/jira/browse/FLINK-3625 Project: Flink Issue Type: New Feature Components: Gelly Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Minor As a complement to graph generators it may be required to permute graph labels (either a set or a sequence) or edges. Shuffling edges does not alter the Graph but relabeling should be performed in a scale-free manner. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3695) ValueArray types
Greg Hogan created FLINK-3695: - Summary: ValueArray types Key: FLINK-3695 URL: https://issues.apache.org/jira/browse/FLINK-3695 Project: Flink Issue Type: New Feature Components: Core Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Minor Flink provides mutable {{Value}} type implementations of Java primitives along with efficient serializers and comparators. It would be useful to have corresponding {{ValueArray}} implementations backed by primitive rather than object arrays, along with an {{ArrayableValue}} interface tying a {{Value}} to its {{ValueArray}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3721) Min and max accumulators
Greg Hogan created FLINK-3721: - Summary: Min and max accumulators Key: FLINK-3721 URL: https://issues.apache.org/jira/browse/FLINK-3721 Project: Flink Issue Type: New Feature Components: Core Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Minor Flink already contains {{DoubleCounter}}, {{IntCounter}}, and {{LongCounter}} for adding numbers. This will add equivalent accumulators for storing the minimum and maximum double, int, and long values. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
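A minimal sketch of one such accumulator, modeled on the add/merge/reset shape of Flink's existing counters. This is standalone illustration, not the proposed implementation; the real class would presumably implement Flink's accumulator interface rather than stand alone.

```java
// Sketch of a "LongMaximum" accumulator. The class and method names mirror
// the pattern of LongCounter but are assumptions, not Flink's final API.
public class LongMaximum {
    private long max = Long.MIN_VALUE;

    // Fold a single value into the local aggregate.
    public void add(long value) {
        max = Math.max(max, value);
    }

    public long getLocalValue() {
        return max;
    }

    // Combine partial results from parallel subtasks.
    public void merge(LongMaximum other) {
        max = Math.max(max, other.max);
    }

    public void resetLocal() {
        max = Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        LongMaximum a = new LongMaximum();
        a.add(3);
        a.add(7);
        LongMaximum b = new LongMaximum();
        b.add(9);
        a.merge(b);
        System.out.println(a.getLocalValue()); // 9
    }
}
```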
[jira] [Created] (FLINK-3768) Clustering Coefficient
Greg Hogan created FLINK-3768: - Summary: Clustering Coefficient Key: FLINK-3768 URL: https://issues.apache.org/jira/browse/FLINK-3768 Project: Flink Issue Type: New Feature Components: Gelly Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan The local clustering coefficient measures the connectedness of each vertex's neighborhood. Values range from 0.0 (no edges between neighbors) to 1.0 (neighborhood is a clique). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
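The definition can be illustrated with a small standalone computation: for a vertex of degree d with t edges among its neighbors, the local clustering coefficient of an undirected graph is 2t / (d(d-1)). This is illustration only, not Gelly's distributed implementation.

```java
import java.util.*;

// Local clustering coefficient by brute force over an adjacency map.
public class LocalClusteringCoefficient {
    static double score(Map<Integer, Set<Integer>> adj, int v) {
        List<Integer> neighbors = new ArrayList<>(adj.get(v));
        int d = neighbors.size();
        if (d < 2) {
            return 0.0; // fewer than two neighbors: no possible neighbor pairs
        }
        int t = 0; // count edges between pairs of neighbors
        for (int i = 0; i < d; i++) {
            for (int j = i + 1; j < d; j++) {
                if (adj.get(neighbors.get(i)).contains(neighbors.get(j))) {
                    t++;
                }
            }
        }
        return 2.0 * t / (d * (d - 1));
    }

    // Triangle 0-1-2 with an extra vertex 3 attached to 0.
    static Map<Integer, Set<Integer>> triangleWithTail() {
        Map<Integer, Set<Integer>> adj = new HashMap<>();
        adj.put(0, new HashSet<>(Arrays.asList(1, 2, 3)));
        adj.put(1, new HashSet<>(Arrays.asList(0, 2)));
        adj.put(2, new HashSet<>(Arrays.asList(0, 1)));
        adj.put(3, new HashSet<>(Arrays.asList(0)));
        return adj;
    }

    public static void main(String[] args) {
        Map<Integer, Set<Integer>> adj = triangleWithTail();
        System.out.println(score(adj, 1)); // neighborhood {0,2} is a clique: 1.0
        System.out.println(score(adj, 0)); // one of three neighbor pairs connected
    }
}
```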
[jira] [Created] (FLINK-3770) Fix TriangleEnumerator performance
Greg Hogan created FLINK-3770: - Summary: Fix TriangleEnumerator performance Key: FLINK-3770 URL: https://issues.apache.org/jira/browse/FLINK-3770 Project: Flink Issue Type: Improvement Components: Gelly Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan Fix For: 1.1.0 Triangle enumeration is optimized by grouping edges by the vertex with lesser degree. Currently {{TriangleEnumerator}} computes this ordering but then ignores the reordered edges. Also, since it is known that the vertex list will be much smaller than the edge list, we can provide a {{JoinHint}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3771) Methods for translating Graphs
Greg Hogan created FLINK-3771: - Summary: Methods for translating Graphs Key: FLINK-3771 URL: https://issues.apache.org/jira/browse/FLINK-3771 Project: Flink Issue Type: Improvement Components: Gelly Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan Fix For: 1.1.0 Provide methods for translation of the type or value of graph labels, vertex values, and edge values. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3772) Graph algorithms for vertex and edge degree
Greg Hogan created FLINK-3772: - Summary: Graph algorithms for vertex and edge degree Key: FLINK-3772 URL: https://issues.apache.org/jira/browse/FLINK-3772 Project: Flink Issue Type: New Feature Components: Gelly Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan Fix For: 1.1.0 Many graph algorithms require vertices or edges to be marked with the degree. This ticket provides algorithms for annotating * vertex degree for undirected graphs * vertex out-, in-, and out- and in-degree for directed graphs * edge source, target, and source and target degree for undirected graphs -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3780) Jaccard Similarity
Greg Hogan created FLINK-3780: - Summary: Jaccard Similarity Key: FLINK-3780 URL: https://issues.apache.org/jira/browse/FLINK-3780 Project: Flink Issue Type: New Feature Components: Gelly Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan Implement a Jaccard Similarity algorithm computing all non-zero similarity scores. This algorithm is similar to {{TriangleListing}} but instead of joining two-paths against an edge list we count two-paths. {{flink-gelly-examples}} currently has {{JaccardSimilarityMeasure}} which relies on {{Graph.getTriplets()}} so only computes similarity scores for neighbors but not neighbors-of-neighbors. This algorithm is easily modified for other similarity scores such as Adamic-Adar similarity where the sum of endpoint degrees is replaced by the degree of the middle vertex. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3789) Overload methods which trigger program execution to allow naming job
Greg Hogan created FLINK-3789: - Summary: Overload methods which trigger program execution to allow naming job Key: FLINK-3789 URL: https://issues.apache.org/jira/browse/FLINK-3789 Project: Flink Issue Type: Improvement Components: Java API Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Minor Overload the following functions to additionally accept a job name to pass to {{ExecutionEnvironment.execute(String)}}. * {{DataSet.collect()}} * {{DataSet.count()}} * {{DataSetUtils.checksumHashCode(DataSet)}} * {{GraphUtils.checksumHashCode(Graph)}} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3799) Graph checksum should execute single job
Greg Hogan created FLINK-3799: - Summary: Graph checksum should execute single job Key: FLINK-3799 URL: https://issues.apache.org/jira/browse/FLINK-3799 Project: Flink Issue Type: Improvement Components: Gelly Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan {{GraphUtils.checksumHashCode()}} calls {{DataSetUtils.checksumHashCode()}} for both the vertex and edge {{DataSet}} which each require a separate job. Rewrite this to only execute a single job. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3805) BloomFilter initialized with no memory available
Greg Hogan created FLINK-3805: - Summary: BloomFilter initialized with no memory available Key: FLINK-3805 URL: https://issues.apache.org/jira/browse/FLINK-3805 Project: Flink Issue Type: Bug Components: Distributed Runtime Affects Versions: 1.0.1, 1.1.0 Reporter: Greg Hogan Priority: Critical I flagged this as 1.1.0 and 1.0.1 without checking the latter. Link to build, command, and stacktrace follow. {{MutableHashTable.initTable}} is calling {{initBloomFilter}} when {{this.availableMemory.size()==0}}. https://s3.amazonaws.com/apache-flink/flink_bloomfilter_crash.tar.bz2 ./bin/flink run -class org.apache.flink.graph.examples.TriangleListing ~/flink-gelly-examples_2.10-1.1-SNAPSHOT.jar --clip_and_flip false --output print --output print --scale 14 --count {code} org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed. at org.apache.flink.client.program.Client.runBlocking(Client.java:381) at org.apache.flink.client.program.Client.runBlocking(Client.java:355) at org.apache.flink.client.program.Client.runBlocking(Client.java:315) at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:61) at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:898) at org.apache.flink.api.java.DataSet.collect(DataSet.java:410) at org.apache.flink.api.java.DataSet.print(DataSet.java:1605) at org.apache.flink.graph.examples.TriangleListing.main(TriangleListing.java:106) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) at 
org.apache.flink.client.program.Client.runBlocking(Client.java:248) at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:860) at org.apache.flink.client.CliFrontend.run(CliFrontend.java:327) at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1187) at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1238) Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:805) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:751) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:751) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.IllegalArgumentException: expectedEntries should be > 0 at org.apache.flink.shaded.com.google.common.base.Preconditions.checkArgument(Preconditions.java:88) at org.apache.flink.runtime.operators.util.BloomFilter.(BloomFilter.java:53) at org.apache.flink.runtime.operators.hash.MutableHashTable.initBloomFilter(MutableHashTable.java:823) at 
org.apache.flink.runtime.operators.hash.MutableHashTable.initTable(MutableHashTable.java:1183) at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:887) at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:631) at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666) at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator.callWithNextKey(NonReusingBuildFirstHashJoinIterator.java:116) at org.apache.flink.runtime
[jira] [Created] (FLINK-3806) Revert use of DataSet.count() in Gelly
Greg Hogan created FLINK-3806: - Summary: Revert use of DataSet.count() in Gelly Key: FLINK-3806 URL: https://issues.apache.org/jira/browse/FLINK-3806 Project: Flink Issue Type: Improvement Components: Gelly Affects Versions: 1.1.0 Reporter: Greg Hogan Priority: Critical FLINK-1632 replaced {{GraphUtils.count}} with {{DataSetUtils.count}}. The former returns a {{DataSet}} while the latter executes a job to return a Java value. {{DataSetUtils.count}} is called from {{Graph.numberOfVertices}} and {{Graph.numberOfEdges}} which are called from {{GatherSumApplyIteration}} and {{ScatterGatherIteration}} as well as the {{PageRank}} algorithms when the user does not pass the number of vertices as a parameter. As noted in FLINK-1632, this does make the code simpler but if my understanding is correct will materialize the Graph twice. The Graph will need to be reread from input, regenerated, or recomputed by preceding algorithms. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3822) Document checksum functions
Greg Hogan created FLINK-3822: - Summary: Document checksum functions Key: FLINK-3822 URL: https://issues.apache.org/jira/browse/FLINK-3822 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Trivial Add site documentation for {{GraphUtils.checksumHashCode}}. Note potential issues with {{hashCode}} methods which are linearly combinable. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3845) Gelly allows duplicate vertices in Graph.addVertices
Greg Hogan created FLINK-3845: - Summary: Gelly allows duplicate vertices in Graph.addVertices Key: FLINK-3845 URL: https://issues.apache.org/jira/browse/FLINK-3845 Project: Flink Issue Type: Bug Components: Gelly Affects Versions: 1.1.0 Reporter: Greg Hogan Gelly performs a {{DataSet}} union then calls {{distinct()}} which keeps vertices with the same label but different values. This should be replaced with one of the join operators. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3846) Graph.removeEdges also removes duplicate edges
Greg Hogan created FLINK-3846: - Summary: Graph.removeEdges also removes duplicate edges Key: FLINK-3846 URL: https://issues.apache.org/jira/browse/FLINK-3846 Project: Flink Issue Type: Bug Components: Gelly Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan {{EdgeRemovalCoGroup}} should loop over and output the original edge set when no matching edge is found in the removal set. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
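The intended coGroup semantics can be sketched standalone: for each edge key, if the removal group is empty then every edge in the original group is emitted, so duplicate edges survive. This mirrors what {{EdgeRemovalCoGroup}} should do per the description above; it is plain Java for illustration, not Gelly code, and the names are assumptions.

```java
import java.util.*;

// Standalone simulation of the fixed removeEdges coGroup semantics:
// keep (including duplicates) every edge whose (source, target) key has
// no match in the removal set; drop all edges whose key does match.
public class RemoveEdgesSketch {
    static List<int[]> coGroup(List<int[]> edges, List<int[]> toRemove) {
        Set<Long> removalKeys = new HashSet<>();
        for (int[] e : toRemove) {
            removalKeys.add(key(e));
        }
        List<int[]> out = new ArrayList<>();
        for (int[] e : edges) {
            // loop over and emit the full original group when unmatched
            if (!removalKeys.contains(key(e))) {
                out.add(e);
            }
        }
        return out;
    }

    // Pack (source, target) into a single comparable key.
    static long key(int[] e) {
        return ((long) e[0] << 32) | (e[1] & 0xffffffffL);
    }

    public static void main(String[] args) {
        List<int[]> edges = Arrays.asList(new int[]{1, 2}, new int[]{1, 2}, new int[]{2, 3});
        List<int[]> remove = Collections.singletonList(new int[]{2, 3});
        // Both duplicates of (1,2) are preserved; only (2,3) is removed.
        System.out.println(coGroup(edges, remove).size()); // 2
    }
}
```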
[jira] [Created] (FLINK-3853) Reduce object creation in Gelly utility mappers
Greg Hogan created FLINK-3853: - Summary: Reduce object creation in Gelly utility mappers Key: FLINK-3853 URL: https://issues.apache.org/jira/browse/FLINK-3853 Project: Flink Issue Type: Improvement Components: Gelly Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan Fix For: 1.1.0 Gelly contains a set of {{MapFunction}} between {{Vertex}} and {{Tuple2}} and between {{Edge}} and {{Tuple3}}. A {{Vertex}} is a {{Tuple2}} and an {{Edge}} is a {{Tuple3}}, and conversion in the opposite direction can be performed with a single object per {{MapFunction}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
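The single-object pattern can be sketched in plain Java. The nested {{Tuple2}} and {{Vertex}} classes below are simplified stand-ins for Flink's types (in Flink a {{Vertex}} really does extend {{Tuple2}}, so the reverse direction is a no-op); the point is that the mapper allocates one output object and reuses it for every record.

```java
// Sketch of a Tuple2 -> Vertex mapper that reuses a single output object
// instead of allocating a new Vertex per record. Types are illustrative
// stand-ins, not Flink's Tuple2/Vertex/MapFunction.
public class ReusingVertexMapper {
    static class Tuple2 {
        Object f0, f1;
        Tuple2(Object f0, Object f1) { this.f0 = f0; this.f1 = f1; }
    }

    static class Vertex extends Tuple2 {
        Vertex() { super(null, null); }
    }

    // One object, allocated once, reused across all map() calls.
    private final Vertex out = new Vertex();

    Vertex map(Tuple2 value) {
        out.f0 = value.f0;
        out.f1 = value.f1;
        return out;
    }

    public static void main(String[] args) {
        ReusingVertexMapper m = new ReusingVertexMapper();
        Vertex a = m.map(new Tuple2(1, "x"));
        Vertex b = m.map(new Tuple2(2, "y"));
        System.out.println(a == b); // true: no per-record allocation
    }
}
```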
[jira] [Created] (FLINK-3865) ExecutionConfig NullPointerException with second execution
Greg Hogan created FLINK-3865: - Summary: ExecutionConfig NullPointerException with second execution Key: FLINK-3865 URL: https://issues.apache.org/jira/browse/FLINK-3865 Project: Flink Issue Type: Bug Components: Core Affects Versions: 1.1.0 Reporter: Greg Hogan Priority: Blocker Following {{NullPointerException}} from pr1956 rebased to master. After the first execution (the program calls {{DataSet.count()}}) the call to {{ExecutionConfig.serializeUserCode}} sets {{registeredKryoTypes}} and other fields to null. During the second execution (creating the actual result) access to this field throws a {{NullPointerException}}. [~till.rohrmann] should {{serializeUserCode}} set the fields to a new {{LinkedHashSet}} and leave {{globalJobParameters}} unchanged? {code} org.apache.flink.client.program.ProgramInvocationException: The main method caused an error. at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) at org.apache.flink.client.program.Client.runBlocking(Client.java:248) at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:860) at org.apache.flink.client.CliFrontend.run(CliFrontend.java:327) at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1187) at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1238) Caused by: java.lang.NullPointerException at org.apache.flink.api.common.ExecutionConfig.registerKryoType(ExecutionConfig.java:625) at org.apache.flink.api.java.typeutils.runtime.kryo.Serializers.recursivelyRegisterType(Serializers.java:96) at org.apache.flink.api.java.typeutils.runtime.kryo.Serializers.recursivelyRegisterType(Serializers.java:66) at org.apache.flink.api.java.ExecutionEnvironment$1.preVisit(ExecutionEnvironment.java:1053) at org.apache.flink.api.java.ExecutionEnvironment$1.preVisit(ExecutionEnvironment.java:1046) at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:198) at org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199) at org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199) at org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199) at org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199) at org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.GenericDataSinkBase.accept(GenericDataSinkBase.java:220) at org.apache.flink.api.common.Plan.accept(Plan.java:333) at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1046) at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1004) at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:58) at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:898) at org.apache.flink.api.java.utils.DataSetUtils.checksumHashCode(DataSetUtils.java:350) at org.apache.flink.graph.examples.HITS.main(HITS.java:114) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505) ... 6 more {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3868) Specialized CopyableValue serializers and comparators
Greg Hogan created FLINK-3868: - Summary: Specialized CopyableValue serializers and comparators Key: FLINK-3868 URL: https://issues.apache.org/jira/browse/FLINK-3868 Project: Flink Issue Type: Improvement Components: Core Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan This need was discussed on the mailing list [1] and will be obviated by code generation for POJO serializers and comparators [2] (as I understand, i.e., {{LongValue}} will now simply be treated as a POJO which happens to contain a {{long}} and a specialized serializer and comparator will be generated). In the meantime, and since {{ValueTypeInfo}} will need to be reworked to use the new generators, I think it is worthwhile to add specialized serializers and comparators for the {{CopyableValue}} types. This will also provide another point of comparison for the performance of the generated serializers and comparators. [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html [2] https://issues.apache.org/jira/browse/FLINK-3599 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3877) Create TranslateFunction interface for Graph translators
Greg Hogan created FLINK-3877: - Summary: Create TranslateFunction interface for Graph translators Key: FLINK-3877 URL: https://issues.apache.org/jira/browse/FLINK-3877 Project: Flink Issue Type: Bug Components: Gelly Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Minor Fix For: 1.1.0 I now recall why FLINK-3771 had a {{Translator}} interface with a {{translate}} method taking a field for reuse: when we translate edge IDs the translator must be called twice, once for the source ID and once for the target ID. {{TranslateFunction}} will be modeled after {{MapFunction}} and {{RichTranslateFunction}} will be modeled after {{RichMapFunction}}. The unit test should have caught this, but I was reusing values between fields, which did not detect that values were overwritten. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
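A hypothetical sketch of the proposed interface, modeled on {{MapFunction}} but taking a reuse object so the same translator instance can safely be invoked twice per edge. The interface shape and names are assumptions based on the description above, not the final API.

```java
// Sketch of a TranslateFunction taking a reuse parameter. Because an edge
// carries two IDs, translation calls the function twice per edge, so the
// function must not assume one call per record.
public class TranslateSketch {
    interface TranslateFunction<T, O> {
        O translate(T value, O reuse);
    }

    // Example translator changing the ID type from Long to String.
    static class LongToString implements TranslateFunction<Long, String> {
        public String translate(Long value, String reuse) {
            // String is immutable, so the reuse object is ignored here;
            // a mutable Value type would populate and return `reuse`.
            return Long.toString(value);
        }
    }

    public static void main(String[] args) {
        TranslateFunction<Long, String> f = new LongToString();
        // Translating one edge's IDs requires two calls on the same function.
        String source = f.translate(4L, null);
        String target = f.translate(7L, null);
        System.out.println(source + "->" + target); // 4->7
    }
}
```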
[jira] [Created] (FLINK-3879) Native implementation of HITS algorithm
Greg Hogan created FLINK-3879: - Summary: Native implementation of HITS algorithm Key: FLINK-3879 URL: https://issues.apache.org/jira/browse/FLINK-3879 Project: Flink Issue Type: New Feature Components: Gelly Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan Fix For: 1.1.0 Hyperlink-Induced Topic Search (HITS, also "hubs and authorities") is presented in [0] and described in [1]. [0] http://www.cs.cornell.edu/home/kleinber/auth.pdf [1] https://en.wikipedia.org/wiki/HITS_algorithm -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3898) Adamic-Adar Similarity
Greg Hogan created FLINK-3898: - Summary: Adamic-Adar Similarity Key: FLINK-3898 URL: https://issues.apache.org/jira/browse/FLINK-3898 Project: Flink Issue Type: New Feature Components: Gelly Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan The implementation of Adamic-Adar Similarity [0] is very close to Jaccard Similarity. Whereas Jaccard Similarity counts common neighbors, Adamic-Adar Similarity sums the inverse logarithm of the degree of common neighbors. Consideration will be given to the computation of the inverse logarithm, in particular whether to pre-compute a small array of values. [0] http://social.cs.uiuc.edu/class/cs591kgk/friendsadamic.pdf -- This message was sent by Atlassian JIRA (v6.3.4#6332)
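The pre-computation under consideration can be sketched as a small static table of 1/log(d) values, falling back to {{Math.log}} for large degrees. The table size and method names are illustrative assumptions; note that a common neighbor always has degree at least 2, so the undefined entries for degrees 0 and 1 are never read.

```java
// Sketch of pre-computing inverse logarithms for Adamic-Adar scoring, so the
// per-common-neighbor contribution avoids repeated Math.log calls for the
// small degrees that dominate most graphs.
public class InverseLogTable {
    private static final int TABLE_SIZE = 64;
    private static final double[] TABLE = new double[TABLE_SIZE];

    static {
        // Degrees 0 and 1 are left as 0.0: a common neighbor of two vertices
        // is adjacent to both, so its degree is always >= 2.
        for (int d = 2; d < TABLE_SIZE; d++) {
            TABLE[d] = 1.0 / Math.log(d);
        }
    }

    static double inverseLog(int degree) {
        return degree < TABLE_SIZE ? TABLE[degree] : 1.0 / Math.log(degree);
    }

    public static void main(String[] args) {
        // Table lookup and direct computation agree.
        System.out.println(inverseLog(10) == 1.0 / Math.log(10)); // true
    }
}
```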
[jira] [Created] (FLINK-3906) Global Clustering Coefficient
Greg Hogan created FLINK-3906: - Summary: Global Clustering Coefficient Key: FLINK-3906 URL: https://issues.apache.org/jira/browse/FLINK-3906 Project: Flink Issue Type: New Feature Components: Gelly Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan The global clustering coefficient measures the ratio of triplets in a graph which are closed (form a triangle). Scores range from 0.0 (no triangles) to 1.0 (complete graph). As part of this ticket the inefficient {{GSATriangleCount}} will be removed and a new analytic will be added. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
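The underlying arithmetic: each triangle closes three triplets, and a vertex of degree d is the center of d(d-1)/2 triplets, so the global score is 3 × triangles / triplets. A standalone illustration (not the Gelly analytic):

```java
// Global clustering coefficient from a triangle count and vertex degrees.
public class GlobalClusteringCoefficient {
    static double score(long triangleCount, long[] degrees) {
        long triplets = 0;
        for (long d : degrees) {
            triplets += d * (d - 1) / 2; // triplets centered at this vertex
        }
        // Each triangle closes three triplets, one per triangle vertex.
        return triplets == 0 ? 0.0 : 3.0 * triangleCount / triplets;
    }

    public static void main(String[] args) {
        // Complete graph K3: one triangle, every vertex has degree 2.
        System.out.println(score(1, new long[]{2, 2, 2})); // 1.0
        // Path graph on three vertices: one triplet, no triangles.
        System.out.println(score(0, new long[]{1, 2, 1})); // 0.0
    }
}
```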
[jira] [Created] (FLINK-3907) Directed Clustering Coefficient
Greg Hogan created FLINK-3907: - Summary: Directed Clustering Coefficient Key: FLINK-3907 URL: https://issues.apache.org/jira/browse/FLINK-3907 Project: Flink Issue Type: New Feature Components: Gelly Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan A directed clustering coefficient algorithm can be implemented using an efficient triangle listing implementation which emits not only the three vertex IDs forming the triangle but also a bitmask indicating which edges form the triangle. A triangle can be formed with a minimum of three or maximum of six directed edges. Directed clustering coefficient can then shatter the triangles and emit a score of either 1 or 2 for each vertex. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3910) New self-join operator
Greg Hogan created FLINK-3910: - Summary: New self-join operator Key: FLINK-3910 URL: https://issues.apache.org/jira/browse/FLINK-3910 Project: Flink Issue Type: New Feature Components: DataSet API, Java API, Scala API Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan Flink currently provides inner- and outer-joins as well as cogroup and the non-keyed cross. {{JoinOperator}} hints at future support for semi- and anti-joins. Many Gelly algorithms perform a self-join [0]. Still pending reviews, FLINK-3768 performs a self-join on non-skewed data in TriangleListing.java and FLINK-3780 performs a self-join on skewed data in JaccardSimilarity.java. A {{SelfJoinHint}} will select between skewed and non-skewed implementations. The object-reuse-disabled case can be simply handled with a new {{Operator}}. The object-reuse-enabled case requires either {{CopyableValue}} types (as in the code above) or a custom driver which has access to the serializer (or making the serializer accessible to rich functions, and I think there be dragons). If the idea of a self-join is agreeable, I'd like to work out a rough implementation and go from there. [0] https://en.wikipedia.org/wiki/Join_%28SQL%29#Self-join -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3925) GraphAlgorithm to filter by maximum degree
Greg Hogan created FLINK-3925: - Summary: GraphAlgorithm to filter by maximum degree Key: FLINK-3925 URL: https://issues.apache.org/jira/browse/FLINK-3925 Project: Flink Issue Type: New Feature Components: Gelly Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Minor Filtering by minimum degree is K-Core, which is iterative. Filtering by maximum degree can be performed in constant time by filtering the set of high-degree vertices then doing an anti-join against the original vertex set and two anti-joins against the original edge set. Two reasons to remove high-degree vertices: 1) they may simply be noise in the input data, and 2) to speed up algorithms such as Adamic-Adar and Jaccard Index, which run in time quadratic in the vertex degree. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3945) Degree annotation for directed graphs
Greg Hogan created FLINK-3945: - Summary: Degree annotation for directed graphs Key: FLINK-3945 URL: https://issues.apache.org/jira/browse/FLINK-3945 Project: Flink Issue Type: Improvement Components: Gelly Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Minor Fix For: 1.1.0 There is a third degree count for vertices in directed graphs which is the distinct count of out- and in-neighbors. This also adds edge annotation of the vertex degrees for directed graphs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3965) Delegating GraphAlgorithm
Greg Hogan created FLINK-3965: - Summary: Delegating GraphAlgorithm Key: FLINK-3965 URL: https://issues.apache.org/jira/browse/FLINK-3965 Project: Flink Issue Type: New Feature Components: Gelly Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan Fix For: 1.1.0 Complex and related algorithms often overlap in computation of data. Two such examples are: 1) the local and global clustering coefficients each use a listing of triangles 2) the local clustering coefficient joins on vertex degree, and the underlying triangle listing annotates edge degree which uses vertex degree We can reuse and rewrite algorithm output by creating a {{ProxyObject}} as a delegate for method calls to the {{DataSet}} returned by the algorithm. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3978) Add contains methods to RuntimeContext
Greg Hogan created FLINK-3978: - Summary: Add contains methods to RuntimeContext Key: FLINK-3978 URL: https://issues.apache.org/jira/browse/FLINK-3978 Project: Flink Issue Type: Improvement Components: Core Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Minor The javadocs for `RuntimeContext` state that `getAccumulator` "throws an exception if the accumulator does not exist or if the accumulator exists, but with different type", although `AbstractRuntimeUDFContext` does not throw an exception but instead returns null. The javadocs for `getBroadcastVariable` do not mention throwing an exception. Currently the only way to handle a broadcast variable that may or may not exist is to catch and ignore the exception. Adding a `containsBroadcastVariable` method to `RuntimeContext` would make this explicit. Likewise, `containsAccumulator`. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
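A minimal sketch of the situation, using a hypothetical stand-in class rather than Flink's actual `RuntimeContext`: today the only option is to catch and ignore the exception, while a contains method makes the check explicit.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for RuntimeContext's broadcast-variable lookup;
// not Flink's actual implementation.
public class BroadcastLookup {
    private final Map<String, List<?>> broadcastVars = new HashMap<>();

    List<?> getBroadcastVariable(String name) {
        List<?> value = broadcastVars.get(name);
        if (value == null) {
            throw new IllegalArgumentException("No broadcast variable: " + name);
        }
        return value;
    }

    // The proposed explicit check, analogous to containsBroadcastVariable.
    boolean containsBroadcastVariable(String name) {
        return broadcastVars.containsKey(name);
    }

    public static void main(String[] args) {
        BroadcastLookup ctx = new BroadcastLookup();
        ctx.broadcastVars.put("thresholds", Collections.singletonList(42));

        // Current workaround: catch and ignore the exception.
        List<?> maybe;
        try {
            maybe = ctx.getBroadcastVariable("missing");
        } catch (IllegalArgumentException e) {
            maybe = null;
        }
        if (maybe != null) throw new RuntimeException();

        // Proposed: an explicit existence test.
        if (!ctx.containsBroadcastVariable("thresholds")) throw new RuntimeException();
        if (ctx.containsBroadcastVariable("missing")) throw new RuntimeException();
        System.out.println("OK");
    }
}
```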
[jira] [Created] (FLINK-3980) Remove ExecutionConfig.PARALLELISM_UNKNOWN
Greg Hogan created FLINK-3980: - Summary: Remove ExecutionConfig.PARALLELISM_UNKNOWN Key: FLINK-3980 URL: https://issues.apache.org/jira/browse/FLINK-3980 Project: Flink Issue Type: Improvement Components: Core Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Minor Fix For: 1.1.0 FLINK-3589 added {{ExecutionConfig.PARALLELISM_DEFAULT}} and {{ExecutionConfig.PARALLELISM_UNKNOWN}}. The former gave a name to the constant {{-1}} and the latter was used as a default no-op when setting the parallelism. It's nice to keep these intents separate but given the current implementation of Operator parallelism users can get by using {{PARALLELISM_DEFAULT}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3997) PRNG Skip-ahead
Greg Hogan created FLINK-3997: - Summary: PRNG Skip-ahead Key: FLINK-3997 URL: https://issues.apache.org/jira/browse/FLINK-3997 Project: Flink Issue Type: Improvement Components: Gelly Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan The current sources of randomness for Gelly Graph Generators use fixed-size blocks of work which include an initial seed. There are two issues with this approach. First, the size of the collection of blocks can exceed the Akka limit and cause the job to silently fail. Second, as the block seeds are randomly chosen, the likelihood of blocks overlapping and producing the same sequence increases with the size of the graph. The random generators will be reimplemented using {{SplittableIterator}} and PRNGs supporting skip-ahead. This ticket will implement skip-ahead with LCGs [0]. Future work may add support for xorshift generators ([1], section 5 "Jumping Ahead"). [0] https://mit-crpg.github.io/openmc/methods/random_numbers.html#skip-ahead-capability [1] https://arxiv.org/pdf/1404.0390.pdf -- This message was sent by Atlassian JIRA (v6.3.4#6332)
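The LCG skip-ahead from [0] can be sketched as follows. The constants here are the illustrative MMIX multiplier and increment from Knuth, not necessarily the generator Gelly will use; the jump composes the transition x → ax + c (mod 2^64) with itself by iterated squaring, reaching position n in O(log n) steps.

```java
public class LcgSkip {
    // Illustrative 64-bit LCG constants (Knuth's MMIX), mod 2^64 via overflow.
    static final long A = 6364136223846793005L;
    static final long C = 1442695040888963407L;

    static long next(long x) { return A * x + C; }

    // Jump ahead n steps in O(log n): maintain the composed transform
    // x -> gTotal*x + cTotal while squaring the block transform x -> g*x + c.
    static long skip(long x, long n) {
        long g = A, c = C;
        long gTotal = 1L, cTotal = 0L;
        while (n > 0) {
            if ((n & 1L) != 0) {
                // compose the current block after the accumulated transform
                cTotal = cTotal * g + c;
                gTotal *= g;
            }
            c = (g + 1) * c;   // double the block: c' = g*c + c
            g *= g;            //                   g' = g^2
            n >>>= 1;
        }
        return gTotal * x + cTotal;
    }

    public static void main(String[] args) {
        long seed = 20160601L;
        long x = seed;
        for (int i = 0; i < 1000; i++) x = next(x);
        // skipping ahead must agree with stepping one at a time
        if (x != skip(seed, 1000)) throw new RuntimeException("skip mismatch");
        System.out.println("OK");
    }
}
```

With skip-ahead, each {{SplittableIterator}} split can derive its block's starting state directly from the single global seed, so block seeds never overlap.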
[jira] [Created] (FLINK-4003) Use intrinsics for MathUtils logarithms
Greg Hogan created FLINK-4003: - Summary: Use intrinsics for MathUtils logarithms Key: FLINK-4003 URL: https://issues.apache.org/jira/browse/FLINK-4003 Project: Flink Issue Type: Improvement Components: Core Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Trivial Fix For: 1.1.0 {{MathUtils.log2floor}} and {{MathUtils.log2strict}} use naive loops which have efficient implementations in {{Integer}} that are commonly implemented as intrinsics [0]. [0]: http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/tip/src/share/vm/classfile/vmSymbols.hpp#l680 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
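For illustration, floor(log2(v)) is `31 - Integer.numberOfLeadingZeros(v)`. The naive loop below is a stand-in for the current implementation, not the exact MathUtils code:

```java
public class Log2 {
    // Naive loop, roughly as in the current MathUtils.log2floor.
    static int log2floorLoop(int value) {
        if (value == 0) throw new ArithmeticException("log of zero");
        int log = 0;
        while ((value >>>= 1) != 0) log++;
        return log;
    }

    // Intrinsic-backed version: floor(log2(v)) = 31 - nlz(v).
    // HotSpot compiles numberOfLeadingZeros to a single instruction.
    static int log2floorFast(int value) {
        if (value == 0) throw new ArithmeticException("log of zero");
        return 31 - Integer.numberOfLeadingZeros(value);
    }

    public static void main(String[] args) {
        for (int v = 1; v != 0; v <<= 1) {   // every power of two
            if (log2floorLoop(v) != log2floorFast(v)) throw new RuntimeException();
        }
        if (log2floorFast(1) != 0) throw new RuntimeException();
        if (log2floorFast(16) != 4) throw new RuntimeException();
        if (log2floorFast(17) != 4) throw new RuntimeException();
        System.out.println("OK");
    }
}
```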
[jira] [Created] (FLINK-4013) GraphAlgorithms to simplify directed and undirected graphs
Greg Hogan created FLINK-4013: - Summary: GraphAlgorithms to simplify directed and undirected graphs Key: FLINK-4013 URL: https://issues.apache.org/jira/browse/FLINK-4013 Project: Flink Issue Type: New Feature Components: Gelly Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Minor Fix For: 1.1.0 Create a directed {{GraphAlgorithm}} to remove self-loops and duplicate edges and an undirected {{GraphAlgorithm}} to symmetrize and remove self-loops and duplicate edges. Remove {{RMatGraph.setSimpleGraph}} and the associated logic. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4102) Test failure with checkpoint barriers
Greg Hogan created FLINK-4102: - Summary: Test failure with checkpoint barriers Key: FLINK-4102 URL: https://issues.apache.org/jira/browse/FLINK-4102 Project: Flink Issue Type: Bug Components: Streaming Affects Versions: 1.1.0 Reporter: Greg Hogan Priority: Critical Fix For: 1.1.0 Failed tests: TwoInputStreamTaskTest.testCheckpointBarriers:229 Output was not correct.: array lengths differed, expected.length=4 actual.length=5 This test was run on Travis (https://travis-ci.org/greghogan/flink/jobs/139481041), and I restarted this task not expecting Travis to reuse IDs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4104) Restructure Gelly docs
Greg Hogan created FLINK-4104: - Summary: Restructure Gelly docs Key: FLINK-4104 URL: https://issues.apache.org/jira/browse/FLINK-4104 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Minor Fix For: 1.1.0 The Gelly documentation has grown sufficiently long to suggest dividing into sub-pages. Leave "Using Gelly" on the main page and link to the following topics as sub-pages: * Graph API * Iterative Graph Processing * Library Methods * Graph Algorithms * Graph Generators -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4113) Always copy first value in ChainedAllReduceDriver
Greg Hogan created FLINK-4113: - Summary: Always copy first value in ChainedAllReduceDriver Key: FLINK-4113 URL: https://issues.apache.org/jira/browse/FLINK-4113 Project: Flink Issue Type: Bug Components: Local Runtime Affects Versions: 1.1.0, 1.0.4 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Critical Fix For: 1.1.0, 1.0.4 {{ChainedAllReduceDriver.collect}} must copy the first record even when object reuse is enabled or {{base}} may later point to the same object as {{record}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
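A sketch of the aliasing hazard (hypothetical names, not the actual driver code): with object reuse the caller refills one record instance, so storing the first record by reference lets later refills overwrite the running aggregate.

```java
// Hypothetical model of the ChainedAllReduceDriver.collect hazard.
public class ReuseAliasing {
    static final class MutableRecord { long value; }

    static MutableRecord base;  // the running aggregate, as in the driver

    // Buggy collect(): keeps a reference to the first record.
    static void collectNoCopy(MutableRecord record) {
        if (base == null) base = record;
        else base.value += record.value;
    }

    // Fixed collect(): always copies the first record.
    static void collectCopy(MutableRecord record) {
        if (base == null) {
            base = new MutableRecord();
            base.value = record.value;
        } else {
            base.value += record.value;
        }
    }

    public static void main(String[] args) {
        long[] inputs = {1, 2, 4};                   // expected sum: 7
        MutableRecord reused = new MutableRecord();  // object reuse: one instance

        base = null;
        for (long v : inputs) { reused.value = v; collectNoCopy(reused); }
        long buggy = base.value;  // base aliases reused, so each refill
                                  // clobbered the aggregate before adding

        base = null;
        for (long v : inputs) { reused.value = v; collectCopy(reused); }
        long fixed = base.value;

        if (fixed != 7) throw new RuntimeException();
        if (buggy == 7) throw new RuntimeException();  // aliasing changed the result
        System.out.println("OK");
    }
}
```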
[jira] [Created] (FLINK-4117) Wait for CuratorFramework connection to be established
Greg Hogan created FLINK-4117: - Summary: Wait for CuratorFramework connection to be established Key: FLINK-4117 URL: https://issues.apache.org/jira/browse/FLINK-4117 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing Affects Versions: 1.1.0 Reporter: Greg Hogan Received the following error when locally running {{mvn verify}}. Searching on the error it looks like we are not waiting for the ZooKeeper connection to be established as this occurs asynchronously. In ZookeeperUtils.java:98 we call {{CuratorFramework.start()}} and we could then call {{CuratorFramework.blockUntilConnected()}} with the same timeout. {code} Tests run: 3, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 323.326 sec <<< FAILURE! - in org.apache.flink.runtime.checkpoint.CheckpointIDCounterTest$ZooKeeperCheckpointIDCounterITCase testConcurrentGetAndIncrement(org.apache.flink.runtime.checkpoint.CheckpointIDCounterTest$ZooKeeperCheckpointIDCounterITCase) Time elapsed: 266.521 sec <<< ERROR! java.util.concurrent.ExecutionException: org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /flink/checkpoint-id-counter at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at org.apache.flink.runtime.checkpoint.CheckpointIDCounterTest.testConcurrentGetAndIncrement(CheckpointIDCounterTest.java:129) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44) at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner.run(ParentRunner.java:309) at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283) at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173) at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153) at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128) at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203) at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155) at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103) Caused by: org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /flink/checkpoint-id-counter at org.apache.zookeeper.KeeperException.create(KeeperException.java:99) at org.apache.zookeeper.KeeperException.create(KeeperException.java:51) at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:1155) at 
org.apache.curator.framework.imps.GetDataBuilderImpl$4.call(GetDataBuilderImpl.java:302) at org.apache.curator.framework.imps.GetDataBuilderImpl$4.call(GetDataBuilderImpl.java:291) at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107) at org.apache.curator.framework.imps.GetDataBuilderImpl.pathInForeground(GetDataBuilderImpl.java:288) at org.apache.curator.framework.imps.GetDataBuilderImpl.forPath(GetDataBuilderImpl.java:279) at org.apache.curator.framework.imps.GetDataBuilderImpl.forPath(GetDataBuilderImpl.java:41) at org.apache.curator.framework.recipes.sha
[jira] [Created] (FLINK-4129) HITSAlgorithm should test for element-wise convergence
Greg Hogan created FLINK-4129: - Summary: HITSAlgorithm should test for element-wise convergence Key: FLINK-4129 URL: https://issues.apache.org/jira/browse/FLINK-4129 Project: Flink Issue Type: Bug Components: Gelly Affects Versions: 1.1.0 Reporter: Greg Hogan Priority: Minor {{HITSAlgorithm}} tests for convergence by summing the difference of each authority score minus the average score. This is simply comparing the sum of scores against the previous sum of scores which is not a good test for convergence. {code} // count the diff value of sum of authority scores diffSumAggregator.aggregate(previousAuthAverage - newAuthorityValue.getValue()); {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
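A small illustration of why the summed difference is a poor convergence test (hypothetical arrays, not the actual HITSAlgorithm code): scores that swap mass leave the sum unchanged, so the aggregate reads as converged while individual scores still move substantially. An element-wise test catches this.

```java
public class Convergence {
    // Sum of (previous average - new value): differences of opposite sign
    // cancel, so large per-vertex changes can look fully converged.
    static double sumDiff(double prevAverage, double[] newScores) {
        double sum = 0;
        for (double s : newScores) sum += prevAverage - s;
        return Math.abs(sum);
    }

    // Element-wise test: converged only if every score moved less than eps.
    static double maxDiff(double[] prev, double[] cur) {
        double max = 0;
        for (int i = 0; i < prev.length; i++) {
            max = Math.max(max, Math.abs(prev[i] - cur[i]));
        }
        return max;
    }

    public static void main(String[] args) {
        double[] prev = {0.5, 0.5};
        double[] cur = {0.9, 0.1};   // scores swapped mass; average unchanged
        double prevAvg = 0.5;

        // The summed test sees zero change...
        if (sumDiff(prevAvg, cur) > 1e-9) throw new RuntimeException();
        // ...while every score actually moved by 0.4.
        if (maxDiff(prev, cur) < 0.39) throw new RuntimeException();
        System.out.println("OK");
    }
}
```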
[jira] [Created] (FLINK-4132) Fix boxed comparison in CommunityDetection algorithm
Greg Hogan created FLINK-4132: - Summary: Fix boxed comparison in CommunityDetection algorithm Key: FLINK-4132 URL: https://issues.apache.org/jira/browse/FLINK-4132 Project: Flink Issue Type: Bug Components: Gelly Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Minor Fix For: 1.1.0 IntelliJ notes that testing boxed primitives for equality will compare pointer values. We should be using primitive types. {code} if (maxScoreLabel != vertex.getValue().f0) { {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
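The pitfall can be demonstrated directly (illustrative values; the reference comparison relies on values outside the boxed-value cache, which covers only -128..127 on stock OpenJDK):

```java
public class BoxedCompare {
    public static void main(String[] args) {
        // Values outside the boxed cache are distinct objects, so
        // == and != compare references, not values.
        Long maxScoreLabel = 1000L;
        Long vertexLabel = 1000L;

        if (maxScoreLabel == vertexLabel) throw new RuntimeException(); // distinct objects

        // Correct comparisons: unbox to a primitive, or use equals().
        if (maxScoreLabel.longValue() != vertexLabel.longValue()) throw new RuntimeException();
        if (!maxScoreLabel.equals(vertexLabel)) throw new RuntimeException();

        // Inside the cache range the same code "works", hiding the bug in tests.
        Long small = 100L;
        Long alsoSmall = 100L;
        if (small != alsoSmall) throw new RuntimeException(); // cached: same object
        System.out.println("OK");
    }
}
```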
[jira] [Created] (FLINK-4135) Replace ChecksumHashCode as GraphAnalytic
Greg Hogan created FLINK-4135: - Summary: Replace ChecksumHashCode as GraphAnalytic Key: FLINK-4135 URL: https://issues.apache.org/jira/browse/FLINK-4135 Project: Flink Issue Type: Improvement Components: Gelly Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Trivial Fix For: 1.1.0 Create a {{GraphAnalytic}} to replace {{GraphUtils.checksumHashCode}} as there is nothing special about this computation and we can remove this function from the API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4172) Don't proxy a ProxiedObject
Greg Hogan created FLINK-4172: - Summary: Don't proxy a ProxiedObject Key: FLINK-4172 URL: https://issues.apache.org/jira/browse/FLINK-4172 Project: Flink Issue Type: Bug Components: Gelly Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan Fix For: 1.1.0 Some graph algorithms pass through a DataSet unmodified (at least, until we have VertexSet and EdgeSet). We need to prevent a DataSet from being proxied twice. Allowing two methods to own a single object sounds brittle, so we can instead provide access to the original DataSet which can be wrapped in a new proxy. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4212) Lock on pid file when starting daemons
Greg Hogan created FLINK-4212: - Summary: Lock on pid file when starting daemons Key: FLINK-4212 URL: https://issues.apache.org/jira/browse/FLINK-4212 Project: Flink Issue Type: Improvement Components: Startup Shell Scripts Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan As noted on the mailing list (0), when multiple TaskManagers are started in parallel (using pdsh) there is a race condition on updating the pid: 1) the pid file is first read to parse the process' index, 2) the process is started, and 3) on success the daemon pid is appended to the pid file. We could use a tool such as {{flock}} to lock on the pid file while starting the Flink daemon. 0: http://mail-archives.apache.org/mod_mbox/flink-user/201607.mbox/%3CCA%2BssbKXw954Bz_sBRwP6db0FntWyGWzTyP7wJZ5nhOeQnof3kg%40mail.gmail.com%3E -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4213) Provide CombineHint in Gelly algorithms
Greg Hogan created FLINK-4213: - Summary: Provide CombineHint in Gelly algorithms Key: FLINK-4213 URL: https://issues.apache.org/jira/browse/FLINK-4213 Project: Flink Issue Type: Improvement Components: Gelly Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan Many graph algorithms will see better {{reduce}} performance with the hash-combine compared with the still default sort-combine, e.g. HITS and LocalClusteringCoefficient. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4217) Gelly drivers should read CSV values as strings
Greg Hogan created FLINK-4217: - Summary: Gelly drivers should read CSV values as strings Key: FLINK-4217 URL: https://issues.apache.org/jira/browse/FLINK-4217 Project: Flink Issue Type: Improvement Components: Gelly Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Minor Gelly drivers ClusteringCoefficient, HITS, JaccardIndex, and TriangleListing parse CSV files as {{LongValue}}. This works for anonymized data sets such as SNAP but should be configurable as {{StringValue}} to handle the general case. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4219) Quote PDSH opts in start-cluster.sh
Greg Hogan created FLINK-4219: - Summary: Quote PDSH opts in start-cluster.sh Key: FLINK-4219 URL: https://issues.apache.org/jira/browse/FLINK-4219 Project: Flink Issue Type: Bug Components: Startup Shell Scripts Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan Fix For: 1.1.0 Quote {{PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS}} in {{start-cluster.sh}} to prevent word splitting if the user configures multiple SSH options. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4231) Switch DistinctOperator from GroupReduceFunction to ReduceFunction
Greg Hogan created FLINK-4231: - Summary: Switch DistinctOperator from GroupReduceFunction to ReduceFunction Key: FLINK-4231 URL: https://issues.apache.org/jira/browse/FLINK-4231 Project: Flink Issue Type: Improvement Components: Java API Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan As discussed in FLINK-3279, rewriting {{DistinctOperator}} to a {{ReduceFunction}} rather than the current {{GroupReduceFunction}} allows the user to set the {{CombineHint}} and choose a hash-based, sort-based, or no combiner. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4257) Handle delegating algorithm change of class
Greg Hogan created FLINK-4257: - Summary: Handle delegating algorithm change of class Key: FLINK-4257 URL: https://issues.apache.org/jira/browse/FLINK-4257 Project: Flink Issue Type: Bug Components: Gelly Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan Fix For: 1.1.0 A class created by {{ProxyFactory}} can intercept and reinterpret method calls using its {{MethodHandler}}, but is restricted in that * the type of the proxy class cannot be changed * method return types must be honored We have algorithms such as {{VertexDegree}} and {{TriangleListing}} that change return type depending on configuration, even between single and dual input functions. This can be problematic, e.g. in {{OperatorTranslation}} where we test {{dataSet instanceof SingleInputOperator}} or {{dataSet instanceof TwoInputOperator}}. Even simply changing the operator can be problematic, e.g. {{MapOperator.translateToDataFlow}} returns {{MapOperatorBase}} whereas {{ReduceOperator.translateToDataFlow}} returns {{SingleInputOperator}}. I see two ways to solve these issues. By adding a simple {{NoOpOperator}} that is skipped over during {{OperatorTranslation}} we could wrap all algorithm output and always be proxying the same class. Alternatively, making changes only within Gelly we can append a "no-op" pass-through {{MapFunction}} to any algorithm output which is not a {{SingleInputOperator}}. And {{Delegate}} can also walk the superclass hierarchy such that we are always proxying {{SingleInputOperator}}. There is one additional issue. When we call {{DataSet.output}} the delegate's {{MethodHandler}} must reinterpret this call to add itself to the list of sinks. As part of this issue I will also add manual tests to Gelly for the library algorithms which do not have integration tests. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4264) New GraphMetrics driver
Greg Hogan created FLINK-4264: - Summary: New GraphMetrics driver Key: FLINK-4264 URL: https://issues.apache.org/jira/browse/FLINK-4264 Project: Flink Issue Type: New Feature Components: Gelly Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan Gelly contains a {{GraphMetrics}} example using the {{Graph}} API. This will add a {{GraphMetrics}} driver using {{VertexMetrics}} and {{EdgeMetrics}}. Example output for the [Friendster|https://snap.stanford.edu/data/com-Friendster.html] graph: {noformat} Vertex metrics: vertex count: 65,608,366 edge count: 1,806,067,135 triplet count: 720,654,964,964 maximum degree: 5,214 maximum triplets: 13,590,291 Edge metrics: vertex count: 65,608,366 edge count: 1,806,067,135 triangle triplet count: 82,248,653,258 rectangle triplet count: 284,581,272,439 triplet count: 720,654,964,964 maximum degree: 5,214 maximum triangle triplets: 376,278 maximum rectangle triplets: 1,203,750 maximum triplets: 13,590,291 {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4265) Add a NoOpOperator
Greg Hogan created FLINK-4265: - Summary: Add a NoOpOperator Key: FLINK-4265 URL: https://issues.apache.org/jira/browse/FLINK-4265 Project: Flink Issue Type: New Feature Components: DataSet API Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Minor Fix For: 1.1.0 One recent feature of Gelly is algorithms which detect duplicated or similar computation which can be shared. My initial implementation could only reuse a {{DataSet}} result. Before committing to Flink this was updated to use a javassist {{ProxyFactory}} allowing configuration to be merged and results to be replaced. There were some issues, as identified in FLINK-4257. With a {{NoOpOperator}} we can remove the use of {{ProxyFactory}} and resolve the identified issues. This ticket adds a {{NoOpOperator}} which is unwound in {{OperatorTranslation.translate}}. The {{NoOpOperator}} contains a {{DataSet}} which is accessed by a getter and setter. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2262) ParameterTool API misnamed function
Greg Hogan created FLINK-2262: - Summary: ParameterTool API misnamed function Key: FLINK-2262 URL: https://issues.apache.org/jira/browse/FLINK-2262 Project: Flink Issue Type: Bug Components: Java API Affects Versions: 0.9, master Reporter: Greg Hogan In org.apache.flink.api.java.utils.ParameterTool the function to read an Integer with a default value is named "getLong". public int getLong(String key, int defaultValue) { ... -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2545) NegativeArraySizeException while creating hash table bloom filters
Greg Hogan created FLINK-2545: - Summary: NegativeArraySizeException while creating hash table bloom filters Key: FLINK-2545 URL: https://issues.apache.org/jira/browse/FLINK-2545 Project: Flink Issue Type: Bug Components: Distributed Runtime Affects Versions: master Reporter: Greg Hogan The following exception occurred a second time when I immediately re-ran my application, though after recompiling and restarting Flink the subsequent execution ran without error. java.lang.Exception: The data preparation for task '...' , caused an error: null at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:465) at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.NegativeArraySizeException at org.apache.flink.runtime.operators.hash.MutableHashTable.buildBloomFilterForBucket(MutableHashTable.java:1160) at org.apache.flink.runtime.operators.hash.MutableHashTable.buildBloomFilterForBucketsInPartition(MutableHashTable.java:1143) at org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1117) at org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:946) at org.apache.flink.runtime.operators.hash.MutableHashTable.insertIntoTable(MutableHashTable.java:868) at org.apache.flink.runtime.operators.hash.MutableHashTable.buildInitialTable(MutableHashTable.java:692) at org.apache.flink.runtime.operators.hash.MutableHashTable.open(MutableHashTable.java:455) at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashMatchIterator.open(ReusingBuildSecondHashMatchIterator.java:93) at org.apache.flink.runtime.operators.JoinDriver.prepare(JoinDriver.java:195) at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:459) ... 3 more -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2569) CsvReader support for ValueTypes
Greg Hogan created FLINK-2569: - Summary: CsvReader support for ValueTypes Key: FLINK-2569 URL: https://issues.apache.org/jira/browse/FLINK-2569 Project: Flink Issue Type: Improvement Components: Java API Affects Versions: 0.9 Reporter: Greg Hogan Priority: Minor From the Flink Programming Guide section on Data Sources: "readCsvFile(path) / CsvInputFormat - Parses files of comma (or another char) delimited fields. Returns a DataSet of tuples or POJOs. Supports the basic java types and their Value counterparts as field types." When specifying a ValueType, i.e. CsvReader csvReader = env.readCsvFile(filename); csvReader.types(IntValue.class, IntValue.class); the following error occurs as BasicTypeInfo is specifically requested in CsvReader.types(...). org.apache.flink.client.program.ProgramInvocationException: The main method caused an error. at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:452) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353) at org.apache.flink.client.program.Client.run(Client.java:327) at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:608) at org.apache.flink.client.CliFrontend.run(CliFrontend.java:296) at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:927) at org.apache.flink.client.CliFrontend.main(CliFrontend.java:977) Caused by: java.lang.IllegalArgumentException: Type at position 0 is not a basic type. 
at org.apache.flink.api.java.typeutils.TupleTypeInfo.getBasicTupleTypeInfo(TupleTypeInfo.java:177) at org.apache.flink.api.java.io.CsvReader.types(CsvReader.java:393) at Driver.main(Driver.java:105) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437) ... 6 more -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2603) Flink hangs before starting execution
Greg Hogan created FLINK-2603: - Summary: Flink hangs before starting execution Key: FLINK-2603 URL: https://issues.apache.org/jira/browse/FLINK-2603 Project: Flink Issue Type: Bug Components: Distributed Runtime Affects Versions: master, 0.9.1 Reporter: Greg Hogan The following simple application created from Flink's Java quickstart will hang before execution if the data size is too large. I have tested this on both 0.9.1 and master (7364ce18) and the threshold occurs with $ ./bin/flink run /path/to/blocks/target/blocks-1.0-SNAPSHOT.jar 19 running in a few seconds while $ ./bin/flink run /path/to/blocks/target/blocks-1.0-SNAPSHOT.jar 20 seemingly hangs forever. I first put this together two months ago so if it is a bug it is not a new bug. {code}
package blocks;

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;

public class Job {
    public static void main(String[] args) throws Exception {
        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        List<Long> data = new ArrayList<>();
        int count = 1 << Integer.valueOf(args[0]);
        for (int i = 0; i < count; i++) {
            data.add(0L);
        }

        env.fromCollection(data).output(new DiscardingOutputFormat<Long>());

        System.out.println("Ready to execute ...");

        // execute program
        env.execute("Flink Java API Skeleton");
    }
}
{code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2625) Configuration not passed to JobGraphGenerator
Greg Hogan created FLINK-2625: - Summary: Configuration not passed to JobGraphGenerator Key: FLINK-2625 URL: https://issues.apache.org/jira/browse/FLINK-2625 Project: Flink Issue Type: Bug Components: Distributed Runtime, Local Runtime Affects Versions: master Reporter: Greg Hogan Priority: Minor LocalExecutor and Client fail to pass their Configuration to JobGraphGenerator, so the following parameters were ignored: taskmanager.runtime.max-fan taskmanager.runtime.sort-spilling-threshold -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2653) Enable object reuse in MergeIterator
Greg Hogan created FLINK-2653: - Summary: Enable object reuse in MergeIterator Key: FLINK-2653 URL: https://issues.apache.org/jira/browse/FLINK-2653 Project: Flink Issue Type: Improvement Components: Distributed Runtime Affects Versions: master Reporter: Greg Hogan MergeIterator currently discards given reusable objects and simply returns a new object from the JVM heap. This inefficiency has a noticeable impact on garbage collection and runtime overhead (~5% overall performance by my measure). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2655) Minimize intermediate merging of spilled buffers
Greg Hogan created FLINK-2655: - Summary: Minimize intermediate merging of spilled buffers Key: FLINK-2655 URL: https://issues.apache.org/jira/browse/FLINK-2655 Project: Flink Issue Type: Improvement Components: Distributed Runtime Affects Versions: master Reporter: Greg Hogan If the number of spilled buffers exceeds {{taskmanager.runtime.max-fan}} then the number of files must be reduced with an intermediate merge by reading, merging, and spilling into a single, larger file. The current implementation performs an intermediate merge on all files. An optimal implementation minimizes the amount of merged data by performing partial merges first. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
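One classical way to minimize the merged data can be sketched as follows (illustrative names, not Flink's sort-merger API): make the first intermediate merge as small as possible over the smallest files, so that every later merge uses the full fan-in.

```java
// Sketch: choose the size of the first (partial) intermediate merge so that
// all subsequent merges are full maxFan-way merges. Merging m files replaces
// them with 1 file, reducing the file count by m - 1, so the first merge only
// needs to cover the remainder modulo (maxFan - 1).
public class PartialMergePlanner {
    /**
     * Returns how many of the smallest spill files to merge first, or 0 when
     * no intermediate merge is needed (numFiles already fits within maxFan).
     */
    public static int firstMergeSize(int numFiles, int maxFan) {
        if (numFiles <= maxFan) {
            return 0;
        }
        // Total reduction needed is (numFiles - maxFan); make the first merge
        // absorb the part that is not a multiple of (maxFan - 1).
        return (numFiles - maxFan - 1) % (maxFan - 1) + 2;
    }
}
```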
[jira] [Created] (FLINK-2659) Object reuse in UnionWithTempOperator
Greg Hogan created FLINK-2659: - Summary: Object reuse in UnionWithTempOperator Key: FLINK-2659 URL: https://issues.apache.org/jira/browse/FLINK-2659 Project: Flink Issue Type: Bug Components: Distributed Runtime Affects Versions: master Reporter: Greg Hogan The first loop in UnionWithTempOperator.run() executes until null, then the second loop attempts to reuse this null value. [~StephanEwen], would you like me to submit a pull request? Stack trace:
{noformat}
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
    at org.apache.flink.client.program.Client.run(Client.java:381)
    at org.apache.flink.client.program.Client.run(Client.java:319)
    at org.apache.flink.client.program.Client.run(Client.java:312)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:790)
    at Driver.main(Driver.java:376)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
    at org.apache.flink.client.program.Client.run(Client.java:278)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:630)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:318)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:953)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1003)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:418)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:40)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:104)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NullPointerException
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:125)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
{noformat}
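The failure mode and the fix can be sketched with a toy two-input drain (illustrative names and signatures, not UnionWithTempOperator's actual code): the reuse object that the first loop drove to null must be re-created before the second loop starts.

```java
import java.util.Iterator;

// Toy reproduction of the reported pattern: a reader fills the given reuse
// object, or returns null when its input is drained. Carrying the first loop's
// terminating null into the second loop breaks the read (in Flink it surfaced
// as the NullPointerException in TupleSerializer.deserialize above).
public class UnionReuseDemo {
    public static final class Holder {
        public int value;
    }

    static Holder next(Iterator<Integer> in, Holder reuse) {
        if (reuse == null || !in.hasNext()) {
            return null; // a real serializer would instead NPE on a null reuse object
        }
        reuse.value = in.next();
        return reuse;
    }

    /** Drains both inputs, re-creating the reuse object between the two loops. */
    public static int drainBoth(Iterator<Integer> first, Iterator<Integer> second) {
        int count = 0;
        Holder reuse = new Holder();
        while ((reuse = next(first, reuse)) != null) {
            count++;
        }
        reuse = new Holder(); // the fix: do not carry the loop-ending null forward
        while ((reuse = next(second, reuse)) != null) {
            count++;
        }
        return count;
    }
}
```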
[jira] [Created] (FLINK-2668) ProjectOperator method to close projection
Greg Hogan created FLINK-2668: - Summary: ProjectOperator method to close projection Key: FLINK-2668 URL: https://issues.apache.org/jira/browse/FLINK-2668 Project: Flink Issue Type: Improvement Components: Java API Affects Versions: master Reporter: Greg Hogan Priority: Minor I have come across an issue in my code where I called {{project(...)}} on a DataSet which was already a {{ProjectOperator}}. Instead of reducing the number of fields from 2 to 1, this increased the number of fields from 2 to 3, resulting in {noformat}org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Tuple arity '3' expected but was '1'.{noformat} when processing the next operator. This can be resolved by adding an optional explicit call to conclude the projection, perhaps {{ProjectOperator.closeProjection()}}. Can this be done without creating a new no-op operator? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2685) TaskManager deadlock on NetworkBufferPool
Greg Hogan created FLINK-2685: - Summary: TaskManager deadlock on NetworkBufferPool Key: FLINK-2685 URL: https://issues.apache.org/jira/browse/FLINK-2685 Project: Flink Issue Type: Bug Components: Distributed Runtime Affects Versions: master Reporter: Greg Hogan This deadlock occurs intermittently. I have a {{join}} followed by a {{chain}} followed by a {{reduceGroup}}. Stack traces and local variables from one each of the {{join}} threads are below. The {{join}}s are waiting on a buffer to become available ({{networkBufferPool.availableMemorySegments.count=0}}). Both {{LocalBufferPool}}s have been given extra capacity ({{currentPoolSize=60 > numberOfRequiredMemorySegments=32}}), yet while the first {{join}} is at full capacity ({{currentPoolSize=numberOfRequestedMemorySegments=60}}), the second {{join}} has not acquired any ({{numberOfRequestedMemorySegments=0}}). {{LocalBufferPool.returnExcessMemorySegments}} only recycles {{MemorySegment}}s from its {{availableMemorySegments}}, so any requested {{Buffer}}s will only be released when explicitly recycled.
First join stack trace and variable values from {{LocalBufferPool.requestBuffer}}:
{noformat}
owns: SpanningRecordSerializer (id=723)
waiting for: ArrayDeque (id=724)
Object.wait(long) line: not available [native method]
LocalBufferPool.requestBuffer(boolean) line: 163
LocalBufferPool.requestBufferBlocking() line: 133
RecordWriter.emit(T) line: 92
OutputCollector.collect(T) line: 65
JoinOperator$ProjectFlatJoinFunction.join(T1, T2, Collector) line: 1088
ReusingBuildSecondHashMatchIterator.callWithNextKey(FlatJoinFunction, Collector) line: 137
JoinDriver.run() line: 208
RegularPactTask.run() line: 489
RegularPactTask.invoke() line: 354
Task.run() line: 581
Thread.run() line: 745
{noformat}
{noformat}
this    LocalBufferPool (id=403)
    availableMemorySegments    ArrayDeque (id=398)
        elements    Object[16] (id=422)
        head    14
        tail    14
    currentPoolSize    60
    isDestroyed    false
    networkBufferPool    NetworkBufferPool (id=354)
        allBufferPools    HashSet (id=424)
        availableMemorySegments    ArrayBlockingQueue (id=427)
            count    0
            items    Object[10240] (id=674)
            itrs    null
            lock    ReentrantLock (id=675)
            notEmpty    AbstractQueuedSynchronizer$ConditionObject (id=678)
            notFull    AbstractQueuedSynchronizer$ConditionObject (id=679)
            putIndex    6954
            takeIndex    6954
        factoryLock    Object (id=430)
        isDestroyed    false
        managedBufferPools    HashSet (id=431)
        memorySegmentSize    32768
        numTotalRequiredBuffers    3226
        totalNumberOfMemorySegments    10240
    numberOfRequestedMemorySegments    60
    numberOfRequiredMemorySegments    32
    owner    null
    registeredListeners    ArrayDeque (id=421)
        elements    Object[16] (id=685)
        head    0
        tail    0
askToRecycle    false
isBlocking    true
{noformat}
Second join stack trace and variable values from {{SingleInputGate.getNextBufferOrEvent}}:
{noformat}
Unsafe.park(boolean, long) line: not available [native method]
LockSupport.parkNanos(Object, long) line: 215
AbstractQueuedSynchronizer$ConditionObject.awaitNanos(long) line: 2078
LinkedBlockingQueue.poll(long, TimeUnit) line: 467
SingleInputGate.getNextBufferOrEvent() line: 414
MutableRecordReader(AbstractRecordReader).getNextRecord(T) line: 79
MutableRecordReader.next(T) line: 34
ReaderIterator.next(T) line: 59
MutableHashTable$ProbeIterator.next() line: 1581
MutableHashTable.processProbeIter() line: 457
MutableHashTable.nextRecord() line: 555
ReusingBuildSecondHashMatchIterator.callWithNextKey(FlatJoinFunction, Collector) line: 110
JoinDriver.run() line: 208
RegularPactTask.run() line: 489
RegularPactTask.invoke() line: 354
Task.run() line: 581
Thread.run() line: 745
{noformat}
{noformat}
this    SingleInputGate (id=693)
    bufferPool    LocalBufferPool (id=706)
        availableMemorySegments    ArrayDeque (id=716)
            elements    Object[16] (id=717)
            head    0
            tail    0
        currentPoolSize    60
        isDestroyed    false
        networkBufferPool    NetworkBufferPool (id=
[jira] [Created] (FLINK-2698) Add trailing newline to flink-conf.yaml
Greg Hogan created FLINK-2698: - Summary: Add trailing newline to flink-conf.yaml Key: FLINK-2698 URL: https://issues.apache.org/jira/browse/FLINK-2698 Project: Flink Issue Type: Improvement Affects Versions: master Reporter: Greg Hogan Assignee: Greg Hogan Priority: Minor The distributed flink-conf.yaml does not contain a trailing newline. This interferes with [bdutil|https://github.com/GoogleCloudPlatform/bdutil/blob/master/extensions/flink/install_flink.sh#L64] which appends extra/override configuration parameters with a heredoc. There are many other files without trailing newlines, but this looks to be the only detrimental effect.
{code}
for i in $(find * -type f); do
  if diff /dev/null "$i" | tail -1 | grep '^\\ No newline' > /dev/null; then
    echo $i
  fi
done
{code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2716) Checksum method for DataSet and Graph
Greg Hogan created FLINK-2716: - Summary: Checksum method for DataSet and Graph Key: FLINK-2716 URL: https://issues.apache.org/jira/browse/FLINK-2716 Project: Flink Issue Type: Improvement Components: Gelly, Java API, Scala API Affects Versions: master Reporter: Greg Hogan Assignee: Greg Hogan Priority: Minor {{DataSet.count()}}, {{Graph.numberOfVertices()}}, and {{Graph.numberOfEdges()}} provide measures of the number of distributed data elements. New {{DataSet.checksum()}} and {{Graph.checksum()}} methods will summarize the content of data elements and support algorithm validation, integration testing, and benchmarking. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
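A minimal sketch of what such a checksum could compute, assuming an order-independent combination so the result does not depend on partitioning (illustrative names, not the proposed Flink API):

```java
// Illustrative sketch (not the proposed Flink API): an order-independent
// checksum pairing the element count with a commutative sum of element hashes,
// so distributed partitions can be combined in any order.
public class ChecksumSketch {
    /** Returns {count, checksum} for the given elements. */
    public static long[] checksum(Iterable<?> elements) {
        long count = 0;
        long sum = 0;
        for (Object e : elements) {
            count++;
            sum += e.hashCode() & 0xffffffffL; // addition commutes: iteration order is irrelevant
        }
        return new long[] { count, sum };
    }
}
```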