[jira] [Commented] (FLINK-12595) KinesisDataFetcherTest.testOriginalExceptionIsPreservedWhenInterruptedDuringShutdown deadlocks
[ https://issues.apache.org/jira/browse/FLINK-12595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16889581#comment-16889581 ] Shannon Carey commented on FLINK-12595: --- Created [https://github.com/apache/flink/pull/9187] > KinesisDataFetcherTest.testOriginalExceptionIsPreservedWhenInterruptedDuringShutdown > deadlocks > -- > > Key: FLINK-12595 > URL: https://issues.apache.org/jira/browse/FLINK-12595 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis, Tests >Affects Versions: 1.9.0 >Reporter: Dawid Wysakowicz >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > https://api.travis-ci.org/v3/job/535738122/log.txt -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Comment Edited] (FLINK-12595) KinesisDataFetcherTest.testOriginalExceptionIsPreservedWhenInterruptedDuringShutdown deadlocks
[ https://issues.apache.org/jira/browse/FLINK-12595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16889564#comment-16889564 ] Shannon Carey edited comment on FLINK-12595 at 7/20/19 6:49 PM: Sorry about that! I had to draw out a big sequence diagram, but I have a hypothesis about the issue. It looks like KinesisDataFetcher (on the FlinkKinesisConsumer thread) could, by chance, pause inside the while(running) loop before Thread.sleep(). Then, the KinesisShardConsumer thread might interrupt that thread via KinesisDataFetcher#mainThread.interrupt() and then trigger shutdownWaiter, allowing the test's fetcher.waitUntilShutdown() to complete, after which the test would also interrupt the FlinkKinesisConsumer thread. Then, when the KinesisDataFetcher code resumes, it calls Thread.sleep() and catches/ignores the interrupted state (set by both interrupts), exits the while(running) loop due to running == false, and gets to our awaitTermination() code, which waits forever because it has already absorbed the test's interrupt. This can be reproduced by adding code like this to the KinesisDataFetcher beneath the if (running && discoveryIntervalMillis != 0) line, in order to force a longer delay in that thread:
{code:java}
boolean wasInterrupted = false;
int interruptionCount = 0;
for (int i = 0; i < 4; i++) {
    try {
        Thread.sleep(4000);
    } catch (InterruptedException ie) {
        wasInterrupted = true;
        interruptionCount++;
    }
}
if (wasInterrupted) {
    // Restore the interrupted state
    Thread.currentThread().interrupt();
}
System.out.println("KinesisDataFetcher was interrupted " + interruptionCount
    + " times during the while(running) loop.");
System.out.flush();
{code}
You'll likely see that it gets interrupted twice, and that the test deadlocks with the same stacks as in the logs provided above. I have 02a0cf3d4e checked out to reproduce the issue matching the provided logs. I assume it's best to write this patch against HEAD of master and let you handle backporting it? Let me know if that's not the case. I'll post again once I have a PR to address my hypothesis.

was (Author: rehevkor5): Sorry about that! I had to draw out a big sequence diagram, but I have a hypothesis about the issue. It looks like KinesisDataFetcher (on the FlinkKinesisConsumer thread) could, by chance, pause inside the while(running) loop before Thread.sleep(). Then, the KinesisShardConsumer thread might interrupt that thread via KinesisDataFetcher#mainThread.interrupt() and then trigger shutdownWaiter, allowing the test's fetcher.waitUntilShutdown() to complete, after which the test would also interrupt the FlinkKinesisConsumer thread. Then, when the KinesisDataFetcher code resumes, it calls Thread.sleep() and catches/ignores the interrupted state (set by both interrupts), exits the while(running) loop due to running == false, and gets to our awaitTermination() code, which waits forever because it has already absorbed the test's interrupt.
This can be reproduced by adding code like this to the KinesisDataFetcher beneath the if (running && discoveryIntervalMillis != 0) line, in order to force a longer delay in that thread:
{code:java}
boolean wasInterrupted = false;
int interruptionCount = 0;
for (int i = 0; i < 4; i++) {
    try {
        Thread.sleep(4000);
    } catch (InterruptedException ie) {
        wasInterrupted = true;
        interruptionCount++;
    }
}
if (wasInterrupted) {
    // Restore the interrupted state
    Thread.currentThread().interrupt();
}
System.out.println("KinesisDataFetcher was interrupted " + interruptionCount
    + " times during the while(running) loop.");
System.out.flush();
{code}
You'll likely see that it gets interrupted twice, and that the test deadlocks with the same stacks as in the logs provided above. I have 02a0cf3d4e checked out to reproduce the issue matching the provided logs. I assume it's best to write this patch against HEAD of master and let you handle backporting it? Let me know if that's not the case. I'll post again once I have a PR to address my hypothesis. > KinesisDataFetcherTest.testOriginalExceptionIsPreservedWhenInterruptedDuringShutdown > deadlocks > -- > > Key: FLINK-12595 > URL: https://issues.apache.org/jira/browse/FLINK-12595 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis, Tests >Affects Versions: 1.9.0 >Reporter: Dawid Wysakowicz >Priority: Critical > Labels: test-stability > Fix For: 1.9.0 > > > https://api.travis-ci.org/v3/job/535738122/log.txt -- This message was sent by Atlassian JIRA (v7.6.14#76016)
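For readers unfamiliar with the mechanism at the heart of this hypothesis: Thread.sleep() clears the thread's interrupt flag when it throws InterruptedException, so a catch block that ignores the exception silently discards a pending shutdown signal. A minimal, self-contained illustration (plain Java, not Flink code; the class name is hypothetical):
{code:java}
public class InterruptAbsorptionDemo {
    public static void main(String[] args) throws Exception {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(1000); // the interrupt lands here...
            } catch (InterruptedException ie) {
                // ...and the flag is now cleared. Without the next line, any
                // later blocking call (like awaitTermination) would wait forever.
                Thread.currentThread().interrupt();
            }
            System.out.println("interrupt flag after sleep: "
                    + Thread.currentThread().isInterrupted()); // prints true
        });
        worker.start();
        worker.interrupt();
        worker.join();
    }
}
{code}
Commenting out the Thread.currentThread().interrupt() line makes the flag read false, which is exactly the "absorbed interrupt" state described above.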
[jira] [Commented] (FLINK-12595) KinesisDataFetcherTest.testOriginalExceptionIsPreservedWhenInterruptedDuringShutdown deadlocks
[ https://issues.apache.org/jira/browse/FLINK-12595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16889565#comment-16889565 ] Shannon Carey commented on FLINK-12595: --- Another thing I noticed: FlinkKinesisConsumer#sourceContext.close() is not called if fetcher.runFetcher() throws an exception. This seems like it might be a problem? Should that be moved into a "finally" block? Do you think I should submit a separate issue for that? > KinesisDataFetcherTest.testOriginalExceptionIsPreservedWhenInterruptedDuringShutdown > deadlocks > -- > > Key: FLINK-12595 > URL: https://issues.apache.org/jira/browse/FLINK-12595 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis, Tests >Affects Versions: 1.9.0 >Reporter: Dawid Wysakowicz >Priority: Critical > Labels: test-stability > Fix For: 1.9.0 > > > https://api.travis-ci.org/v3/job/535738122/log.txt -- This message was sent by Atlassian JIRA (v7.6.14#76016)
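The shape being suggested is the standard close-in-finally pattern. A minimal, runnable sketch with stand-in names (this is not the actual FlinkKinesisConsumer source; runFetcher and sourceContext here are hypothetical stand-ins):
{code:java}
import java.io.Closeable;

public class CloseInFinallyDemo {
    // Stand-in for fetcher.runFetcher(); simulates a failure.
    static void runFetcher() throws Exception {
        throw new Exception("simulated fetcher failure");
    }

    public static void main(String[] args) {
        Closeable sourceContext = () -> System.out.println("sourceContext closed");
        try {
            try {
                runFetcher();
            } finally {
                // In a finally block, close() runs even when runFetcher() throws.
                sourceContext.close();
            }
        } catch (Exception e) {
            System.out.println("exception still propagates: " + e.getMessage());
        }
    }
}
{code}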
[jira] [Comment Edited] (FLINK-12595) KinesisDataFetcherTest.testOriginalExceptionIsPreservedWhenInterruptedDuringShutdown deadlocks
[ https://issues.apache.org/jira/browse/FLINK-12595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16889565#comment-16889565 ] Shannon Carey edited comment on FLINK-12595 at 7/20/19 6:26 PM: Another thing I noticed: FlinkKinesisConsumer#sourceContext.close() is not called if fetcher.runFetcher() throws an exception. This seems like it might be a problem? Should that be moved into a "finally" block? Do you think I should submit a separate issue for that? was (Author: rehevkor5): Another thing I noticed: FlinkKinesisConsumer#sourceContext.close() is not called if fetcher.runFetcher() throws an exception. This seems like it might be a problem? Should that be moved into a "finally" block? Do you think I should submit a separate issue for that?? > KinesisDataFetcherTest.testOriginalExceptionIsPreservedWhenInterruptedDuringShutdown > deadlocks > -- > > Key: FLINK-12595 > URL: https://issues.apache.org/jira/browse/FLINK-12595 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis, Tests >Affects Versions: 1.9.0 >Reporter: Dawid Wysakowicz >Priority: Critical > Labels: test-stability > Fix For: 1.9.0 > > > https://api.travis-ci.org/v3/job/535738122/log.txt -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (FLINK-12595) KinesisDataFetcherTest.testOriginalExceptionIsPreservedWhenInterruptedDuringShutdown deadlocks
[ https://issues.apache.org/jira/browse/FLINK-12595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16889564#comment-16889564 ] Shannon Carey commented on FLINK-12595: --- Sorry about that! I had to draw out a big sequence diagram, but I have a hypothesis about the issue. It looks like KinesisDataFetcher (on the FlinkKinesisConsumer thread) could, by chance, pause inside the while(running) loop before Thread.sleep(). Then, the KinesisShardConsumer thread might interrupt that thread via KinesisDataFetcher#mainThread.interrupt() and then trigger shutdownWaiter, allowing the test's fetcher.waitUntilShutdown() to complete, after which the test would also interrupt the FlinkKinesisConsumer thread. Then, when the KinesisDataFetcher code resumes, it calls Thread.sleep() and catches/ignores the interrupted state (set by both interrupts), exits the while(running) loop due to running == false, and gets to our awaitTermination() code, which waits forever because it has already absorbed the test's interrupt. This can be reproduced by adding code like this to the KinesisDataFetcher beneath the if (running && discoveryIntervalMillis != 0) line, in order to force a longer delay in that thread:
{code:java}
boolean wasInterrupted = false;
int interruptionCount = 0;
for (int i = 0; i < 4; i++) {
    try {
        Thread.sleep(4000);
    } catch (InterruptedException ie) {
        wasInterrupted = true;
        interruptionCount++;
    }
}
if (wasInterrupted) {
    // Restore the interrupted state
    Thread.currentThread().interrupt();
}
System.out.println("KinesisDataFetcher was interrupted " + interruptionCount
    + " times during the while(running) loop.");
System.out.flush();
{code}
You'll likely see that it gets interrupted twice, and that the test deadlocks with the same stacks as in the logs provided above. I have 02a0cf3d4e checked out to reproduce the issue matching the provided logs. I assume it's best to write this patch against HEAD of master and let you handle backporting it? Let me know if that's not the case. I'll post again once I have a PR to address my hypothesis. > KinesisDataFetcherTest.testOriginalExceptionIsPreservedWhenInterruptedDuringShutdown > deadlocks > -- > > Key: FLINK-12595 > URL: https://issues.apache.org/jira/browse/FLINK-12595 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis, Tests >Affects Versions: 1.9.0 >Reporter: Dawid Wysakowicz >Priority: Critical > Labels: test-stability > Fix For: 1.9.0 > > > https://api.travis-ci.org/v3/job/535738122/log.txt -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (FLINK-11568) Exception in Kinesis ShardConsumer hidden by InterruptedException
Shannon Carey created FLINK-11568: - Summary: Exception in Kinesis ShardConsumer hidden by InterruptedException Key: FLINK-11568 URL: https://issues.apache.org/jira/browse/FLINK-11568 Project: Flink Issue Type: Improvement Components: Kinesis Connector Affects Versions: 1.6.2 Reporter: Shannon Carey Assignee: Shannon Carey When the Kinesis ShardConsumer encounters an exception, for example due to a problem in the Deserializer, the root cause exception is often hidden by a non-informative InterruptedException caused by the FlinkKinesisConsumer thread being interrupted. Ideally, the root cause exception would be preserved and thrown so that the logs contain enough information to diagnose the issue. This probably affects all versions. Here's an example of a log message with the unhelpful InterruptedException:
{code:java}
2019-02-05 13:29:31:383 thread=Source: Custom Source -> Filter -> Map -> Sink: Unnamed (1/8), level=WARN, logger=org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer, message="Error while closing Kinesis data fetcher"
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.awaitTermination(KinesisDataFetcher.java:450)
	at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:314)
	at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:323)
	at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:477)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:378)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:745)
{code}
And here's an example of the real exception that we're actually interested in, which is stored inside KinesisDataFetcher#error, but is not thrown or logged:
{code:java}
org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:416)
org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:240)
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230)
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174)
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:135)
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper.deserialize(KinesisDeserializationSchemaWrapper.java:44)
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.deserializeRecordForCollectionAndUpdateState(ShardConsumer.java:332)
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:231)
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
java.util.concurrent.FutureTask.run(FutureTask.java)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
{code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
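A sketch of the improvement this issue asks for, hedged since it is not the merged fix: surface the first recorded error and keep the InterruptedException as secondary context. The class and method names below are hypothetical stand-ins that only mirror the KinesisDataFetcher#error idea:
{code:java}
import java.util.concurrent.atomic.AtomicReference;

public class RootCausePreservingFetcher {
    // Mirrors the KinesisDataFetcher#error idea: the first failure wins.
    private final AtomicReference<Throwable> error = new AtomicReference<>();

    void reportError(Throwable t) {
        error.compareAndSet(null, t);
    }

    void awaitTermination() throws Exception {
        try {
            Thread.sleep(10_000); // stands in for the real shutdown wait loop
        } catch (InterruptedException ie) {
            Throwable rootCause = error.get();
            if (rootCause != null) {
                // Throw the informative failure; attach the interrupt as context.
                Exception e = new Exception("Shard consumer failed", rootCause);
                e.addSuppressed(ie);
                throw e;
            }
            throw ie;
        }
    }

    public static void main(String[] args) throws Exception {
        RootCausePreservingFetcher fetcher = new RootCausePreservingFetcher();
        fetcher.reportError(new RuntimeException("deserializer blew up"));
        Thread.currentThread().interrupt(); // simulate the cancel interrupt
        fetcher.awaitTermination();         // now fails with the real cause attached
    }
}
{code}
Run as-is, the program terminates with the RuntimeException as the cause in the stack trace instead of a bare "sleep interrupted".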
[jira] [Updated] (FLINK-6805) Flink Cassandra connector dependency on Netty disagrees with Flink
[ https://issues.apache.org/jira/browse/FLINK-6805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shannon Carey updated FLINK-6805: - Description: The Flink Cassandra connector has a dependency on Netty libraries (via promotion of transitive dependencies by the Maven shade plugin) at version 4.0.33.Final, which disagrees with the version included in Flink of 4.0.27.Final which is included & managed by the parent POM via dependency on netty-all. Due to use of netty-all, the dependency management doesn't take effect on the individual libraries such as netty-handler, netty-codec, etc. I suggest that dependency management of Netty should be added for all Netty libraries individually (netty-handler, etc.) so that all Flink modules use the same version, and similarly I suggest that exclusions be added to the quickstart example POM for the individual Netty libraries so that fat JARs don't include conflicting versions of Netty. It seems like this problem started when FLINK-6084 was implemented: transitive dependencies of the flink-connector-cassandra were previously omitted, and now that they are included we must make sure that they agree with the Flink distribution. was: The Flink Cassandra connector has a dependency on Netty libraries (via promotion of transitive dependencies by the Maven shade plugin) at version 4.0.33.Final, which disagrees with the version included in Flink of 4.0.27.Final which is included & managed by the parent POM via dependency on netty-all. Due to use of netty-all, the dependency management doesn't take effect on the individual libraries such as netty-handler, netty-codec, etc. I suggest that dependency management of Netty should be added for all Netty libraries individually (netty-handler, etc.) so that all Flink modules use the same version, and similarly I suggest that exclusions be added to the quickstart example POM for the individual Netty libraries so that fat JARs don't include conflicting versions of Netty. > Flink Cassandra connector dependency on Netty disagrees with Flink > -- > > Key: FLINK-6805 > URL: https://issues.apache.org/jira/browse/FLINK-6805 > Project: Flink > Issue Type: Bug > Components: Cassandra Connector >Affects Versions: 1.3.0, 1.2.1 >Reporter: Shannon Carey > > The Flink Cassandra connector has a dependency on Netty libraries (via > promotion of transitive dependencies by the Maven shade plugin) at version > 4.0.33.Final, which disagrees with the version included in Flink of > 4.0.27.Final which is included & managed by the parent POM via dependency on > netty-all. > Due to use of netty-all, the dependency management doesn't take effect on the > individual libraries such as netty-handler, netty-codec, etc. > I suggest that dependency management of Netty should be added for all Netty > libraries individually (netty-handler, etc.) so that all Flink modules use > the same version, and similarly I suggest that exclusions be added to the > quickstart example POM for the individual Netty libraries so that fat JARs > don't include conflicting versions of Netty. > It seems like this problem started when FLINK-6084 was implemented: > transitive dependencies of the flink-connector-cassandra were previously > omitted, and now that they are included we must make sure that they agree > with the Flink distribution. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6805) Flink Cassandra connector dependency on Netty disagrees with Flink
[ https://issues.apache.org/jira/browse/FLINK-6805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16033571#comment-16033571 ] Shannon Carey commented on FLINK-6805: -- A better solution may be to add exclusions for the Netty libraries in the flink-connector-cassandra POM, then the other changes wouldn't necessarily be required (although they might still be desirable). > Flink Cassandra connector dependency on Netty disagrees with Flink > -- > > Key: FLINK-6805 > URL: https://issues.apache.org/jira/browse/FLINK-6805 > Project: Flink > Issue Type: Bug > Components: Cassandra Connector >Affects Versions: 1.3.0, 1.2.1 >Reporter: Shannon Carey > > The Flink Cassandra connector has a dependency on Netty libraries (via > promotion of transitive dependencies by the Maven shade plugin) at version > 4.0.33.Final, which disagrees with the version included in Flink of > 4.0.27.Final which is included & managed by the parent POM via dependency on > netty-all. > Due to use of netty-all, the dependency management doesn't take effect on the > individual libraries such as netty-handler, netty-codec, etc. > I suggest that dependency management of Netty should be added for all Netty > libraries individually (netty-handler, etc.) so that all Flink modules use > the same version, and similarly I suggest that exclusions be added to the > quickstart example POM for the individual Netty libraries so that fat JARs > don't include conflicting versions of Netty. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6805) Flink Cassandra connector dependency on Netty disagrees with Flink
Shannon Carey created FLINK-6805: Summary: Flink Cassandra connector dependency on Netty disagrees with Flink Key: FLINK-6805 URL: https://issues.apache.org/jira/browse/FLINK-6805 Project: Flink Issue Type: Bug Components: Cassandra Connector Affects Versions: 1.2.1, 1.3.0 Reporter: Shannon Carey The Flink Cassandra connector has a dependency on Netty libraries (via promotion of transitive dependencies by the Maven shade plugin) at version 4.0.33.Final, which disagrees with the version included in Flink of 4.0.27.Final which is included & managed by the parent POM via dependency on netty-all. Due to use of netty-all, the dependency management doesn't take effect on the individual libraries such as netty-handler, netty-codec, etc. I suggest that dependency management of Netty should be added for all Netty libraries individually (netty-handler, etc.) so that all Flink modules use the same version, and similarly I suggest that exclusions be added to the quickstart example POM for the individual Netty libraries so that fat JARs don't include conflicting versions of Netty. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5690) protobuf is not shaded properly
[ https://issues.apache.org/jira/browse/FLINK-5690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16001826#comment-16001826 ] Shannon Carey commented on FLINK-5690: -- Regarding the comment: {quote} Mh, that's weird. According to this answer (http://stackoverflow.com/questions/7076414/java-lang-illegalaccesserror-tried-to-access-method) the IllegalAccessError is caused by compile time / runtime version mismatches, not by different classloaders. {quote} You might get IllegalAccessError if the class or method is, e.g., protected or default scope (not public or private) and the class trying to access it is in the same package but from a different classloader. We're having this problem when we try to use a more recent version of Typesafe Config, and I'm not sure why it's not using the user code classloader provided via Thread#setContextClassLoader(). > protobuf is not shaded properly > --- > > Key: FLINK-5690 > URL: https://issues.apache.org/jira/browse/FLINK-5690 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.1.4, 1.3.0 >Reporter: Andrey >Assignee: Robert Metzger > > Currently distributive contains com/google/protobuf package. Without proper > shading client code could fail with: > {code} > Caused by: java.lang.IllegalAccessError: tried to access method > com.google.protobuf. > {code} > Steps to reproduce: > * create job class "com.google.protobuf.TestClass" > * call com.google.protobuf.TextFormat.escapeText(String) method from this > class > * deploy job to flink cluster (using web console for example) > * run job. In logs IllegalAccessError. > Issue in package protected method and different classloaders. TestClass > loaded by FlinkUserCodeClassLoader, but TextFormat class loaded by > sun.misc.Launcher$AppClassLoader -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15889297#comment-15889297 ] Shannon Carey commented on FLINK-5929: -- [~aljoscha] as far as I am aware, the state does get cleared out by our Trigger. In Trigger#clear() we have: ctx.getPartitionedState(fireTimestampStateDescriptor).clear(); We could have done it in the Window Function instead, if we wanted to, given our hack. > Allow Access to Per-Window State in ProcessWindowFunction > - > > Key: FLINK-5929 > URL: https://issues.apache.org/jira/browse/FLINK-5929 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Aljoscha Krettek > > Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} > can access is scoped to the key of the window but not the window itself. That > is, state is global across all windows for a given key. > For some use cases it is beneficial to keep state scoped to a window. For > example, if you expect to have several {{Trigger}} firings (due to early and > late firings) a user can keep state per window to keep some information > between those firings. > The per-window state has to be cleaned up in some way. For this I see two > options: > - Keep track of all state that a user uses and clean up when we reach the > window GC horizon. > - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called > when we reach the window GC horizon that users can/should use to clean up > their state. > On the API side, we can add a method {{windowState()}} on > {{ProcessWindowFunction.Context}} that retrieves the per-window state and > {{globalState()}} that would allow access to the (already available) global > state. The {{Context}} would then look like this: > {code} > /** > * The context holding window metadata > */ > public abstract class Context { > /** > * @return The window that is being evaluated. > */ > public abstract W window(); > /** > * State accessor for per-key and per-window state. > */ > KeyedStateStore windowState(); > /** > * State accessor for per-key global state. > */ > KeyedStateStore globalState(); > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886419#comment-15886419 ] Shannon Carey commented on FLINK-5929: -- If I understand correctly, I agree this would be useful. Currently we are working around this limitation in order to achieve communication between the Trigger (per-pane state) and the WindowFunction (per-operator state) by a hack within the WindowFunction that looks like this (we're not on 1.2 yet so we haven't looked at new ways to do this yet):
{code}
def apply(key: String, window: TimeWindow, input, out): Unit = {
  val fireTimestampState: ValueState[java.lang.Long] =
    getRuntimeContext.getState[java.lang.Long](fireTimestampStateDescriptor)
  if (fireTimestampState.isInstanceOf[MemValueState[String, TimeWindow, java.lang.Long]]) {
    fireTimestampState.asInstanceOf[MemValueState[String, TimeWindow, java.lang.Long]].setCurrentNamespace(window)
  } else if (fireTimestampState.isInstanceOf[RocksDBValueState[String, TimeWindow, java.lang.Long]]) {
    fireTimestampState.asInstanceOf[RocksDBValueState[String, TimeWindow, java.lang.Long]].setCurrentNamespace(window)
  } else if (fireTimestampState.isInstanceOf[FsValueState[String, TimeWindow, java.lang.Long]]) {
    fireTimestampState.asInstanceOf[FsValueState[String, TimeWindow, java.lang.Long]].setCurrentNamespace(window)
  }
  fireTimestampState.value()
  ...
{code}
> Allow Access to Per-Window State in ProcessWindowFunction > - > > Key: FLINK-5929 > URL: https://issues.apache.org/jira/browse/FLINK-5929 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Aljoscha Krettek > > Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} > can access is scoped to the key of the window but not the window itself. That > is, state is global across all windows for a given key. > For some use cases it is beneficial to keep state scoped to a window. For > example, if you expect to have several {{Trigger}} firings (due to early and > late firings) a user can keep state per window to keep some information > between those firings. > The per-window state has to be cleaned up in some way. For this I see two > options: > - Keep track of all state that a user uses and clean up when we reach the > window GC horizon. > - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called > when we reach the window GC horizon that users can/should use to clean up > their state. > On the API side, we can add a method {{windowState()}} on > {{ProcessWindowFunction.Context}} that retrieves the per-window state and > {{globalState()}} that would allow access to the (already available) global > state. The {{Context}} would then look like this: > {code} > /** > * The context holding window metadata > */ > public abstract class Context { > /** > * @return The window that is being evaluated. > */ > public abstract W window(); > /** > * State accessor for per-key and per-window state. > */ > KeyedStateStore windowState(); > /** > * State accessor for per-key global state. > */ > KeyedStateStore globalState(); > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
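Under the API proposed in this issue's description, the namespace-casting hack above would collapse into an ordinary state access. A sketch of what usage could look like if Context#windowState() lands as described (hypothetical until the feature is implemented; EarlyFiringFunction and the descriptor name are made-up illustrations):
{code:java}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class EarlyFiringFunction
        extends ProcessWindowFunction<Long, String, String, TimeWindow> {

    private final ValueStateDescriptor<Long> fireTimestampDescriptor =
            new ValueStateDescriptor<>("fireTimestamp", Long.class);

    @Override
    public void process(String key, Context ctx, Iterable<Long> input,
                        Collector<String> out) throws Exception {
        // Scoped to this key *and* this window, unlike globalState().
        ValueState<Long> fireTimestamp =
                ctx.windowState().getState(fireTimestampDescriptor);
        Long previous = fireTimestamp.value();
        fireTimestamp.update(ctx.window().maxTimestamp());
        out.collect(key + ": previous firing at " + previous);
    }
}
{code}
The windowState()/globalState() signatures follow the Context sketch quoted in the issue description above.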
[jira] [Comment Edited] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886378#comment-15886378 ] Shannon Carey edited comment on FLINK-5929 at 2/27/17 7:40 PM: --- Just to clarify, you're referring to the state from AbstractRichFunction#getRuntimeContext.getState(), right? That's the state keyed by the combination of event key and window operator (that's what you mean by "global"). What you're suggesting is adding state for the individual window panes, right? was (Author: rehevkor5): Just to clarify, you're referring to the state from AbstractRichFunction#getRuntimeContext.getState(), right? That's the state keyed by the combination of event key and window operator. What you're suggesting is adding state for the individual window panes, right? > Allow Access to Per-Window State in ProcessWindowFunction > - > > Key: FLINK-5929 > URL: https://issues.apache.org/jira/browse/FLINK-5929 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Aljoscha Krettek > > Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} > can access is scoped to the key of the window but not the window itself. That > is, state is global across all windows for a given key. > For some use cases it is beneficial to keep state scoped to a window. For > example, if you expect to have several {{Trigger}} firings (due to early and > late firings) a user can keep state per window to keep some information > between those firings. > The per-window state has to be cleaned up in some way. For this I see two > options: > - Keep track of all state that a user uses and clean up when we reach the > window GC horizon. > - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called > when we reach the window GC horizon that users can/should use to clean up > their state. > On the API side, we can add a method {{windowState()}} on > {{ProcessWindowFunction.Context}} that retrieves the per-window state and > {{globalState()}} that would allow access to the (already available) global > state. The {{Context}} would then look like this: > {code} > /** > * The context holding window metadata > */ > public abstract class Context { > /** > * @return The window that is being evaluated. > */ > public abstract W window(); > /** > * State accessor for per-key and per-window state. > */ > KeyedStateStore windowState(); > /** > * State accessor for per-key global state. > */ > KeyedStateStore globalState(); > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886378#comment-15886378 ] Shannon Carey edited comment on FLINK-5929 at 2/27/17 7:38 PM: --- Just to clarify, you're referring to the state from AbstractRichFunction#getRuntimeContext.getState(), right? That's the state keyed by the combination of event key and window operator. What you're suggesting is adding state for the individual window panes, right? was (Author: rehevkor5): Just to clarify, you're referring to the state from AbstractRichFunction#getRuntimeContext.getState(), right? That's the state keyed by the combination of event key and window operator. What you're suggesting is adding state for the individual panes within the window, right? > Allow Access to Per-Window State in ProcessWindowFunction > - > > Key: FLINK-5929 > URL: https://issues.apache.org/jira/browse/FLINK-5929 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Aljoscha Krettek > > Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} > can access is scoped to the key of the window but not the window itself. That > is, state is global across all windows for a given key. > For some use cases it is beneficial to keep state scoped to a window. For > example, if you expect to have several {{Trigger}} firings (due to early and > late firings) a user can keep state per window to keep some information > between those firings. > The per-window state has to be cleaned up in some way. For this I see two > options: > - Keep track of all state that a user uses and clean up when we reach the > window GC horizon. > - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called > when we reach the window GC horizon that users can/should use to clean up > their state. > On the API side, we can add a method {{windowState()}} on > {{ProcessWindowFunction.Context}} that retrieves the per-window state and > {{globalState()}} that would allow access to the (already available) global > state. The {{Context}} would then look like this: > {code} > /** > * The context holding window metadata > */ > public abstract class Context { > /** > * @return The window that is being evaluated. > */ > public abstract W window(); > /** > * State accessor for per-key and per-window state. > */ > KeyedStateStore windowState(); > /** > * State accessor for per-key global state. > */ > KeyedStateStore globalState(); > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886378#comment-15886378 ] Shannon Carey commented on FLINK-5929: -- Just to clarify, you're referring to the state from AbstractRichFunction#getRuntimeContext.getState(), right? That's the state keyed by the combination of event key and window operator. What you're suggesting is adding state for the individual panes within the window, right? > Allow Access to Per-Window State in ProcessWindowFunction > - > > Key: FLINK-5929 > URL: https://issues.apache.org/jira/browse/FLINK-5929 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Aljoscha Krettek > > Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} > can access is scoped to the key of the window but not the window itself. That > is, state is global across all windows for a given key. > For some use cases it is beneficial to keep state scoped to a window. For > example, if you expect to have several {{Trigger}} firings (due to early and > late firings) a user can keep state per window to keep some information > between those firings. > The per-window state has to be cleaned up in some way. For this I see two > options: > - Keep track of all state that a user uses and clean up when we reach the > window GC horizon. > - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called > when we reach the window GC horizon that users can/should use to clean up > their state. > On the API side, we can add a method {{windowState()}} on > {{ProcessWindowFunction.Context}} that retrieves the per-window state and > {{globalState()}} that would allow access to the (already available) global > state. The {{Context}} would then look like this: > {code} > /** > * The context holding window metadata > */ > public abstract class Context { > /** > * @return The window that is being evaluated. > */ > public abstract W window(); > /** > * State accessor for per-key and per-window state. > */ > KeyedStateStore windowState(); > /** > * State accessor for per-key global state. > */ > KeyedStateStore globalState(); > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-5608) Cancel button not always visible
Shannon Carey created FLINK-5608: Summary: Cancel button not always visible Key: FLINK-5608 URL: https://issues.apache.org/jira/browse/FLINK-5608 Project: Flink Issue Type: Bug Components: Webfrontend Affects Versions: 1.1.4 Reporter: Shannon Carey Assignee: Shannon Carey Priority: Minor When the window is not wide enough, or when the job name is too long, the "Cancel" button in the Job view of the web UI is not visible because it is the first element that gets wrapped down and gets covered by the secondary navbar (the tabs). This causes us to often need to resize the browser wider than our monitor in order to use the cancel button. In general, the use of Bootstrap's ".navbar-fixed-top" is problematic if the content may wrap, especially if the content's horizontal width is not known & fixed. The ".navbar-fixed-top" uses fixed positioning, and therefore any unexpected change in height will result in overlap with the rest of the normal-flow content in the page. The Bootstrap docs explain this in their "Overflowing content" callout. I am submitting a PR which does not attempt to resolve all issues with the fixed navbar approach, but attempts to improve the situation by using less horizontal space and by altering the layout approach of the Cancel button. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5542) YARN client incorrectly uses local YARN config to check vcore capacity
Shannon Carey created FLINK-5542: Summary: YARN client incorrectly uses local YARN config to check vcore capacity Key: FLINK-5542 URL: https://issues.apache.org/jira/browse/FLINK-5542 Project: Flink Issue Type: Bug Components: YARN Affects Versions: 1.1.4 Reporter: Shannon Carey See http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/1-1-4-on-YARN-vcores-change-td11016.html When using bin/yarn-session.sh, AbstractYarnClusterDescriptor line 271 in 1.1.4 is comparing the user's selected number of vcores to the vcores configured in the local node's YARN config (from YarnConfiguration eg. yarn-site.xml and yarn-default.xml). It incorrectly prevents Flink from launching even if there is sufficient vcore capacity on the cluster. That is not correct, because the application will not necessarily run on the local node. For example, if running the yarn-session.sh client from the AWS EMR master node, the vcore count there may be different from the vcore count on the core nodes where Flink will actually run. A reasonable way to fix this would probably be to reuse the logic from "yarn-session.sh -q" (FlinkYarnSessionCli line 550) which knows how to get vcore information from the real worker nodes. Alternatively, perhaps we could remove the check entirely and rely on YARN's Scheduler to determine whether sufficient resources exist. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
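A sketch of the suggested direction: query the ResourceManager for real node capacities instead of reading the local YARN config. This assumes the stock Hadoop 2.x YarnClient API and is not the actual FlinkYarnSessionCli code; ClusterVcoreCheck is a hypothetical name:
{code:java}
import java.util.List;

import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class ClusterVcoreCheck {
    public static void main(String[] args) throws Exception {
        int requestedVcores = Integer.parseInt(args[0]);

        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(new YarnConfiguration());
        yarnClient.start();
        try {
            // Ask the ResourceManager about the actual worker nodes instead
            // of trusting the local yarn-site.xml / yarn-default.xml.
            List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
            int maxVcores = nodes.stream()
                    .mapToInt(n -> n.getCapability().getVirtualCores())
                    .max()
                    .orElse(0);
            if (requestedVcores > maxVcores) {
                throw new IllegalArgumentException(
                        "Requested " + requestedVcores + " vcores, but the largest "
                        + "node in the cluster only offers " + maxVcores + ".");
            }
        } finally {
            yarnClient.stop();
        }
    }
}
{code}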
[jira] [Commented] (FLINK-5425) JobManager replaced by IP in metrics
[ https://issues.apache.org/jira/browse/FLINK-5425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15810263#comment-15810263 ] Shannon Carey commented on FLINK-5425: -- I am using the statsd reporter and the data thereby flows to Graphite. You can see that the character-filtering method of the statsd reporter does not filter ".": https://github.com/apache/flink/blob/master/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java#L199 So, yes, this could be fixed in the reporter, and I could make a PR with that change... but it would impact dots in every part of the name, as you mention. While that might make sense for people like me who are using the Graphite backend (though it would change how our job names appear in the metrics since those contain periods), I'm not sure it makes sense for people who use other backends. Given the uncertainty, perhaps it would be better to add a configuration parameter which allows the user to control what characters get filtered out? A regex perhaps? The reporter isn't really broken, it's just that the metric naming is inconsistent. To me, the simplest solution is to eliminate the difference in behavior between identifying the jobmanager by IP in the metrics vs. identifying the taskmanager by hostname. This problem is definitely present in 1.1.3 (that's where I'm seeing it in my live systems), but you're right that the link I put in the description was to the then-current master. I'm happy to submit a PR once the implementation approach has been decided... although I may need a little guidance about how to go about adjusting the TaskManagerLocation logic so JobManagerRunner can share it, if that's what gets decided. > JobManager replaced by IP in metrics > --- > > Key: FLINK-5425 > URL: https://issues.apache.org/jira/browse/FLINK-5425 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.1.3 >Reporter: Shannon Carey >Priority: Minor > > In metrics at the jobmanager level and below, the "" scope variable is > being replaced by the IP rather than the hostname. The taskmanager metrics, > meanwhile, use the host name. > You can see the job manager behavior at > https://github.com/apache/flink/blob/a1934255421b97eefd579183e9c7199c43ad1a2c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java#L147 > compared to TaskManagerLocation#getHostname(). > The problem with this is mainly that due to the presence of "." (period) > characters in the IP address and thereby the metric name, the metric names > show up strangely in Graphite/Grafana, where "." is the metric group > separator. > If it's not possible to make jobmanager metrics use the hostname, I suggest > replacing "." with "-" in the section. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
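In the meantime, one user-side workaround might be a small reporter subclass. This is a sketch that assumes filterCharacters remains a public override point on StatsDReporter (it implements Flink's CharacterFilter interface) and that the subclass can be wired in via the metrics reporter class configuration; DotFilteringStatsDReporter is a hypothetical name:
{code:java}
import org.apache.flink.metrics.statsd.StatsDReporter;

// Also collapse '.' so IP-valued host segments don't explode into
// extra Graphite hierarchy levels. Note this affects dots everywhere
// in the name, including job names, as discussed above.
public class DotFilteringStatsDReporter extends StatsDReporter {
    @Override
    public String filterCharacters(String input) {
        return super.filterCharacters(input).replace('.', '-');
    }
}
{code}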
[jira] [Updated] (FLINK-5425) JobManager replaced by IP in metrics
[ https://issues.apache.org/jira/browse/FLINK-5425?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shannon Carey updated FLINK-5425: - Description: In metrics at the jobmanager level and below, the "" scope variable is being replaced by the IP rather than the hostname. The taskmanager metrics, meanwhile, use the host name. You can see the job manager behavior at https://github.com/apache/flink/blob/a1934255421b97eefd579183e9c7199c43ad1a2c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java#L147 compared to TaskManagerLocation#getHostname(). The problem with this is mainly that due to the presence of "." (period) characters in the IP address and thereby the metric name, the metric names show up strangely in Graphite/Grafana, where "." is the metric group separator. If it's not possible to make jobmanager metrics use the hostname, I suggest replacing "." with "-" in the section. was: In metrics at the jobmanager level and below, the "" scope variable is being replaced by the IP rather than the hostname. The taskmanager metrics, meanwhile, use the host name. You can see the job manager behavior at https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java#L147 compared to TaskManagerLocation#getHostname(). The problem with this is mainly that due to the presence of "." (period) characters in the IP address and thereby the metric name, the metric names show up strangely in Graphite/Grafana, where "." is the metric group separator. If it's not possible to make jobmanager metrics use the hostname, I suggest replacing "." with "-" in the section. > JobManager replaced by IP in metrics > --- > > Key: FLINK-5425 > URL: https://issues.apache.org/jira/browse/FLINK-5425 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.1.3 >Reporter: Shannon Carey >Priority: Minor > > In metrics at the jobmanager level and below, the "" scope variable is > being replaced by the IP rather than the hostname. The taskmanager metrics, > meanwhile, use the host name. > You can see the job manager behavior at > https://github.com/apache/flink/blob/a1934255421b97eefd579183e9c7199c43ad1a2c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java#L147 > compared to TaskManagerLocation#getHostname(). > The problem with this is mainly that due to the presence of "." (period) > characters in the IP address and thereby the metric name, the metric names > show up strangely in Graphite/Grafana, where "." is the metric group > separator. > If it's not possible to make jobmanager metrics use the hostname, I suggest > replacing "." with "-" in the section. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5425) JobManager replaced by IP in metrics
Shannon Carey created FLINK-5425: Summary: JobManager replaced by IP in metrics Key: FLINK-5425 URL: https://issues.apache.org/jira/browse/FLINK-5425 Project: Flink Issue Type: Bug Components: Metrics Affects Versions: 1.1.3 Reporter: Shannon Carey Priority: Minor In metrics at the jobmanager level and below, the "" scope variable is being replaced by the IP rather than the hostname. The taskmanager metrics, meanwhile, use the host name. You can see the job manager behavior at https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java#L147 compared to TaskManagerLocation#getHostname(). The problem with this is mainly that due to the presence of "." (period) characters in the IP address and thereby the metric name, the metric names show up strangely in Graphite/Grafana, where "." is the metric group separator. If it's not possible to make jobmanager metrics use the hostname, I suggest replacing "." with "-" in the section. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (FLINK-5322) Clean up yarn configuration documentation
[ https://issues.apache.org/jira/browse/FLINK-5322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15806496#comment-15806496 ] Shannon Carey edited comment on FLINK-5322 at 1/7/17 2:11 AM: -- Another important note: quoting strings doesn't work the way it should in YAML either. Looks to be due to GlobalConfiguration#loadYAMLResource(). This really should be fixed to use a compliant YAML parser. was (Author: rehevkor5): Another important note: quoting strings doesn't work the way it should in YAML either. > Clean up yarn configuration documentation > - > > Key: FLINK-5322 > URL: https://issues.apache.org/jira/browse/FLINK-5322 > Project: Flink > Issue Type: Improvement > Components: Documentation, YARN >Affects Versions: 1.2.0, 1.1.3 > Environment: Flink 1.1.3 on AWS EMR emr-5.2.0 (Hadoop "Amazon 2.7.3") >Reporter: Shannon Carey >Assignee: Chesnay Schepler >Priority: Trivial > Fix For: 1.2.0, 1.1.5 > > > The value I specified in flink-conf.yaml > {code} > yarn.taskmanager.env: > MY_ENV: test > {code} > is not available in {{System.getenv("MY_ENV")}} from the plan execution > (execution flow of main method) nor from within execution of a streaming > operator. > Interestingly, it does appear within the Flink JobManager Web UI under Job > Manager -> Configuration. > -- > The yarn section of the configuration page should be cleaned up a bit. The > "yarn.containers.vcores" parameter is listed twice, the example for > "yarn.application-master.env" is listed as a separate parameter and the > "yarn.taskmanager.env" description indirectly references another parameter > ("same as the above") which just isn't maintainable; instead it should be > described similarly as the application-master entry. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
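For contrast, a sketch of what delegating to a spec-compliant parser could look like, assuming SnakeYAML as the parser (Flink's loadYAMLResource() instead does simple line-based key: value splitting); CompliantYamlLoad is a hypothetical name:
{code:java}
import java.io.FileReader;
import java.io.Reader;
import java.util.Map;

import org.yaml.snakeyaml.Yaml;

public class CompliantYamlLoad {
    public static void main(String[] args) throws Exception {
        // Quoted strings and nested mappings parse the way users expect.
        String conf = "yarn.taskmanager.env:\n  MY_ENV: \"quoted: value\"\n";
        Map<String, Object> parsed = (Map<String, Object>) new Yaml().load(conf);
        System.out.println(parsed); // {yarn.taskmanager.env={MY_ENV=quoted: value}}

        // Loading a real flink-conf.yaml would look the same:
        try (Reader reader = new FileReader(args[0])) {
            Map<String, Object> fileConf = (Map<String, Object>) new Yaml().load(reader);
            System.out.println(fileConf.keySet());
        }
    }
}
{code}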
[jira] [Commented] (FLINK-5322) Clean up yarn configuration documentation
[ https://issues.apache.org/jira/browse/FLINK-5322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15806496#comment-15806496 ] Shannon Carey commented on FLINK-5322: -- Another important note: quoting strings doesn't work the way it should in YAML either. > Clean up yarn configuration documentation > - > > Key: FLINK-5322 > URL: https://issues.apache.org/jira/browse/FLINK-5322 > Project: Flink > Issue Type: Improvement > Components: Documentation, YARN >Affects Versions: 1.2.0, 1.1.3 > Environment: Flink 1.1.3 on AWS EMR emr-5.2.0 (Hadoop "Amazon 2.7.3") >Reporter: Shannon Carey >Assignee: Chesnay Schepler >Priority: Trivial > Fix For: 1.2.0, 1.1.5 > > > The value I specified in flink-conf.yaml > {code} > yarn.taskmanager.env: > MY_ENV: test > {code} > is not available in {{System.getenv("MY_ENV")}} from the plan execution > (execution flow of main method) nor from within execution of a streaming > operator. > Interestingly, it does appear within the Flink JobManager Web UI under Job > Manager -> Configuration. > -- > The yarn section of the configuration page should be cleaned up a bit. The > "yarn.containers.vcores" parameter is listed twice, the example for > "yarn.application-master.env" is listed as a separate parameter and the > "yarn.taskmanager.env" description indirectly references another parameter > ("same as the above") which just isn't maintainable; instead it should be > described similarly as the application-master entry. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5424) Improve Restart Strategy Logging
Shannon Carey created FLINK-5424: Summary: Improve Restart Strategy Logging Key: FLINK-5424 URL: https://issues.apache.org/jira/browse/FLINK-5424 Project: Flink Issue Type: Improvement Components: Core Reporter: Shannon Carey Assignee: Shannon Carey Priority: Minor I'll be submitting a PR which includes some minor improvements to logging related to restart strategies. Specifically, I added a toString so that the log contains better info about failure-rate restart strategy, and I added an explanation in the log when the restart strategy is responsible for preventing job restart (currently, there's no indication that the restart strategy had anything to do with it). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5322) yarn.taskmanager.env value does not appear in System.getenv
[ https://issues.apache.org/jira/browse/FLINK-5322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15746008#comment-15746008 ] Shannon Carey commented on FLINK-5322: -- Yes, that worked! Thanks! It becomes available from within UDF (streaming operator) code. Perhaps this ticket can be adapted to just clarify the documentation, explaining that the YAML isn't really interpreted the way people may expect, and to provide an example? > yarn.taskmanager.env value does not appear in System.getenv > --- > > Key: FLINK-5322 > URL: https://issues.apache.org/jira/browse/FLINK-5322 > Project: Flink > Issue Type: Bug > Components: YARN > Environment: Flink 1.1.3 on AWS EMR emr-5.2.0 (Hadoop "Amazon 2.7.3") >Reporter: Shannon Carey >Priority: Trivial > Fix For: 1.1.3 > > > The value I specified in flink-conf.yaml > {code} > yarn.taskmanager.env: > MY_ENV: test > {code} > is not available in {{System.getenv("MY_ENV")}} from the plan execution > (execution flow of main method) nor from within execution of a streaming > operator. > Interestingly, it does appear within the Flink JobManager Web UI under Job > Manager -> Configuration. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
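The working syntax is not quoted in this thread; given that the loader reads flat key/value lines rather than nested YAML, it was presumably the single-line form (a hypothetical reconstruction, not confirmed here):
{code}
yarn.taskmanager.env.MY_ENV: test
{code}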
[jira] [Comment Edited] (FLINK-4091) flink-connector-cassandra has conflicting guava version
[ https://issues.apache.org/jira/browse/FLINK-4091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15634445#comment-15634445 ] Shannon Carey edited comment on FLINK-4091 at 12/12/16 9:44 PM: Flink's inclusion of altered Cassandra classes (though probably unavoidable) was causing a lot of problems for us when using a library with dependencies on cassandra-driver-core and cassandra-driver-mapping. When launching a Flink job locally from the IDE, the two different versions of Cassandra included on the classpath would cause runtime errors. Excluding the two dependencies with Maven seems to have fixed the issue, allowing us to run our Flink jobs from the IDE again. Just figured I'd mention it here in case it helps anyone else. However, we do have to be careful in the library not to use methods that return Guava classes (such as asyncPrepare) because they'll be written against the non-shaded Guava. Relocating the Guava package with the shade plugin works when using the jar, but not when running Flink jobs from within the IDE (IntelliJ doesn't have full integration with the shade plugin). was (Author: rehevkor5): Flink's inclusion of altered Cassandra classes (though probably unavoidable) was causing a lot of problems for us when using a library with dependencies on cassandra-driver-core and cassandra-driver-mapping. When launching a Flink job locally from the IDE, the two different versions of Cassandra classed on the classpath would cause runtime errors. Excluding the two dependencies with Maven seems to have fixed the issue, allowing us to run our Flink jobs from the IDE again. Just figured I'd mention it here in case it helps anyone else. > flink-connector-cassandra has conflicting guava version > --- > > Key: FLINK-4091 > URL: https://issues.apache.org/jira/browse/FLINK-4091 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.1.0 > Environment: MacOSX, 1.10-SNAPSHOT (head is > 1a6bab3ef76805685044cf4521e32315169f9033) >Reporter: Dominik Bruhn >Assignee: Chesnay Schepler > > The newly merged cassandra streaming connector has an issue with its guava > dependency. > The build-process for flink-connector-cassandra creates shaded JAR file which > contains the connector, the datastax cassandra driver plus in > org.apache.flink.shaded a shaded copy of guava. > The datastax cassandra driver calls into Futures.withFallback ([1]) which is > present in this guava version. This also works inside the > flink-connector-cassandra jar. > Now the actual build-process for Flink happens and builds another shaded JAR > and creates the flink-dist.jar. Inside this JAR, there is also a shaded > version of guava inside org.apache.flink.shaded. > Now the issue: The guava version which is in the flink-dist.jar is not > compatible and doesn't contain the Futures.withFallback which the datastax > driver is using. > This leads into the following issue: You can without any problems launch a > flink task which uses the casandra driver locally (so through the > mini-cluster) because that is never using the flink-dist.jar. > BUT: As soon as you are trying to start this job on a flink cluster (which > uses the flink-dist.jar), the job breaks with the following exception: > https://gist.github.com/theomega/5ab9b14ffb516b15814de28e499b040d > You can inspect this by opening the > flink-connector-cassandra_2.11-1.1-SNAPSHOT.jar and the > flink-dist_2.11-1.1-SNAPSHOT.jar in a java decompiler. 
> I don't know a good solution here: Perhaps it would be one solution to shade > the guava for the cassandra-driver somewhere else than at > org.apache.flink.shaded. > [1]: > https://google.github.io/guava/releases/19.0/api/docs/com/google/common/util/concurrent/Futures.html#withFallback(com.google.common.util.concurrent.ListenableFuture, > com.google.common.util.concurrent.FutureFallback, > java.util.concurrent.Executor) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5322) yarn.taskmanager.env value does not appear in System.getenv
[ https://issues.apache.org/jira/browse/FLINK-5322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shannon Carey updated FLINK-5322: - Description: The value I specified in flink-conf.yaml {code} yarn.taskmanager.env: MY_ENV: test {code} is not available in {{System.getenv("MY_ENV")}} from the plan execution (execution flow of main method) nor from within execution of a streaming operator. Interestingly, it does appear within the Flink JobManager Web UI under Job Manager -> Configuration. was: The value I specified in flink-conf.yaml {code} yarn.taskmanager.env: MY_ENV: test {code} is not available in {{System.getenv("MY_ENV")}} from the plan execution (execution flow of main method) nor from within execution of a streaming operator. > yarn.taskmanager.env value does not appear in System.getenv > --- > > Key: FLINK-5322 > URL: https://issues.apache.org/jira/browse/FLINK-5322 > Project: Flink > Issue Type: Bug > Components: YARN > Environment: Flink 1.1.3 on AWS EMR emr-5.2.0 (Hadoop "Amazon 2.7.3") >Reporter: Shannon Carey >Priority: Trivial > Fix For: 1.1.3 > > > The value I specified in flink-conf.yaml > {code} > yarn.taskmanager.env: > MY_ENV: test > {code} > is not available in {{System.getenv("MY_ENV")}} from the plan execution > (execution flow of main method) nor from within execution of a streaming > operator. > Interestingly, it does appear within the Flink JobManager Web UI under Job > Manager -> Configuration. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5322) yarn.taskmanager.env value does not appear in System.getenv
Shannon Carey created FLINK-5322: Summary: yarn.taskmanager.env value does not appear in System.getenv Key: FLINK-5322 URL: https://issues.apache.org/jira/browse/FLINK-5322 Project: Flink Issue Type: Bug Components: YARN Environment: Flink 1.1.3 on AWS EMR emr-5.2.0 (Hadoop "Amazon 2.7.3") Reporter: Shannon Carey Priority: Trivial Fix For: 1.1.3 The value I specified in flink-conf.yaml
{code}
yarn.taskmanager.env:
  MY_ENV: test
{code}
is not available in {{System.getenv("MY_ENV")}} from the plan execution (execution flow of main method) nor from within execution of a streaming operator. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
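To make the reported check concrete, a minimal probe is sketched below; the class is hypothetical, not from the report. It prints the variable from within a streaming operator, where the report says it comes back null on YARN:

{code:java}
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Hypothetical probe for the bug report above: prints the environment
// variable from within operator execution. Expected "test" per
// flink-conf.yaml; reported as null when running on YARN.
public class EnvProbe extends RichMapFunction<String, String> {
	@Override
	public void open(Configuration parameters) {
		System.out.println("MY_ENV = " + System.getenv("MY_ENV"));
	}

	@Override
	public String map(String value) {
		return value;
	}
}
{code}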
[jira] [Commented] (FLINK-4091) flink-connector-cassandra has conflicting guava version
[ https://issues.apache.org/jira/browse/FLINK-4091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15634445#comment-15634445 ] Shannon Carey commented on FLINK-4091: -- Flink's inclusion of altered Cassandra classes (though probably unavoidable) was causing a lot of problems for us when using a library with dependencies on cassandra-driver-core and cassandra-driver-mapping. When launching a Flink job locally from the IDE, the two different versions of Cassandra classes on the classpath would cause runtime errors. Excluding the two dependencies with Maven seems to have fixed the issue, allowing us to run our Flink jobs from the IDE again. Just figured I'd mention it here in case it helps anyone else. > flink-connector-cassandra has conflicting guava version > --- > > Key: FLINK-4091 > URL: https://issues.apache.org/jira/browse/FLINK-4091 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.1.0 > Environment: MacOSX, 1.10-SNAPSHOT (head is > 1a6bab3ef76805685044cf4521e32315169f9033) >Reporter: Dominik Bruhn >Assignee: Chesnay Schepler > > The newly merged cassandra streaming connector has an issue with its guava > dependency. > The build-process for flink-connector-cassandra creates a shaded JAR file which > contains the connector, the datastax cassandra driver plus in > org.apache.flink.shaded a shaded copy of guava. > The datastax cassandra driver calls into Futures.withFallback ([1]) which is > present in this guava version. This also works inside the > flink-connector-cassandra jar. > Now the actual build-process for Flink happens and builds another shaded JAR > and creates the flink-dist.jar. Inside this JAR, there is also a shaded > version of guava inside org.apache.flink.shaded. > Now the issue: The guava version which is in the flink-dist.jar is not > compatible and doesn't contain the Futures.withFallback which the datastax > driver is using. > This leads to the following issue: You can launch a > Flink task which uses the Cassandra driver locally (so through the > mini-cluster) without any problems, because that is never using the flink-dist.jar. > BUT: As soon as you try to start this job on a Flink cluster (which > uses the flink-dist.jar), the job breaks with the following exception: > https://gist.github.com/theomega/5ab9b14ffb516b15814de28e499b040d > You can inspect this by opening the > flink-connector-cassandra_2.11-1.1-SNAPSHOT.jar and the > flink-dist_2.11-1.1-SNAPSHOT.jar in a Java decompiler. > I don't know a good solution here: Perhaps one solution would be to shade > the Guava for the cassandra-driver somewhere other than > org.apache.flink.shaded. > [1]: > https://google.github.io/guava/releases/19.0/api/docs/com/google/common/util/concurrent/Futures.html#withFallback(com.google.common.util.concurrent.ListenableFuture, > com.google.common.util.concurrent.FutureFallback, > java.util.concurrent.Executor) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4803) Job Cancel can hang forever waiting for OutputFormat.close()
[ https://issues.apache.org/jira/browse/FLINK-4803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15569025#comment-15569025 ] Shannon Carey commented on FLINK-4803: -- Yes, that's right. cancel() blocks on close(), and therefore if close() misbehaves the thread is never interrupted and cancel() blocks forever. In the issue description, I suggested your option #2. I think you'll want #1 no matter what. However, #2 allows for at least one message and/or exception to be logged that tells the user what went wrong (why their job is taking a long time to cancel, or why it did not cancel gracefully). I'm not sure what your DataSink-specific option would look like. Maybe it is similar to my workaround, where I wrapped my HadoopOutputFormat in a subclass that calls super.close() from a separate thread with a timeout? (A sketch of that pattern follows this message.) That workaround is OK, but I had to expend a fair amount of effort to figure out what the problem was, and also there was nothing I could do but restart Flink in order to get my job to terminate (not a desirable solution). You'll want Flink to function smoothly regardless of what data sink the user chooses. > Job Cancel can hang forever waiting for OutputFormat.close() > > > Key: FLINK-4803 > URL: https://issues.apache.org/jira/browse/FLINK-4803 > Project: Flink > Issue Type: Bug >Affects Versions: 1.1.1 >Reporter: Shannon Carey > > If the Flink job uses a badly-behaved OutputFormat (in this example, a > HadoopOutputFormat containing a CqlBulkOutputFormat), where the close() > method blocks forever, it is impossible to cancel the Flink job even though > the blocked thread would respond to an interrupt. The stack traces below show > the state of the important threads when a job is canceled and the > OutputFormat is blocking forever inside of close(). > I suggest that the `DataSinkTask.cancel()` method be updated to add a timeout on > `this.format.close()`. When the timeout is reached, the Task thread should be > interrupted. 
> {code} > "Canceler for DataSink > (org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2) (2/5)" > #6422 daemon prio=5 os_prio=0 tid=0x7fb7e42f nid=0x34f3 waiting for > monitor entry [0x7fb7be079000] >java.lang.Thread.State: BLOCKED (on object monitor) > at > org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158) > - waiting to lock <0x0006bae5f788> (a java.lang.Object) > at > org.apache.flink.runtime.operators.DataSinkTask.cancel(DataSinkTask.java:268) > at > org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1149) > at java.lang.Thread.run(Thread.java:745) > "DataSink > (org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2) (2/5)" > #6410 daemon prio=5 os_prio=0 tid=0x7fb7e79a4800 nid=0x2ad8 waiting on > condition [0x7fb7bdf78000] >java.lang.Thread.State: TIMED_WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x0006c5ab5e20> (a > java.util.concurrent.SynchronousQueue$TransferStack) > at > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) > at > java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460) > at > java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362) > at > java.util.concurrent.SynchronousQueue.offer(SynchronousQueue.java:895) > at > org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.put(SSTableSimpleUnsortedWriter.java:194) > at > org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.sync(SSTableSimpleUnsortedWriter.java:180) > at > org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.close(SSTableSimpleUnsortedWriter.java:156) > at > org.apache.cassandra.io.sstable.CQLSSTableWriter.close(CQLSSTableWriter.java:275) > at > org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:133) > at > org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:126) > at > org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158) > - locked <0x0006bae5f788> (a java.lang.Object) > at > org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:234) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) > at java.lang.Thread.run(Thread.java:745) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
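A generic sketch of the wrapper workaround mentioned in the comment above, with illustrative names rather than the actual HadoopOutputFormat subclass: close() is delegated to a helper thread, and the helper is interrupted if it overruns a timeout, instead of blocking the canceler forever.

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustrative pattern: close a possibly misbehaving resource with a timeout.
public final class TimedCloseWrapper {
	private final AutoCloseable delegate;
	private final long timeoutMillis;

	public TimedCloseWrapper(AutoCloseable delegate, long timeoutMillis) {
		this.delegate = delegate;
		this.timeoutMillis = timeoutMillis;
	}

	public void close() throws Exception {
		ExecutorService closer = Executors.newSingleThreadExecutor();
		try {
			Future<?> result = closer.submit(() -> {
				try {
					delegate.close();
				} catch (Exception e) {
					throw new RuntimeException(e);
				}
			});
			try {
				result.get(timeoutMillis, TimeUnit.MILLISECONDS);
			} catch (TimeoutException te) {
				// close() is stuck (e.g. blocked in CQLSSTableWriter.close()):
				// interrupt the helper thread and give up instead of hanging.
				result.cancel(true);
				throw new Exception("close() did not complete within " + timeoutMillis + " ms");
			}
		} finally {
			closer.shutdownNow();
		}
	}
}
{code}

The same timeout-then-interrupt shape is what the description proposes for DataSinkTask.cancel() itself.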
[jira] [Updated] (FLINK-4803) Job Cancel can hang forever waiting for OutputFormat.close()
[ https://issues.apache.org/jira/browse/FLINK-4803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shannon Carey updated FLINK-4803: - Description: If the Flink job uses a badly-behaved OutputFormat (in this example, a HadoopOutputFormat containing a CqlBulkOutputFormat), where the close() method blocks forever, it is impossible to cancel the Flink job even though the blocked thread would respond to an interrupt. The stack traces below show the state of the important threads when a job is canceled and the OutputFormat is blocking forever inside of close(). I suggest that the `DataSinkTask.cancel()` method be updated to add a timeout on `this.format.close()`. When the timeout is reached, the Task thread should be interrupted.
{code}
"Canceler for DataSink (org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2) (2/5)" #6422 daemon prio=5 os_prio=0 tid=0x7fb7e42f nid=0x34f3 waiting for monitor entry [0x7fb7be079000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
	- waiting to lock <0x0006bae5f788> (a java.lang.Object)
	at org.apache.flink.runtime.operators.DataSinkTask.cancel(DataSinkTask.java:268)
	at org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1149)
	at java.lang.Thread.run(Thread.java:745)

"DataSink (org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2) (2/5)" #6410 daemon prio=5 os_prio=0 tid=0x7fb7e79a4800 nid=0x2ad8 waiting on condition [0x7fb7bdf78000]
   java.lang.Thread.State: TIMED_WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for <0x0006c5ab5e20> (a java.util.concurrent.SynchronousQueue$TransferStack)
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
	at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
	at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
	at java.util.concurrent.SynchronousQueue.offer(SynchronousQueue.java:895)
	at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.put(SSTableSimpleUnsortedWriter.java:194)
	at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.sync(SSTableSimpleUnsortedWriter.java:180)
	at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.close(SSTableSimpleUnsortedWriter.java:156)
	at org.apache.cassandra.io.sstable.CQLSSTableWriter.close(CQLSSTableWriter.java:275)
	at org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:133)
	at org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:126)
	at org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
	- locked <0x0006bae5f788> (a java.lang.Object)
	at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:234)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
	at java.lang.Thread.run(Thread.java:745)
{code}
was: If the Flink job uses a badly-behaved OutputFormat (in this example, a HadoopOutputFormat containing a CqlBulkOutputFormat), where the close() method blocks forever, it is impossible to cancel the Flink job even though the blocked thread would respond to an interrupt. The stack traces below show the state of the important threads when a job is canceled and the OutputFormat is blocking forever inside of close(). I suggest that the `DataSinkTask.cancel()` method be updated to add a timeout on `this.format.close()`. When the timeout is reached, the Task thread should be interrupted.
{code}
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
	- waiting to lock <0x0006bae5f788> (a java.lang.Object)
	at org.apache.flink.runtime.operators.DataSinkTask.cancel(DataSinkTask.java:268)
	at org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1149)
	at java.lang.Thread.run(Thread.java:745)

"DataSink (org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2) (2/5)" #6410 daemon prio=5 os_prio=0 tid=0x7fb7e79a4800 nid=0x2ad8 waiting on condition [0x7fb7bdf78000]
   java.lang.Thread.State: TIMED_WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for <0x0006c5ab5e20> (a java.util.concurrent.SynchronousQueue$TransferStack)
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
	at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
	at
[jira] [Created] (FLINK-4803) Job Cancel can hang forever waiting for OutputFormat.close()
Shannon Carey created FLINK-4803: Summary: Job Cancel can hang forever waiting for OutputFormat.close() Key: FLINK-4803 URL: https://issues.apache.org/jira/browse/FLINK-4803 Project: Flink Issue Type: Bug Affects Versions: 1.1.1 Reporter: Shannon Carey If the Flink job uses a badly-behaved OutputFormat (in this example, a HadoopOutputFormat containing a CqlBulkOutputFormat), where the close() method blocks forever, it is impossible to cancel the Flink job even though the blocked thread would respond to an interrupt. The stack traces below show the state of the important threads when a job is canceled and the OutputFormat is blocking forever inside of close(). I suggest that the `DataSinkTask.cancel()` method be updated to add a timeout on `this.format.close()`. When the timeout is reached, the Task thread should be interrupted.
{code}
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
	- waiting to lock <0x0006bae5f788> (a java.lang.Object)
	at org.apache.flink.runtime.operators.DataSinkTask.cancel(DataSinkTask.java:268)
	at org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1149)
	at java.lang.Thread.run(Thread.java:745)

"DataSink (org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2) (2/5)" #6410 daemon prio=5 os_prio=0 tid=0x7fb7e79a4800 nid=0x2ad8 waiting on condition [0x7fb7bdf78000]
   java.lang.Thread.State: TIMED_WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for <0x0006c5ab5e20> (a java.util.concurrent.SynchronousQueue$TransferStack)
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
	at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
	at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
	at java.util.concurrent.SynchronousQueue.offer(SynchronousQueue.java:895)
	at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.put(SSTableSimpleUnsortedWriter.java:194)
	at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.sync(SSTableSimpleUnsortedWriter.java:180)
	at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.close(SSTableSimpleUnsortedWriter.java:156)
	at org.apache.cassandra.io.sstable.CQLSSTableWriter.close(CQLSSTableWriter.java:275)
	at org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:133)
	at org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:126)
	at org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
	- locked <0x0006bae5f788> (a java.lang.Object)
	at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:234)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
	at java.lang.Thread.run(Thread.java:745)
{code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-3190) Retry rate limits for DataStream API
[ https://issues.apache.org/jira/browse/FLINK-3190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shannon Carey updated FLINK-3190: - Fix Version/s: 1.1.0 > Retry rate limits for DataStream API > > > Key: FLINK-3190 > URL: https://issues.apache.org/jira/browse/FLINK-3190 > Project: Flink > Issue Type: Improvement >Reporter: Sebastian Klemke >Assignee: Michał Fijołek >Priority: Minor > Fix For: 1.1.0 > > > For a long running stream processing job, absolute numbers of retries don't > make much sense: The job will accumulate transient errors over time and will > die eventually when thresholds are exceeded. Rate limits are better suited in > this scenario: A job should only die, if it fails too often in a given time > frame. To better overcome transient errors, retry delays could be used, as > suggested in other issues. > Absolute numbers of retries can still make sense, if failing operators don't > make any progress at all. We can measure progress by OperatorState changes > and by observing output, as long as the operator in question is not a sink. > If operator state changes and/or operator produces output, we can assume it > makes progress. > As an example, let's say we configured a retry rate limit of 10 retries per > hour and a non-sink operator A. If the operator fails once every 10 minutes > and produces output between failures, it should not lead to job termination. > But if the operator fails 11 times in an hour or does not produce output > between 11 consecutive failures, job should be terminated. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
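A minimal sketch of the rate check the description asks for, assuming a simple sliding window over failure timestamps; the class and method names are illustrative, not part of any Flink API:

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

// Allows a restart only if at most maxRetries failures occurred within the
// trailing window, e.g. new RetryRateLimiter(10, 3600_000L) for the
// "10 retries per hour" example in the description.
public final class RetryRateLimiter {
	private final int maxRetries;
	private final long windowMillis;
	private final Deque<Long> failureTimes = new ArrayDeque<>();

	public RetryRateLimiter(int maxRetries, long windowMillis) {
		this.maxRetries = maxRetries;
		this.windowMillis = windowMillis;
	}

	/** Records a failure at nowMillis; returns true if the job may restart. */
	public synchronized boolean allowRestart(long nowMillis) {
		// Drop failures that have aged out of the window.
		while (!failureTimes.isEmpty() && nowMillis - failureTimes.peekFirst() > windowMillis) {
			failureTimes.removeFirst();
		}
		failureTimes.addLast(nowMillis);
		return failureTimes.size() <= maxRetries;
	}
}
{code}

With maxRetries = 10 and a one-hour window, an eleventh failure inside the hour returns false, matching the termination example in the description.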
[jira] [Commented] (FLINK-4418) ClusterClient/ConnectionUtils#findConnectingAddress fails immediately if InetAddress.getLocalHost throws exception
[ https://issues.apache.org/jira/browse/FLINK-4418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15433839#comment-15433839 ] Shannon Carey commented on FLINK-4418: -- Thanks [~rmetzger]. Should I mark the issue as "resolved" since there is a PR available? I'm not sure what your JIRA workflow looks like. > ClusterClient/ConnectionUtils#findConnectingAddress fails immediately if > InetAddress.getLocalHost throws exception > -- > > Key: FLINK-4418 > URL: https://issues.apache.org/jira/browse/FLINK-4418 > Project: Flink > Issue Type: Bug > Components: Client >Affects Versions: 1.1.0 >Reporter: Shannon Carey >Assignee: Shannon Carey > > When attempting to connect to a cluster with a ClusterClient, if the > machine's hostname is not resolvable to an IP, an exception is thrown > preventing success. > This is the case if, for example, the hostname is not present & mapped to a > local IP in /etc/hosts. > The exception is below. I suggest that findAddressUsingStrategy() should > catch java.net.UnknownHostException thrown by InetAddress.getLocalHost() and > return null, allowing alternative strategies to be attempted by > findConnectingAddress(). I will open a PR to this effect. Ideally this could > be included in both 1.2 and 1.1.2. > In the stack trace below, "ip-10-2-64-47" is the internal host name of an AWS > EC2 instance. > {code} > 21:11:35 org.apache.flink.client.program.ProgramInvocationException: Failed > to retrieve the JobManager gateway. > 21:11:35 at > org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:430) > 21:11:35 at > org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:90) > 21:11:35 at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389) > 21:11:35 at > org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:75) > 21:11:35 at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:334) > 21:11:35 at > com.expedia.www.flink.job.scheduler.FlinkJobSubmitter.get(FlinkJobSubmitter.java:81) > 21:11:35 at > com.expedia.www.flink.job.scheduler.streaming.StreamingJobManager.run(StreamingJobManager.java:105) > 21:11:35 at > com.expedia.www.flink.job.scheduler.JobScheduler.runStreamingApp(JobScheduler.java:69) > 21:11:35 at > com.expedia.www.flink.job.scheduler.JobScheduler.main(JobScheduler.java:34) > 21:11:35 Caused by: java.lang.RuntimeException: Failed to resolve JobManager > address at /10.2.89.80:43126 > 21:11:35 at > org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:189) > 21:11:35 at > org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:649) > 21:11:35 at > org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:428) > 21:11:35 ... 8 more > 21:11:35 Caused by: java.net.UnknownHostException: ip-10-2-64-47: > ip-10-2-64-47: unknown error > 21:11:35 at java.net.InetAddress.getLocalHost(InetAddress.java:1505) > 21:11:35 at > org.apache.flink.runtime.net.ConnectionUtils.findAddressUsingStrategy(ConnectionUtils.java:232) > 21:11:35 at > org.apache.flink.runtime.net.ConnectionUtils.findConnectingAddress(ConnectionUtils.java:123) > 21:11:35 at > org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:187) > 21:11:35 ... 
10 more > 21:11:35 Caused by: java.net.UnknownHostException: ip-10-2-64-47: unknown > error > 21:11:35 at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method) > 21:11:35 at > java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928) > 21:11:35 at > java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323) > 21:11:35 at java.net.InetAddress.getLocalHost(InetAddress.java:1500) > 21:11:35 ... 13 more > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
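The shape of the proposed fix, paraphrased as a standalone helper rather than Flink's actual ConnectionUtils code:

{code:java}
import java.net.InetAddress;
import java.net.UnknownHostException;

public final class LocalAddressProbe {
	// If the local hostname cannot be resolved (e.g. missing from
	// /etc/hosts), return null so the caller can fall back to the
	// remaining connection strategies instead of failing outright.
	public static InetAddress localHostOrNull() {
		try {
			return InetAddress.getLocalHost();
		} catch (UnknownHostException e) {
			return null;
		}
	}
}
{code}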
[jira] [Updated] (FLINK-4418) ClusterClient/ConnectionUtils#findConnectingAddress fails immediately if InetAddress.getLocalHost throws exception
[ https://issues.apache.org/jira/browse/FLINK-4418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shannon Carey updated FLINK-4418: - Description: When attempting to connect to a cluster with a ClusterClient, if the machine's hostname is not resolvable to an IP, an exception is thrown preventing success. This is the case if, for example, the hostname is not present & mapped to a local IP in /etc/hosts. The exception is below. I suggest that findAddressUsingStrategy() should catch java.net.UnknownHostException thrown by InetAddress.getLocalHost() and return null, allowing alternative strategies to be attempted by findConnectingAddress(). I will open a PR to this effect. Ideally this could be included in both 1.2 and 1.1.2. In the stack trace below, "ip-10-2-64-47" is the internal host name of an AWS EC2 instance.
{code}
21:11:35 org.apache.flink.client.program.ProgramInvocationException: Failed to retrieve the JobManager gateway.
21:11:35    at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:430)
21:11:35    at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:90)
21:11:35    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
21:11:35    at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:75)
21:11:35    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:334)
21:11:35    at com.expedia.www.flink.job.scheduler.FlinkJobSubmitter.get(FlinkJobSubmitter.java:81)
21:11:35    at com.expedia.www.flink.job.scheduler.streaming.StreamingJobManager.run(StreamingJobManager.java:105)
21:11:35    at com.expedia.www.flink.job.scheduler.JobScheduler.runStreamingApp(JobScheduler.java:69)
21:11:35    at com.expedia.www.flink.job.scheduler.JobScheduler.main(JobScheduler.java:34)
21:11:35 Caused by: java.lang.RuntimeException: Failed to resolve JobManager address at /10.2.89.80:43126
21:11:35    at org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:189)
21:11:35    at org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:649)
21:11:35    at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:428)
21:11:35    ... 8 more
21:11:35 Caused by: java.net.UnknownHostException: ip-10-2-64-47: ip-10-2-64-47: unknown error
21:11:35    at java.net.InetAddress.getLocalHost(InetAddress.java:1505)
21:11:35    at org.apache.flink.runtime.net.ConnectionUtils.findAddressUsingStrategy(ConnectionUtils.java:232)
21:11:35    at org.apache.flink.runtime.net.ConnectionUtils.findConnectingAddress(ConnectionUtils.java:123)
21:11:35    at org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:187)
21:11:35    ... 10 more
21:11:35 Caused by: java.net.UnknownHostException: ip-10-2-64-47: unknown error
21:11:35    at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
21:11:35    at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928)
21:11:35    at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323)
21:11:35    at java.net.InetAddress.getLocalHost(InetAddress.java:1500)
21:11:35    ... 13 more
{code}
was: When attempting to connect to a cluster with a ClusterClient, if the machine's hostname is not resolvable to an IP, an exception is thrown preventing success. This is the case if, for example, the hostname is not present & mapped to a local IP in /etc/hosts. The exception is below. I suggest that findAddressUsingStrategy() should catch java.net.UnknownHostException thrown by InetAddress.getLocalHost() and return null, allowing alternative strategies to be attempted by findConnectingAddress(). I will open a PR to this effect. Ideally this could be included in both 1.2 and 1.1.2.
{code}
21:11:35 org.apache.flink.client.program.ProgramInvocationException: Failed to retrieve the JobManager gateway.
21:11:35    at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:430)
21:11:35    at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:90)
21:11:35    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
21:11:35    at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:75)
21:11:35    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:334)
21:11:35    at com.expedia.www.flink.job.scheduler.FlinkJobSubmitter.get(FlinkJobSubmitter.java:81)
21:11:35    at com.expedia.www.flink.job.scheduler.streaming.StreamingJobManager.run(StreamingJobManager.java:105)
21:11:35    at
[jira] [Created] (FLINK-4418) ClusterClient/ConnectionUtils#findConnectingAddress fails immediately if InetAddress.getLocalHost throws exception
Shannon Carey created FLINK-4418: Summary: ClusterClient/ConnectionUtils#findConnectingAddress fails immediately if InetAddress.getLocalHost throws exception Key: FLINK-4418 URL: https://issues.apache.org/jira/browse/FLINK-4418 Project: Flink Issue Type: Bug Components: Client Affects Versions: 1.1.0 Reporter: Shannon Carey When attempting to connect to a cluster with a ClusterClient, if the machine's hostname is not resolvable to an IP, an exception is thrown preventing success. This is the case if, for example, the hostname is not present & mapped to a local IP in /etc/hosts. The exception is below. I suggest that findAddressUsingStrategy() should catch java.net.UnknownHostException thrown by InetAddress.getLocalHost() and return null, allowing alternative strategies to be attempted by findConnectingAddress(). I will open a PR to this effect. Ideally this could be included in both 1.2 and 1.1.2.
{code}
21:11:35 org.apache.flink.client.program.ProgramInvocationException: Failed to retrieve the JobManager gateway.
21:11:35    at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:430)
21:11:35    at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:90)
21:11:35    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
21:11:35    at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:75)
21:11:35    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:334)
21:11:35    at com.expedia.www.flink.job.scheduler.FlinkJobSubmitter.get(FlinkJobSubmitter.java:81)
21:11:35    at com.expedia.www.flink.job.scheduler.streaming.StreamingJobManager.run(StreamingJobManager.java:105)
21:11:35    at com.expedia.www.flink.job.scheduler.JobScheduler.runStreamingApp(JobScheduler.java:69)
21:11:35    at com.expedia.www.flink.job.scheduler.JobScheduler.main(JobScheduler.java:34)
21:11:35 Caused by: java.lang.RuntimeException: Failed to resolve JobManager address at /10.2.89.80:43126
21:11:35    at org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:189)
21:11:35    at org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:649)
21:11:35    at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:428)
21:11:35    ... 8 more
21:11:35 Caused by: java.net.UnknownHostException: ip-10-2-64-47: ip-10-2-64-47: unknown error
21:11:35    at java.net.InetAddress.getLocalHost(InetAddress.java:1505)
21:11:35    at org.apache.flink.runtime.net.ConnectionUtils.findAddressUsingStrategy(ConnectionUtils.java:232)
21:11:35    at org.apache.flink.runtime.net.ConnectionUtils.findConnectingAddress(ConnectionUtils.java:123)
21:11:35    at org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:187)
21:11:35    ... 10 more
21:11:35 Caused by: java.net.UnknownHostException: ip-10-2-64-47: unknown error
21:11:35    at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
21:11:35    at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928)
21:11:35    at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323)
21:11:35    at java.net.InetAddress.getLocalHost(InetAddress.java:1500)
21:11:35    ... 13 more
{code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4334) Shaded Hadoop1 jar not fully excluded in Quickstart
Shannon Carey created FLINK-4334: Summary: Shaded Hadoop1 jar not fully excluded in Quickstart Key: FLINK-4334 URL: https://issues.apache.org/jira/browse/FLINK-4334 Project: Flink Issue Type: Bug Components: Quickstarts Affects Versions: 1.0.3, 1.0.2, 1.0.1, 1.1.0 Reporter: Shannon Carey The Shaded Hadoop1 jar has artifactId flink-shaded-hadoop1_2.10 since Flink 1.0.0 (see https://github.com/apache/flink/commit/2c4e4d1ffaf4107fb802c90858184fc10af66837), but the quickstart POMs both refer to it as flink-shaded-hadoop1. If using "-Pbuild-jar", the problem is not encountered. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
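For illustration, the affected exclusion in the quickstart POM would presumably need the Scala-suffixed artifactId; the snippet below is a sketch of that one entry, not the exact quickstart file:

{code:xml}
<exclusion>
  <groupId>org.apache.flink</groupId>
  <!-- was "flink-shaded-hadoop1", which no longer matches the published artifact -->
  <artifactId>flink-shaded-hadoop1_2.10</artifactId>
</exclusion>
{code}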
[jira] [Closed] (FLINK-4069) Kafka Consumer should not initialize on construction
[ https://issues.apache.org/jira/browse/FLINK-4069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shannon Carey closed FLINK-4069. Resolution: Duplicate > Kafka Consumer should not initialize on construction > > > Key: FLINK-4069 > URL: https://issues.apache.org/jira/browse/FLINK-4069 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.0.3 >Reporter: Shannon Carey > > The Kafka Consumer connector currently interacts over the network with Kafka > in order to get partition metadata when the class is constructed. Instead, it > should do that work when the job actually begins to run (for example, in > AbstractRichFunction#open() of FlinkKafkaConsumer0?). > The main weakness of broker querying in the constructor is that if there are > network problems, Flink might take a long time (eg. ~1hr) inside the > user-supplied main() method while it attempts to contact each broker and > perform retries. In general, setting up the Kafka partitions does not seem > strictly necessary as part of execution of main() in order to set up the job > plan/topology. > However, as Robert Metzger mentions, there are important concerns with how > Kafka partitions are handled: > "The main reason why we do the querying centrally is: > a) avoid overloading the brokers > b) send the same list of partitions (in the same order) to all parallel > consumers to do a fixed partition assignments (also across restarts). When we > do the querying in the open() method, we need to make sure that all > partitions are assigned, without duplicates (also after restarts in case of > failures)." > See also the mailing list discussion: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/API-request-to-submit-job-takes-over-1hr-td7319.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4069) Kafka Consumer should not initialize on construction
[ https://issues.apache.org/jira/browse/FLINK-4069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15330837#comment-15330837 ] Shannon Carey commented on FLINK-4069: -- Oh, I'm sorry! I looked for an existing issue but didn't notice that one. I will close this one as duplicate. > Kafka Consumer should not initialize on construction > > > Key: FLINK-4069 > URL: https://issues.apache.org/jira/browse/FLINK-4069 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.0.3 >Reporter: Shannon Carey > > The Kafka Consumer connector currently interacts over the network with Kafka > in order to get partition metadata when the class is constructed. Instead, it > should do that work when the job actually begins to run (for example, in > AbstractRichFunction#open() of FlinkKafkaConsumer0?). > The main weakness of broker querying in the constructor is that if there are > network problems, Flink might take a long time (eg. ~1hr) inside the > user-supplied main() method while it attempts to contact each broker and > perform retries. In general, setting up the Kafka partitions does not seem > strictly necessary as part of execution of main() in order to set up the job > plan/topology. > However, as Robert Metzger mentions, there are important concerns with how > Kafka partitions are handled: > "The main reason why we do the querying centrally is: > a) avoid overloading the brokers > b) send the same list of partitions (in the same order) to all parallel > consumers to do a fixed partition assignments (also across restarts). When we > do the querying in the open() method, we need to make sure that all > partitions are assigned, without duplicates (also after restarts in case of > failures)." > See also the mailing list discussion: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/API-request-to-submit-job-takes-over-1hr-td7319.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4069) Kafka Consumer should not initialize on construction
Shannon Carey created FLINK-4069: Summary: Kafka Consumer should not initialize on construction Key: FLINK-4069 URL: https://issues.apache.org/jira/browse/FLINK-4069 Project: Flink Issue Type: Improvement Components: Kafka Connector Affects Versions: 1.0.3 Reporter: Shannon Carey The Kafka Consumer connector currently interacts over the network with Kafka in order to get partition metadata when the class is constructed. Instead, it should do that work when the job actually begins to run (for example, in AbstractRichFunction#open() of FlinkKafkaConsumer0?). The main weakness of broker querying in the constructor is that if there are network problems, Flink might take a long time (eg. ~1hr) inside the user-supplied main() method while it attempts to contact each broker and perform retries. In general, setting up the Kafka partitions does not seem strictly necessary as part of execution of main() in order to set up the job plan/topology. However, as Robert Metzger mentions, there are important concerns with how Kafka partitions are handled: "The main reason why we do the querying centrally is: a) avoid overloading the brokers b) send the same list of partitions (in the same order) to all parallel consumers to do a fixed partition assignments (also across restarts). When we do the querying in the open() method, we need to make sure that all partitions are assigned, without duplicates (also after restarts in case of failures)." See also the mailing list discussion: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/API-request-to-submit-job-takes-over-1hr-td7319.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
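In outline, the requested change defers the broker round-trips from job assembly to task startup. The class below is illustrative only, not the real FlinkKafkaConsumer:

{code:java}
import java.util.List;
import java.util.Properties;

import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.configuration.Configuration;

// Illustrative: the constructor only records configuration (no network I/O,
// so main() cannot stall on unreachable brokers); partition discovery runs
// in open(), once the task is actually deployed.
public abstract class LazyInitConsumer extends AbstractRichFunction {
	private final Properties kafkaProps;
	private transient List<String> partitions;

	protected LazyInitConsumer(Properties kafkaProps) {
		this.kafkaProps = kafkaProps;
	}

	@Override
	public void open(Configuration parameters) throws Exception {
		// Per the caveats quoted above, a real implementation must still
		// guarantee a consistent, duplicate-free partition assignment
		// across all parallel consumers, including after restarts.
		this.partitions = discoverPartitions(kafkaProps);
	}

	/** Hypothetical hook that queries the brokers for partition metadata. */
	protected abstract List<String> discoverPartitions(Properties props) throws Exception;
}
{code}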
[jira] [Commented] (FLINK-3711) Scala fold() example syntax incorrect
[ https://issues.apache.org/jira/browse/FLINK-3711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15229638#comment-15229638 ] Shannon Carey commented on FLINK-3711: -- I will have a GitHub PR for you on this momentarily. > Scala fold() example syntax incorrect > - > > Key: FLINK-3711 > URL: https://issues.apache.org/jira/browse/FLINK-3711 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.0.0, 1.0.1 >Reporter: Shannon Carey >Priority: Minor > > Scala's KeyedStream#fold, which accepts a scala.Function2, is defined as a > partially applicable (curried) function. The documentation, however, is written as if it > were a non-curried function. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3711) Scala fold() example syntax incorrect
Shannon Carey created FLINK-3711: Summary: Scala fold() example syntax incorrect Key: FLINK-3711 URL: https://issues.apache.org/jira/browse/FLINK-3711 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.0.1, 1.0.0 Reporter: Shannon Carey Priority: Minor Scala's KeyedStream#fold, which accepts a scala.Function2, is defined as a partially applicable (curried) function. The documentation, however, is written as if it were a non-curried function. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
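For context, a sketch of the curried form the documentation should show, written against the 1.0-era Scala DataStream API; the surrounding program (env, fromElements, keyBy) is illustrative:

{code:scala}
import org.apache.flink.streaming.api.scala._

object FoldExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val words = env.fromElements("a", "b", "a")

    // fold is curried: the initial value and the fold function go in
    // separate argument lists, not a single two-argument call.
    val counts = words
      .keyBy(w => w)
      .fold(0L)((acc, _) => acc + 1L)

    counts.print()
    env.execute("fold example")
  }
}
{code}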