[jira] [Updated] (FLINK-8311) Flink needs documentation for network access control
[ https://issues.apache.org/jira/browse/FLINK-8311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eron Wright updated FLINK-8311: Description: There is a need for better documentation on what connects to what over which ports in a Flink cluster to allow users to configure network access control rules. E.g. I was under the impression that in a ZK HA configuration the Job Managers were essentially independent and only coordinated via ZK. But starting multiple JMs in HA with the JM RPC port blocked between JMs shows that the second JM's Akka subsystem is trying to connect to the leading JM: {code} INFO akka.remote.transport.ProtocolStateActor - No response from remote for outbound association. Associate timed out after [2 ms]. WARN akka.remote.ReliableDeliverySupervisor- Association with remote system [akka.tcp://flink@10.210.210.127:6123] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@10.210.210.127:6123]] Caused by: [No response from remote for outbound association. Associate timed out after [2 ms].] WARN akka.remote.transport.netty.NettyTransport- Remote connection to [null] failed with org.apache.flink.shaded.akka.org.jboss.netty.channel.ConnectTimeoutException: connection timed out: /10.210.210.127:6123 {code} was: There is a need for better documentation on what connects to what over which ports in a Flink cluster to allow users to configure network access control rules. E.g. I was under the impression that in a ZK HA configuration the Job Managers were essentially independent and only coordinated via ZK. But starting multiple JMs in HA with the JM RPC port blocked between JMs shows that the second JM's Akka subsystem is trying to connect to the leading JM: INFO akka.remote.transport.ProtocolStateActor - No response from remote for outbound association. Associate timed out after [2 ms]. WARN akka.remote.ReliableDeliverySupervisor- Association with remote system [akka.tcp://flink@10.210.210.127:6123] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@10.210.210.127:6123]] Caused by: [No response from remote for outbound association. Associate timed out after [2 ms].] WARN akka.remote.transport.netty.NettyTransport- Remote connection to [null] failed with org.apache.flink.shaded.akka.org.jboss.netty.channel.ConnectTimeoutException: connection timed out: /10.210.210.127:6123 > Flink needs documentation for network access control > > > Key: FLINK-8311 > URL: https://issues.apache.org/jira/browse/FLINK-8311 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.4.0 >Reporter: Elias Levy > > There is a need for better documentation on what connects to what over which > ports in a Flink cluster to allow users to configure network access control > rules. > E.g. I was under the impression that in a ZK HA configuration the Job > Managers were essentially independent and only coordinated via ZK. But > starting multiple JMs in HA with the JM RPC port blocked between JMs shows > that the second JM's Akka subsystem is trying to connect to the leading JM: > {code} > INFO akka.remote.transport.ProtocolStateActor - No > response from remote for outbound association. Associate timed out after > [2 ms]. > WARN akka.remote.ReliableDeliverySupervisor- > Association with remote system [akka.tcp://flink@10.210.210.127:6123] has > failed, address is now gated for [5000] ms. 
Reason: [Association failed with > [akka.tcp://flink@10.210.210.127:6123]] Caused by: [No response from remote > for outbound association. Associate timed out after [2 ms].] > WARN akka.remote.transport.netty.NettyTransport- Remote > connection to [null] failed with > org.apache.flink.shaded.akka.org.jboss.netty.channel.ConnectTimeoutException: > connection timed out: /10.210.210.127:6123 > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
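Until such documentation exists, one practical mitigation is to pin the JobManager RPC endpoint to known values so that firewall rules can be written against fixed ports. A minimal sketch using Flink's Configuration API (the host and port below are illustrative assumptions mirroring the log above; the same settings can go in flink-conf.yaml as jobmanager.rpc.address / jobmanager.rpc.port):
{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;

public class PinnedJobManagerPorts {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Fixed, known values make network access control rules predictable.
        conf.setString(JobManagerOptions.ADDRESS, "10.210.210.127"); // illustrative host
        conf.setInteger(JobManagerOptions.PORT, 6123);               // JobManager Akka/RPC port
        // Note: in ZK HA mode the standby JobManagers also dial this port on the
        // leader (see the gated-association log above), so JM-to-JM traffic on
        // the RPC port must be permitted, not just TM-to-JM traffic.
        System.out.println(conf);
    }
}
{code}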
[jira] [Created] (FLINK-8311) Flink needs documentation for network access control
Elias Levy created FLINK-8311: - Summary: Flink needs documentation for network access control Key: FLINK-8311 URL: https://issues.apache.org/jira/browse/FLINK-8311 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 1.4.0 Reporter: Elias Levy There is a need for better documentation on what connects to what over which ports in a Flink cluster to allow users to configure network access control rules. E.g. I was under the impression that in a ZK HA configuration the Job Managers were essentially independent and only coordinated via ZK. But starting multiple JMs in HA with the JM RPC port blocked between JMs shows that the second JM's Akka subsystem is trying to connect to the leading JM: INFO akka.remote.transport.ProtocolStateActor - No response from remote for outbound association. Associate timed out after [2 ms]. WARN akka.remote.ReliableDeliverySupervisor- Association with remote system [akka.tcp://flink@10.210.210.127:6123] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@10.210.210.127:6123]] Caused by: [No response from remote for outbound association. Associate timed out after [2 ms].] WARN akka.remote.transport.netty.NettyTransport- Remote connection to [null] failed with org.apache.flink.shaded.akka.org.jboss.netty.channel.ConnectTimeoutException: connection timed out: /10.210.210.127:6123 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (FLINK-8310) Flink 1.4 Column 'rowtime' not found in any table
[ https://issues.apache.org/jira/browse/FLINK-8310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske resolved FLINK-8310. -- Resolution: Not A Problem I've answered [your question on Stack Overflow|https://stackoverflow.com/questions/47947030/flink-1-4-column-rowtime-not-found-in-any-table/47947652#47947652]. Please reopen if the issue turns out to be a bug. Thanks, Fabian > Flink 1.4 Column 'rowtime' not found in any table > - > > Key: FLINK-8310 > URL: https://issues.apache.org/jira/browse/FLINK-8310 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Table API & SQL >Affects Versions: 1.4.0 > Environment: Ubuntu > JDK 8 >Reporter: Xuan Nguyen > Attachments: KafkaSqlStream.java > > > After following the > https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/sourceSinks.html#configure-a-rowtime-attribute > documentation, I register the timestamp field, but I still get 'rowtime' not found in any > table: > {code:java} > KafkaTableSource source = Kafka08JsonTableSource.builder() // set Kafka topic > .forTopic("alerting") > // set Kafka consumer properties > .withKafkaProperties(getKafkaProperties()) > // set Table schema > .withSchema(TableSchema.builder() > .field("tenant", Types.STRING()) > .field("message", Types.STRING()) > .field("frequency", Types.LONG()) > .field("timestamp", Types.SQL_TIMESTAMP()).build()) > .failOnMissingField(true) > .withRowtimeAttribute( > // "timestamp" is the rowtime attribute > "timestamp", > // value of "timestamp" is extracted from an existing field > with the same name > new ExistingField("timestamp"), > // values of "timestamp" are at most out-of-order by one day > new > BoundedOutOfOrderTimestamps(TimeUnit.DAYS.toMillis(1))) > .build(); > // register the alerting topic as "kafka" > tEnv.registerTableSource("kafka", source); > Table results = tEnv.sqlQuery("SELECT tenant, message, SUM(frequency) " + > "FROM kafka " + > "GROUP BY HOP(rowtime, INTERVAL '1' SECOND, INTERVAL '5' SECOND), > tenant, message"); > tEnv.toAppendStream(results, Row.class).print(); > {code} > I get the following error: > {code} > Exception in thread "main" org.apache.flink.table.api.ValidationException: > SQL validation failed. From line 1, column 64 to line 1, column 70: Column > 'rowtime' not found in any table at > org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:93) > at > org.apache.flink.table.api.TableEnvironment.sqlQuery(TableEnvironment.scala:561) > at oracle.flink.demo.KafkaSQLStream.main(KafkaSQLStream.java:62) Caused by: > org.apache.calcite.runtime.CalciteContextException: From line 1, column 64 to > line 1, column 70: Column 'rowtime' not found in any table at > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
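For readers hitting the same error: judging by the "Not A Problem" resolution, the likely fix is that the rowtime attribute carries the name declared in withRowtimeAttribute (here "timestamp"), so the query must reference that name rather than rowtime. A sketch under that assumption, with backticks because TIMESTAMP is a reserved SQL keyword and the table name kafka taken from the issue:
{code:java}
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class RowtimeQuerySketch {
    // Assumes the Kafka08JsonTableSource from the issue is already registered as "kafka".
    static Table windowedCounts(StreamTableEnvironment tEnv) {
        return tEnv.sqlQuery(
            "SELECT tenant, message, SUM(frequency) " +
            "FROM kafka " +
            "GROUP BY HOP(`timestamp`, INTERVAL '1' SECOND, INTERVAL '5' SECOND), " +
            "tenant, message");
    }
}
{code}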
[jira] [Created] (FLINK-8310) Flink 1.4 Column 'rowtime' not found in any table
Xuan Nguyen created FLINK-8310: -- Summary: Flink 1.4 Column 'rowtime' not found in any table Key: FLINK-8310 URL: https://issues.apache.org/jira/browse/FLINK-8310 Project: Flink Issue Type: Bug Components: Kafka Connector, Table API & SQL Affects Versions: 1.4.0 Environment: Ubuntu JDK 8 Reporter: Xuan Nguyen Attachments: KafkaSqlStream.java After following the https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/sourceSinks.html#configure-a-rowtime-attribute documentation, I register the timestamp field, but I still get 'rowtime' not found in any table: {code:java} KafkaTableSource source = Kafka08JsonTableSource.builder() // set Kafka topic .forTopic("alerting") // set Kafka consumer properties .withKafkaProperties(getKafkaProperties()) // set Table schema .withSchema(TableSchema.builder() .field("tenant", Types.STRING()) .field("message", Types.STRING()) .field("frequency", Types.LONG()) .field("timestamp", Types.SQL_TIMESTAMP()).build()) .failOnMissingField(true) .withRowtimeAttribute( // "timestamp" is the rowtime attribute "timestamp", // value of "timestamp" is extracted from an existing field with the same name new ExistingField("timestamp"), // values of "timestamp" are at most out-of-order by one day new BoundedOutOfOrderTimestamps(TimeUnit.DAYS.toMillis(1))) .build(); // register the alerting topic as "kafka" tEnv.registerTableSource("kafka", source); Table results = tEnv.sqlQuery("SELECT tenant, message, SUM(frequency) " + "FROM kafka " + "GROUP BY HOP(rowtime, INTERVAL '1' SECOND, INTERVAL '5' SECOND), tenant, message"); tEnv.toAppendStream(results, Row.class).print(); {code} I get the following error: {code} Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 64 to line 1, column 70: Column 'rowtime' not found in any table at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:93) at org.apache.flink.table.api.TableEnvironment.sqlQuery(TableEnvironment.scala:561) at oracle.flink.demo.KafkaSQLStream.main(KafkaSQLStream.java:62) Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 64 to line 1, column 70: Column 'rowtime' not found in any table at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-8309) JVM sigsegv crash when enabling async checkpoints
[ https://issues.apache.org/jira/browse/FLINK-8309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kevin Pullin closed FLINK-8309. --- Resolution: Resolved > JVM sigsegv crash when enabling async checkpoints > - > > Key: FLINK-8309 > URL: https://issues.apache.org/jira/browse/FLINK-8309 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.4.0, 1.3.2 > Environment: macOS 10.13.2 & Ubuntu 16.04.03 using JVM 1.8.0_151. >Reporter: Kevin Pullin > Attachments: StreamingJob.scala > > > h4. Summary > I have a streaming job with async checkpointing enabled. The job is crashing > the JVM with a SIGSEGV error coinciding with checkpoint completion. > Workarounds are noted below. I thought this was worth documenting in case > someone runs into similar issues or if a fix is possible. > h4. Job Overview & Observations > The job itself stores a large quantity of `case class` objects in > `valueState`s contained within a `RichFilterFunction`. This data is used for > deduplicating events. > The crash can be avoided by: > - moving the case class outside of the anonymous RichFilterFunction class. > - reducing the number of objects stored in the valueState. > - reducing the size of the objects stored in the valueState. > - disabling async snapshots. > I can provide additional crash data as needed (core dumps, error logs, etc.). > The StateBackend implementation doesn't matter; the job fails using the > Memory, Fs, and RocksDb backends. > From what I understand, anonymous classes should be avoided with checkpointing > as the name isn't stable, so that seems like the best route for me. > h4. Reproduction case > The attached `StreamingJob.scala` file contains a minimal repro case, > which closely aligns with my actual job configuration. Running it > consistently crashes the JVM upon completion of the first checkpoint. > My test runs set only two JVM options => -Xms4g -Xmx4g > h4. Crash output > Here's a crash captured from Ubuntu: > {noformat} > [info] # > [info] # A fatal error has been detected by the Java Runtime Environment: > [info] # > [info] # SIGSEGV (0xb) at pc=0x7fd192b92c1c, pid=7191, > tid=0x7fd0873f3700 > [info] # > [info] # JRE version: OpenJDK Runtime Environment (8.0_151-b12) (build > 1.8.0_151-8u151-b12-0ubuntu0.16.04.2-b12) > [info] # Java VM: OpenJDK 64-Bit Server VM (25.151-b12 mixed mode linux-amd64 > compressed oops) > [info] # Problematic frame: > [info] # C [libzip.so+0x5c1c] > [info] # > [info] # Core dump written.
Default location: /home/ubuntu/flink-project/core > or core.7191 > [info] # > [info] # An error report file with more information is saved as: > [info] # /home/XXX/flink-project/hs_err_pid7191.log > [info] Compiled method (nm) 71547 81 n 0 > java.util.zip.ZipFile::getEntry (native) > [info] total in heap [0x7fd17d12e290,0x7fd17d12e600] = 880 > [info] relocation [0x7fd17d12e3b8,0x7fd17d12e400] = 72 > [info] main code [0x7fd17d12e400,0x7fd17d12e600] = 512 > [info] # > [info] # If you would like to submit a bug report, please visit: > [info] # http://bugreport.java.com/bugreport/crash.jsp > [info] # > {noformat} > And one from macOS: > {noformat} > # > # A fatal error has been detected by the Java Runtime Environment: > # > # SIGSEGV (0xb) at pc=0x000105264c48, pid=30848, tid=0x3403 > # > # JRE version: Java(TM) SE Runtime Environment (8.0_151-b12) (build > 1.8.0_151-b12) > # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.151-b12 mixed mode bsd-amd64 > compressed oops) > # Problematic frame: > # V [libjvm.dylib+0x464c48] > # > # Failed to write core dump. Core dumps have been disabled. To enable core > dumping, try "ulimit -c unlimited" before starting Java again > # > # An error report file with more information is saved as: > # /Users/XXX/src/etc_flink_mwx/hs_err_pid30848.log > [thread 30211 also had an error] > # > # If you would like to submit a bug report, please visit: > # http://bugreport.java.com/bugreport/crash.jsp > # The crash happened outside the Java Virtual Machine in native code. > # See problematic frame for where to report the bug. > # > Process finished with exit code 134 (interrupted by signal 6: SIGABRT) > {noformat} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
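The first workaround listed above (avoiding anonymous stateful classes) translates to declaring the function as a named top-level class. A sketch in Java rather than the reporter's Scala, with an assumed String element type and state name, not the actual job code:
{code:java}
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;

// A named (non-anonymous) stateful filter for deduplication; must be applied
// after keyBy(...), since ValueState is scoped per key.
public class DedupFilter extends RichFilterFunction<String> {
    private transient ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) {
        seen = getRuntimeContext().getState(
            new ValueStateDescriptor<>("seen", Boolean.class));
    }

    @Override
    public boolean filter(String value) throws Exception {
        if (seen.value() != null) {
            return false; // drop duplicates of this key
        }
        seen.update(true);
        return true; // first occurrence passes through
    }
}
{code}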
[jira] [Commented] (FLINK-8309) JVM sigsegv crash when enabling async checkpoints
[ https://issues.apache.org/jira/browse/FLINK-8309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301856#comment-16301856 ] Kevin Pullin commented on FLINK-8309: - Thanks Stefan. I had run across that, but didn't try running with JDK 9 as I saw a Flink JIRA issue still open for JDK 9 compatibility. I did a test run with JDK 9 and the problem is no longer occurring. For JDK 8 there is a suggestion to use the `-Dsun.zip.disableMemoryMapping=true` flag, but that didn't help, so it's unclear to me whether that particular fix or some other JDK 9 change is what helps here. In any case I'll close this issue! > JVM sigsegv crash when enabling async checkpoints > - > > Key: FLINK-8309 > URL: https://issues.apache.org/jira/browse/FLINK-8309 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.4.0, 1.3.2 > Environment: macOS 10.13.2 & Ubuntu 16.04.03 using JVM 1.8.0_151. >Reporter: Kevin Pullin > Attachments: StreamingJob.scala > > > h4. Summary > I have a streaming job with async checkpointing enabled. The job is crashing > the JVM with a SIGSEGV error coinciding with checkpoint completion. > Workarounds are noted below. I thought this was worth documenting in case > someone runs into similar issues or if a fix is possible. > h4. Job Overview & Observations > The job itself stores a large quantity of `case class` objects in > `valueState`s contained within a `RichFilterFunction`. This data is used for > deduplicating events. > The crash can be avoided by: > - moving the case class outside of the anonymous RichFilterFunction class. > - reducing the number of objects stored in the valueState. > - reducing the size of the objects stored in the valueState. > - disabling async snapshots. > I can provide additional crash data as needed (core dumps, error logs, etc.). > The StateBackend implementation doesn't matter; the job fails using the > Memory, Fs, and RocksDb backends. > From what I understand, anonymous classes should be avoided with checkpointing > as the name isn't stable, so that seems like the best route for me. > h4. Reproduction case > The attached `StreamingJob.scala` file contains a minimal repro case, > which closely aligns with my actual job configuration. Running it > consistently crashes the JVM upon completion of the first checkpoint. > My test runs set only two JVM options => -Xms4g -Xmx4g > h4. Crash output > Here's a crash captured from Ubuntu: > {noformat} > [info] # > [info] # A fatal error has been detected by the Java Runtime Environment: > [info] # > [info] # SIGSEGV (0xb) at pc=0x7fd192b92c1c, pid=7191, > tid=0x7fd0873f3700 > [info] # > [info] # JRE version: OpenJDK Runtime Environment (8.0_151-b12) (build > 1.8.0_151-8u151-b12-0ubuntu0.16.04.2-b12) > [info] # Java VM: OpenJDK 64-Bit Server VM (25.151-b12 mixed mode linux-amd64 > compressed oops) > [info] # Problematic frame: > [info] # C [libzip.so+0x5c1c] > [info] # > [info] # Core dump written.
Default location: /home/ubuntu/flink-project/core > or core.7191 > [info] # > [info] # An error report file with more information is saved as: > [info] # /home/XXX/flink-project/hs_err_pid7191.log > [info] Compiled method (nm) 71547 81 n 0 > java.util.zip.ZipFile::getEntry (native) > [info] total in heap [0x7fd17d12e290,0x7fd17d12e600] = 880 > [info] relocation [0x7fd17d12e3b8,0x7fd17d12e400] = 72 > [info] main code [0x7fd17d12e400,0x7fd17d12e600] = 512 > [info] # > [info] # If you would like to submit a bug report, please visit: > [info] # http://bugreport.java.com/bugreport/crash.jsp > [info] # > {noformat} > And one from macOS: > {noformat} > # > # A fatal error has been detected by the Java Runtime Environment: > # > # SIGSEGV (0xb) at pc=0x000105264c48, pid=30848, tid=0x3403 > # > # JRE version: Java(TM) SE Runtime Environment (8.0_151-b12) (build > 1.8.0_151-b12) > # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.151-b12 mixed mode bsd-amd64 > compressed oops) > # Problematic frame: > # V [libjvm.dylib+0x464c48] > # > # Failed to write core dump. Core dumps have been disabled. To enable core > dumping, try "ulimit -c unlimited" before starting Java again > # > # An error report file with more information is saved as: > # /Users/XXX/src/etc_flink_mwx/hs_err_pid30848.log > [thread 30211 also had an error] > # > # If you would like to submit a bug report, please visit: > # http://bugreport.java.com/bugreport/crash.jsp > # The crash happened outside the Java Virtual Machine in native code. > # See problematic frame for where to report the bug. > # > Process finished with exit code 134 (interrupted by signal 6: SIGABRT) > {noformat} -- This message was sent by Atlassian
[jira] [Commented] (FLINK-8289) The RestServerEndpoint should return the address with real ip when getRestAddress
[ https://issues.apache.org/jira/browse/FLINK-8289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301835#comment-16301835 ] ASF GitHub Bot commented on FLINK-8289: --- Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/5190#discussion_r158545462 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java --- @@ -155,6 +156,7 @@ protected void runCluster(Configuration configuration) throws Exception { // write host information into configuration configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress()); configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort()); + configuration.setString(RestOptions.REST_ADDRESS, commonRpcService.getAddress()); --- End diff -- This appears to be configuring the REST server's bind address, using the server address obtained from `AkkaRpcService`. I don't understand the rationale; could you explain? > The RestServerEndpoint should return the address with real ip when > getRestAddress > -- > > Key: FLINK-8289 > URL: https://issues.apache.org/jira/browse/FLINK-8289 > Project: Flink > Issue Type: Bug >Affects Versions: 1.5.0 >Reporter: shuai.xu > Labels: flip-6 > > Currently, RestServerEndpoint.getRestAddress returns an address equal to the > value of the config option rest.address (by default 127.0.0.1:9067), but this > address cannot be accessed from another machine. The IPs of the Dispatcher > and JobMaster are usually assigned dynamically, so users configure rest.address > to 0.0.0.0, and getRestAddress then returns 0.0.0.0:9067; this address is > registered with YARN or Mesos but likewise cannot be accessed from another > machine. getRestAddress therefore needs to return the real ip:port so that > users can reach the web monitor from anywhere. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-8222) Update Scala version
[ https://issues.apache.org/jira/browse/FLINK-8222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan closed FLINK-8222. - Resolution: Implemented master: 8987de3b241d23bbcc6ca5640e3cb77972a60be4 > Update Scala version > > > Key: FLINK-8222 > URL: https://issues.apache.org/jira/browse/FLINK-8222 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 1.4.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.5.0 > > > Update Scala to version {{2.11.12}}. I don't believe this affects the Flink > distribution but rather anyone who is compiling Flink or a > Flink-quickstart-derived program on a shared system. > "A privilege escalation vulnerability (CVE-2017-15288) has been identified in > the Scala compilation daemon." > https://www.scala-lang.org/news/security-update-nov17.html -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-8223) Update Hadoop versions
[ https://issues.apache.org/jira/browse/FLINK-8223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan closed FLINK-8223. - Resolution: Fixed master: d3cd51a3f9fbb3ffbe6d23a57ff3884733eb47fa > Update Hadoop versions > -- > > Key: FLINK-8223 > URL: https://issues.apache.org/jira/browse/FLINK-8223 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 1.5.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Trivial > Fix For: 1.5.0 > > > Update 2.7.3 to 2.7.5 and 2.8.0 to 2.8.3. See > http://hadoop.apache.org/releases.html -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5190: [FLINK-8289] [runtime] set the rest.address to the...
Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/5190#discussion_r158545462 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java --- @@ -155,6 +156,7 @@ protected void runCluster(Configuration configuration) throws Exception { // write host information into configuration configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress()); configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort()); + configuration.setString(RestOptions.REST_ADDRESS, commonRpcService.getAddress()); --- End diff -- This appears to be configuring the REST server's bind address, using the server address obtained from `AkkaRpcService`. I don't understand the rationale; could you explain? ---
[jira] [Updated] (FLINK-8223) Update Hadoop versions
[ https://issues.apache.org/jira/browse/FLINK-8223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-8223: -- Description: Update 2.7.3 to 2.7.5 and 2.8.0 to 2.8.3. See http://hadoop.apache.org/releases.html (was: Update 2.7.3 to 2.7.4 and 2.8.0 to 2.8.2. See http://hadoop.apache.org/releases.html) > Update Hadoop versions > -- > > Key: FLINK-8223 > URL: https://issues.apache.org/jira/browse/FLINK-8223 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 1.5.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Trivial > Fix For: 1.5.0 > > > Update 2.7.3 to 2.7.5 and 2.8.0 to 2.8.3. See > http://hadoop.apache.org/releases.html -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-8223) Update Hadoop versions
[ https://issues.apache.org/jira/browse/FLINK-8223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-8223: -- Fix Version/s: 1.5.0 > Update Hadoop versions > -- > > Key: FLINK-8223 > URL: https://issues.apache.org/jira/browse/FLINK-8223 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 1.5.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Trivial > Fix For: 1.5.0 > > > Update 2.7.3 to 2.7.4 and 2.8.0 to 2.8.2. See > http://hadoop.apache.org/releases.html -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-8222) Update Scala version
[ https://issues.apache.org/jira/browse/FLINK-8222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-8222: -- Fix Version/s: 1.5.0 > Update Scala version > > > Key: FLINK-8222 > URL: https://issues.apache.org/jira/browse/FLINK-8222 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 1.4.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.5.0 > > > Update Scala to version {{2.11.12}}. I don't believe this affects the Flink > distribution but rather anyone who is compiling Flink or a > Flink-quickstart-derived program on a shared system. > "A privilege escalation vulnerability (CVE-2017-15288) has been identified in > the Scala compilation daemon." > https://www.scala-lang.org/news/security-update-nov17.html -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8223) Update Hadoop versions
[ https://issues.apache.org/jira/browse/FLINK-8223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301830#comment-16301830 ] ASF GitHub Bot commented on FLINK-8223: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5137 > Update Hadoop versions > -- > > Key: FLINK-8223 > URL: https://issues.apache.org/jira/browse/FLINK-8223 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 1.5.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Trivial > > Update 2.7.3 to 2.7.4 and 2.8.0 to 2.8.2. See > http://hadoop.apache.org/releases.html -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8289) The RestServerEndpoint should return the address with real ip when getRestAddress
[ https://issues.apache.org/jira/browse/FLINK-8289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301831#comment-16301831 ] ASF GitHub Bot commented on FLINK-8289: --- Github user EronWright commented on the issue: https://github.com/apache/flink/pull/5190 The configuration option `REST_ADDRESS` seems destined to cause pain. Surveying the code, it seems to variously represent a bind address, a server address, and an advertised address. We should rename and clarify ASAP. > The RestServerEndpoint should return the address with real ip when > getRestAddress > -- > > Key: FLINK-8289 > URL: https://issues.apache.org/jira/browse/FLINK-8289 > Project: Flink > Issue Type: Bug >Affects Versions: 1.5.0 >Reporter: shuai.xu > Labels: flip-6 > > Currently, RestServerEndpoint.getRestAddress returns an address equal to the > value of the config option rest.address (by default 127.0.0.1:9067), but this > address cannot be accessed from another machine. The IPs of the Dispatcher > and JobMaster are usually assigned dynamically, so users configure rest.address > to 0.0.0.0, and getRestAddress then returns 0.0.0.0:9067; this address is > registered with YARN or Mesos but likewise cannot be accessed from another > machine. getRestAddress therefore needs to return the real ip:port so that > users can reach the web monitor from anywhere. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5506) Java 8 - CommunityDetection.java:158 - java.lang.NullPointerException
[ https://issues.apache.org/jira/browse/FLINK-5506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301828#comment-16301828 ] ASF GitHub Bot commented on FLINK-5506: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5126 > Java 8 - CommunityDetection.java:158 - java.lang.NullPointerException > - > > Key: FLINK-5506 > URL: https://issues.apache.org/jira/browse/FLINK-5506 > Project: Flink > Issue Type: Bug > Components: Gelly >Affects Versions: 1.1.4, 1.3.2, 1.4.1 >Reporter: Miguel E. Coimbra >Assignee: Greg Hogan > Labels: easyfix, newbie > Fix For: 1.5.0, 1.4.1 > > Original Estimate: 2h > Remaining Estimate: 2h > > Reporting this here as per Vasia's advice. > I am having the following problem while trying out the > org.apache.flink.graph.library.CommunityDetection algorithm of the Gelly API > (Java). > Specs: JDK 1.8.0_102 x64 > Apache Flink: 1.1.4 > Suppose I have a very small (I tried an example with 38 vertices as well) > dataset stored in a tab-separated file 3-vertex.tsv: > {code} > #id1  id2  score > 0  1  0 > 0  2  0 > 0  3  0 > {code} > This is just a central vertex with 3 neighbors (disconnected between > themselves). > I am loading the dataset and executing the algorithm with the following code: > {code} > // Load the data from the .tsv file. > final DataSet<Tuple3<Long, Long, Double>> edgeTuples = > env.readCsvFile(inputPath) > .fieldDelimiter("\t") // fields are separated by tabs > .ignoreComments("#") // comments start with "#" > .types(Long.class, Long.class, Double.class); > // Generate a graph and add reverse edges (undirected). > final Graph<Long, Long, Double> graph = Graph.fromTupleDataSet(edgeTuples, > new MapFunction<Long, Long>() { > private static final long serialVersionUID = 8713516577419451509L; > public Long map(Long value) { > return value; > } > }, > env).getUndirected(); > // CommunityDetection parameters. > final double hopAttenuationDelta = 0.5d; > final int iterationCount = 10; > // Prepare and trigger the execution. > DataSet<Vertex<Long, Long>> vs = graph.run(new > org.apache.flink.graph.library.CommunityDetection<Long>(iterationCount, > hopAttenuationDelta)).getVertices(); > vs.print(); > {code} > Running this code throws the following exception (check the bold line): > {code} > org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:805) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:751) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:751) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.NullPointerException > at > org.apache.flink.graph.library.CommunityDetection$VertexLabelUpdater.updateVertex(CommunityDetection.java:158) > at > org.apache.flink.graph.spargel.ScatterGatherIteration$GatherUdfSimpleVV.coGroup(ScatterGatherIteration.java:389) > at > org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:218) > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486) > at > org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146) > at > org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107) > at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642) > at java.lang.Thread.run(Thread.java:745) > {code} > After a further look, I set a breakpoint (Eclipse IDE debugging) at the line > in bold: > org.apache.flink.graph.library.CommunityDetection.java (sour
[jira] [Commented] (FLINK-8222) Update Scala version
[ https://issues.apache.org/jira/browse/FLINK-8222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301829#comment-16301829 ] ASF GitHub Bot commented on FLINK-8222: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5136 > Update Scala version > > > Key: FLINK-8222 > URL: https://issues.apache.org/jira/browse/FLINK-8222 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 1.4.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > > Update Scala to version {{2.11.12}}. I don't believe this affects the Flink > distribution but rather anyone who is compiling Flink or a > Flink-quickstart-derived program on a shared system. > "A privilege escalation vulnerability (CVE-2017-15288) has been identified in > the Scala compilation daemon." > https://www.scala-lang.org/news/security-update-nov17.html -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5190: [FLINK-8289] [runtime] set the rest.address to the host o...
Github user EronWright commented on the issue: https://github.com/apache/flink/pull/5190 The configuration option `REST_ADDRESS` seems destined to cause pain. Surveying the code, it seems to variously represent a bind address, a server address, and an advertised address. We should rename and clarify ASAP. ---
[GitHub] flink pull request #5195: [hotfix] [build] Always include Kafka 0.11 connect...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5195 ---
[GitHub] flink pull request #5136: [FLINK-8222] [build] Update Scala version
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5136 ---
[GitHub] flink pull request #5137: [FLINK-8223] [build] Update Hadoop versions
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5137 ---
[GitHub] flink pull request #5126: [FLINK-5506] [gelly] Fix CommunityDetection NullPo...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5126 ---
[GitHub] flink pull request #5199: [hotfix] [javadoc] Fix typos in MemorySegment clas...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5199 ---
[jira] [Commented] (FLINK-8289) The RestServerEndpoint should return the address with real ip when getRestAddress
[ https://issues.apache.org/jira/browse/FLINK-8289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301817#comment-16301817 ] Eron Wright commented on FLINK-8289: - I guess we should be clear about who is being advertised to. For example, if the value is intended for use by the client, we'd want to give the proxy address. If the value is intended for use by the proxy (as the upstream address), we'd want to give the server address. > The RestServerEndpoint should return the address with real ip when > getRestAddress > -- > > Key: FLINK-8289 > URL: https://issues.apache.org/jira/browse/FLINK-8289 > Project: Flink > Issue Type: Bug >Affects Versions: 1.5.0 >Reporter: shuai.xu > Labels: flip-6 > > Currently, RestServerEndpoint.getRestAddress returns an address equal to the > value of the config option rest.address (by default 127.0.0.1:9067), but this > address cannot be accessed from another machine. The IPs of the Dispatcher > and JobMaster are usually assigned dynamically, so users configure rest.address > to 0.0.0.0, and getRestAddress then returns 0.0.0.0:9067; this address is > registered with YARN or Mesos but likewise cannot be accessed from another > machine. getRestAddress therefore needs to return the real ip:port so that > users can reach the web monitor from anywhere. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
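To make the three roles concrete, a purely illustrative sketch — none of these names exist in Flink; they only show why a single rest.address value cannot serve all consumers at once:
{code:java}
public class RestAddressRoles {
    // What the server socket binds to; 0.0.0.0 is fine here and only here.
    static final String BIND_ADDRESS = "0.0.0.0";
    // What peers such as a reverse proxy dial as the upstream; must be routable.
    static final String SERVER_ADDRESS = "10.0.0.5";
    // What is handed to clients (and registered with YARN/Mesos); may be the
    // proxy's address rather than the server's own.
    static final String ADVERTISED_ADDRESS = "proxy.example.com";
}
{code}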
[jira] [Comment Edited] (FLINK-7860) Support YARN proxy user in Flink (impersonation)
[ https://issues.apache.org/jira/browse/FLINK-7860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301806#comment-16301806 ] Eron Wright edited comment on FLINK-7860 at 12/22/17 6:54 PM: --- Regarding how a proxy user would be configured, the goal is to set the login user to a proxy user UGI that wraps the kerberos (real) UGI. The real UGI must continue to be initialized using a keytab as normal. Rather than introduce new config settings, Flink could simply make use of Hadoop's built-in `HADOOP_PROXY_USER` environment variable. I suggest that Flink simply propagate the `HADOOP_PROXY_USER` variable to the AM/TM. Then, in `org.apache.flink.runtime.security.modules.HadoopModule`, wrap the `loginUser` with a proxy-user UGI when `HADOOP_PROXY_USER` is set and then call `UGI.setLoginUser`. This need only be done in the `loginUserFromKeytab` scenario, not in the `loginUserFromSubject` scenario since `loginUserFromSubject` already does exactly that. See HADOOP-8561. was (Author: eronwright): Regarding how a proxy user would be configured, the goal is to set the login user to a proxy user UGI that wraps the kerberos (real) UGI. The real UGI must continue to be initialized using a keytab as normal. Rather than introduce new config settings, Flink could simply make use of Hadoop's built-in `HADOOP_PROXY_USER` environment variable. I suggest that Flink simply propagate the `HADOOP_PROXY_USER` variable to the AM/TM. Then, in `org.apache.flink.runtime.security.modules.HadoopModule`, wrap the `loginUser` with a proxy-user UGI when `HADOOP_PROXY_USER` is set and then call `UGI.setLoginUser`. This need only be done in the `loginUserFromKeytab` scenario, not in the `loginUserFromSubject` scenario since `loginUserFromSubject` already does exactly that. > Support YARN proxy user in Flink (impersonation) > > > Key: FLINK-7860 > URL: https://issues.apache.org/jira/browse/FLINK-7860 > Project: Flink > Issue Type: New Feature > Components: YARN >Reporter: Shuyi Chen >Assignee: Shuyi Chen > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7860) Support YARN proxy user in Flink (impersonation)
[ https://issues.apache.org/jira/browse/FLINK-7860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301806#comment-16301806 ] Eron Wright commented on FLINK-7860: - Regarding how a proxy user would be configured, the goal is to set the login user to a proxy user UGI that wraps the kerberos (real) UGI. The real UGI must continue to be initialized using a keytab as normal. Rather than introduce new config settings, Flink could simply make use of Hadoop's built-in `HADOOP_PROXY_USER` environment variable. I suggest that Flink simply propagate the `HADOOP_PROXY_USER` variable to the AM/TM. Then, in `org.apache.flink.runtime.security.modules.HadoopModule`, wrap the `loginUser` with a proxy-user UGI when `HADOOP_PROXY_USER` is set and then call `UGI.setLoginUser`. This need only be done in the `loginUserFromKeytab` scenario, not in the `loginUserFromSubject` scenario since `loginUserFromSubject` already does exactly that. > Support YARN proxy user in Flink (impersonation) > > > Key: FLINK-7860 > URL: https://issues.apache.org/jira/browse/FLINK-7860 > Project: Flink > Issue Type: New Feature > Components: YARN >Reporter: Shuyi Chen >Assignee: Shuyi Chen > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
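A sketch of the login sequence described above, using only existing Hadoop UserGroupInformation calls; the principal and keytab path are placeholders:
{code:java}
import org.apache.hadoop.security.UserGroupInformation;

public class ProxyUserLogin {
    public static void main(String[] args) throws Exception {
        // Real (Kerberos) identity from a keytab, as Flink's HadoopModule does today.
        UserGroupInformation realUser =
            UserGroupInformation.loginUserFromKeytabAndReturnUGI(
                "flink@EXAMPLE.COM", "/etc/security/flink.keytab"); // placeholders

        // If HADOOP_PROXY_USER is set, wrap the real identity in a proxy-user UGI
        // and install it as the login user, mirroring what loginUserFromSubject
        // already does internally (HADOOP-8561).
        String proxyUser = System.getenv("HADOOP_PROXY_USER");
        if (proxyUser != null) {
            UserGroupInformation.setLoginUser(
                UserGroupInformation.createProxyUser(proxyUser, realUser));
        }
    }
}
{code}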
[jira] [Commented] (FLINK-8037) Missing cast in integer arithmetic in TransactionalIdsGenerator#generateIdsToAbort
[ https://issues.apache.org/jira/browse/FLINK-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301773#comment-16301773 ] ASF GitHub Bot commented on FLINK-8037: --- GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/5205 [FLINK-8037] Fix integer multiplication or shift implicitly cast to long ## What is the purpose of the change Fixes potential overflow flagged by the IntelliJ inspection "Integer multiplication or shift implicitly cast to long". ## Brief change log - mark integer literal as long - cast multiplied integer to long ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 8037_fix_integer_multiplication_or_shift_implicitly_cast_to_long Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5205.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5205 commit ec2f80c5e890dc52b715eeca719c911aaa75fb82 Author: Greg Hogan Date: 2017-12-21T17:45:54Z [FLINK-8037] Fix integer multiplication or shift implicitly cast to long > Missing cast in integer arithmetic in > TransactionalIdsGenerator#generateIdsToAbort > -- > > Key: FLINK-8037 > URL: https://issues.apache.org/jira/browse/FLINK-8037 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Assignee: Greg Hogan >Priority: Minor > > {code} > public Set<Long> generateIdsToAbort() { > Set<Long> idsToAbort = new HashSet<>(); > for (int i = 0; i < safeScaleDownFactor; i++) { > idsToAbort.addAll(generateIdsToUse(i * poolSize * > totalNumberOfSubtasks)); > {code} > The operands are integers, whereas generateIdsToUse() expects a long parameter. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5205: [FLINK-8037] Fix integer multiplication or shift i...
GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/5205 [FLINK-8037] Fix integer multiplication or shift implicitly cast to long ## What is the purpose of the change Fixes potential overflow flagged by the IntelliJ inspection "Integer multiplication or shift implicitly cast to long". ## Brief change log - mark integer literal as long - cast multiplied integer to long ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 8037_fix_integer_multiplication_or_shift_implicitly_cast_to_long Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5205.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5205 commit ec2f80c5e890dc52b715eeca719c911aaa75fb82 Author: Greg Hogan Date: 2017-12-21T17:45:54Z [FLINK-8037] Fix integer multiplication or shift implicitly cast to long ---
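The overflow class fixed here is easy to reproduce in isolation. A self-contained sketch with made-up values showing why the widening cast must come before the multiplication:
{code:java}
public class WideningCastExample {
    public static void main(String[] args) {
        int i = 3, poolSize = 100_000, totalNumberOfSubtasks = 50_000;
        // 32-bit multiply overflows first, then the wrong result is widened.
        long wrong = i * poolSize * totalNumberOfSubtasks;
        // Casting one operand widens the whole chain to 64-bit arithmetic.
        long right = (long) i * poolSize * totalNumberOfSubtasks;
        System.out.println(wrong + " vs " + right); // 2115098112 vs 15000000000
    }
}
{code}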
[jira] [Commented] (FLINK-8309) JVM sigsegv crash when enabling async checkpoints
[ https://issues.apache.org/jira/browse/FLINK-8309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301756#comment-16301756 ] Stefan Richter commented on FLINK-8309: --- Looks like this is very likely related to this Java problem: https://bugs.java.com/view_bug.do?bug_id=8145260. Maybe you can confirm and close the issue in that case, because I think there is not much we can do about this? > JVM sigsegv crash when enabling async checkpoints > - > > Key: FLINK-8309 > URL: https://issues.apache.org/jira/browse/FLINK-8309 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.4.0, 1.3.2 > Environment: macOS 10.13.2 & Ubuntu 16.04.03 using JVM 1.8.0_151. >Reporter: Kevin Pullin > Attachments: StreamingJob.scala > > > h4. Summary > I have a streaming job with async checkpointing enabled. The job is crashing > the JVM with a SIGSEGV error coinciding with checkpoint completion. > Workarounds are noted below. I thought this was worth documenting in case > someone runs into similar issues or if a fix is possible. > h4. Job Overview & Observations > The job itself stores a large quantity of `case class` objects in > `valueState`s contained within a `RichFilterFunction`. This data is used for > deduplicating events. > The crash can be avoided by: > - moving the case class outside of the anonymous RichFilterFunction class. > - reducing the number of objects stored in the valueState. > - reducing the size of the objects stored in the valueState. > - disabling async snapshots. > I can provide additional crash data as needed (core dumps, error logs, etc.). > The StateBackend implementation doesn't matter; the job fails using the > Memory, Fs, and RocksDb backends. > From what I understand, anonymous classes should be avoided with checkpointing > as the name isn't stable, so that seems like the best route for me. > h4. Reproduction case > The attached `StreamingJob.scala` file contains a minimal repro case, > which closely aligns with my actual job configuration. Running it > consistently crashes the JVM upon completion of the first checkpoint. > My test runs set only two JVM options => -Xms4g -Xmx4g > h4. Crash output > Here's a crash captured from Ubuntu: > {noformat} > [info] # > [info] # A fatal error has been detected by the Java Runtime Environment: > [info] # > [info] # SIGSEGV (0xb) at pc=0x7fd192b92c1c, pid=7191, > tid=0x7fd0873f3700 > [info] # > [info] # JRE version: OpenJDK Runtime Environment (8.0_151-b12) (build > 1.8.0_151-8u151-b12-0ubuntu0.16.04.2-b12) > [info] # Java VM: OpenJDK 64-Bit Server VM (25.151-b12 mixed mode linux-amd64 > compressed oops) > [info] # Problematic frame: > [info] # C [libzip.so+0x5c1c] > [info] # > [info] # Core dump written.
Default location: /home/ubuntu/flink-project/core > or core.7191 > [info] # > [info] # An error report file with more information is saved as: > [info] # /home/XXX/flink-project/hs_err_pid7191.log > [info] Compiled method (nm) 71547 81 n 0 > java.util.zip.ZipFile::getEntry (native) > [info] total in heap [0x7fd17d12e290,0x7fd17d12e600] = 880 > [info] relocation [0x7fd17d12e3b8,0x7fd17d12e400] = 72 > [info] main code [0x7fd17d12e400,0x7fd17d12e600] = 512 > [info] # > [info] # If you would like to submit a bug report, please visit: > [info] # http://bugreport.java.com/bugreport/crash.jsp > [info] # > {noformat} > And one from macOS: > {noformat} > # > # A fatal error has been detected by the Java Runtime Environment: > # > # SIGSEGV (0xb) at pc=0x000105264c48, pid=30848, tid=0x3403 > # > # JRE version: Java(TM) SE Runtime Environment (8.0_151-b12) (build > 1.8.0_151-b12) > # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.151-b12 mixed mode bsd-amd64 > compressed oops) > # Problematic frame: > # V [libjvm.dylib+0x464c48] > # > # Failed to write core dump. Core dumps have been disabled. To enable core > dumping, try "ulimit -c unlimited" before starting Java again > # > # An error report file with more information is saved as: > # /Users/XXX/src/etc_flink_mwx/hs_err_pid30848.log > [thread 30211 also had an error] > # > # If you would like to submit a bug report, please visit: > # http://bugreport.java.com/bugreport/crash.jsp > # The crash happened outside the Java Virtual Machine in native code. > # See problematic frame for where to report the bug. > # > Process finished with exit code 134 (interrupted by signal 6: SIGABRT) > {noformat} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8309) JVM sigsegv crash when enabling async checkpoints
Kevin Pullin created FLINK-8309: --- Summary: JVM sigsegv crash when enabling async checkpoints Key: FLINK-8309 URL: https://issues.apache.org/jira/browse/FLINK-8309 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing Affects Versions: 1.3.2, 1.4.0 Environment: macOS 10.13.2 & Ubuntu 16.04.03 using JVM 1.8.0_151. Reporter: Kevin Pullin Attachments: StreamingJob.scala h4. Summary I have a streaming job with async checkpointing enabled. The job is crashing the JVM with a SIGSEGV error coinciding with checkpoint completion. Workarounds are noted below. I thought this was worth documenting in case someone runs into similar issues or if a fix is possible. h4. Job Overview & Observations The job itself stores a large quantity of `case class` objects in `valueState`s contained within a `RichFilterFunction`. This data is used for deduplicating events. The crash can be avoided by: - moving the case class outside of the anonymous RichFilterFunction class. - reducing the number of objects stored in the valueState. - reducing the size of the objects stored in the valueState. - disabling async snapshots. I can provide additional crash data as needed (core dumps, error logs, etc.). The StateBackend implementation doesn't matter; the job fails using the Memory, Fs, and RocksDb backends. From what I understand, anonymous classes should be avoided with checkpointing as the name isn't stable, so that seems like the best route for me. h4. Reproduction case The attached `StreamingJob.scala` file contains a minimal repro case, which closely aligns with my actual job configuration. Running it consistently crashes the JVM upon completion of the first checkpoint. My test runs set only two JVM options => -Xms4g -Xmx4g h4. Crash output Here's a crash captured from Ubuntu: {noformat} [info] # [info] # A fatal error has been detected by the Java Runtime Environment: [info] # [info] # SIGSEGV (0xb) at pc=0x7fd192b92c1c, pid=7191, tid=0x7fd0873f3700 [info] # [info] # JRE version: OpenJDK Runtime Environment (8.0_151-b12) (build 1.8.0_151-8u151-b12-0ubuntu0.16.04.2-b12) [info] # Java VM: OpenJDK 64-Bit Server VM (25.151-b12 mixed mode linux-amd64 compressed oops) [info] # Problematic frame: [info] # C [libzip.so+0x5c1c] [info] # [info] # Core dump written. Default location: /home/ubuntu/flink-project/core or core.7191 [info] # [info] # An error report file with more information is saved as: [info] # /home/XXX/flink-project/hs_err_pid7191.log [info] Compiled method (nm) 71547 81 n 0 java.util.zip.ZipFile::getEntry (native) [info] total in heap [0x7fd17d12e290,0x7fd17d12e600] = 880 [info] relocation [0x7fd17d12e3b8,0x7fd17d12e400] = 72 [info] main code [0x7fd17d12e400,0x7fd17d12e600] = 512 [info] # [info] # If you would like to submit a bug report, please visit: [info] # http://bugreport.java.com/bugreport/crash.jsp [info] # {noformat} And one from macOS: {noformat} # # A fatal error has been detected by the Java Runtime Environment: # # SIGSEGV (0xb) at pc=0x000105264c48, pid=30848, tid=0x3403 # # JRE version: Java(TM) SE Runtime Environment (8.0_151-b12) (build 1.8.0_151-b12) # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.151-b12 mixed mode bsd-amd64 compressed oops) # Problematic frame: # V [libjvm.dylib+0x464c48] # # Failed to write core dump. Core dumps have been disabled.
To enable core dumping, try "ulimit -c unlimited" before starting Java again # # An error report file with more information is saved as: # /Users/XXX/src/etc_flink_mwx/hs_err_pid30848.log [thread 30211 also had an error] # # If you would like to submit a bug report, please visit: # http://bugreport.java.com/bugreport/crash.jsp # The crash happened outside the Java Virtual Machine in native code. # See problematic frame for where to report the bug. # Process finished with exit code 134 (interrupted by signal 6: SIGABRT) {noformat} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8283) FlinkKafkaConsumerBase failing on Travis with no output in 10min
[ https://issues.apache.org/jira/browse/FLINK-8283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301529#comment-16301529 ] ASF GitHub Bot commented on FLINK-8283: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5201 The following 10 local Travis runs suggests that the stalling tests on the `connectors` build no longer remains after this change: - https://travis-ci.org/tzulitai/flink/builds/319936358 - https://travis-ci.org/tzulitai/flink/builds/319936375 - https://travis-ci.org/tzulitai/flink/builds/319936388 - https://travis-ci.org/tzulitai/flink/builds/319936405 - https://travis-ci.org/tzulitai/flink/builds/319936419 - https://travis-ci.org/tzulitai/flink/builds/319936440 - https://travis-ci.org/tzulitai/flink/builds/319936454 - https://travis-ci.org/tzulitai/flink/builds/319936469 - https://travis-ci.org/tzulitai/flink/builds/319936480 - https://travis-ci.org/tzulitai/flink/builds/319936556 > FlinkKafkaConsumerBase failing on Travis with no output in 10min > > > Key: FLINK-8283 > URL: https://issues.apache.org/jira/browse/FLINK-8283 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Affects Versions: 1.5.0 >Reporter: Nico Kruber >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Labels: test-stability > > Since a few days, Travis builds with the {{connectors}} profile keep failing > more often with no new output being received within 10 minutes. It seems to > start with the Travis build for > https://github.com/apache/flink/commit/840cbfbf0845b60dbf02dd2f37f696f1db21b1e9 > but may have been introduced earlier. The printed offsets look strange > though. > {code} > 16:33:12,508 INFO > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Setting > restore state in the FlinkKafkaConsumer: > {KafkaTopicPartition{topic='test-topic', partition=0}=-915623761773, > KafkaTopicPartition{topic='test-topic', partition=1}=-915623761773} > 16:33:12,520 INFO > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - > Consumer subtask 2 will start reading 66 partitions with offsets in restored > state: {KafkaTopicPartition{topic='test-topic', partition=851}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=716}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=461}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=206}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=971}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=836}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=581}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=326}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=71}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=956}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=701}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=446}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=191}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=56}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=821}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=566}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=311}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=881}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=626}=-915623761775, > 
KafkaTopicPartition{topic='test-topic', partition=371}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=236}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=746}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=491}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=356}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=101}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=866}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=611}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=476}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=221}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=986}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=731}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=596}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=341}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=86}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=656}=-915
[GitHub] flink issue #5201: [FLINK-8283] [kafka] Stabalize FlinkKafkaConsumerBaseTest...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5201 The following 10 local Travis runs suggest that the stalling tests on the `connectors` build no longer occur after this change: - https://travis-ci.org/tzulitai/flink/builds/319936358 - https://travis-ci.org/tzulitai/flink/builds/319936375 - https://travis-ci.org/tzulitai/flink/builds/319936388 - https://travis-ci.org/tzulitai/flink/builds/319936405 - https://travis-ci.org/tzulitai/flink/builds/319936419 - https://travis-ci.org/tzulitai/flink/builds/319936440 - https://travis-ci.org/tzulitai/flink/builds/319936454 - https://travis-ci.org/tzulitai/flink/builds/319936469 - https://travis-ci.org/tzulitai/flink/builds/319936480 - https://travis-ci.org/tzulitai/flink/builds/319936556 ---
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301479#comment-16301479 ] ASF GitHub Bot commented on FLINK-8162: --- Github user casidiablo commented on a diff in the pull request: https://github.com/apache/flink/pull/5182#discussion_r158496021 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java --- @@ -70,35 +71,45 @@ private Date initTimestamp; + private long millisBehindLatest; + /** * Creates a shard consumer. * * @param fetcherRef reference to the owning fetcher * @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to * @param subscribedShard the shard this consumer is subscribed to * @param lastSequenceNum the sequence number in the shard to start consuming +* @param kinesisMetricGroup the metric group to report to */ public ShardConsumer(KinesisDataFetcher fetcherRef, Integer subscribedShardStateIndex, StreamShardHandle subscribedShard, - SequenceNumber lastSequenceNum) { + SequenceNumber lastSequenceNum, + MetricGroup kinesisMetricGroup) { this(fetcherRef, subscribedShardStateIndex, subscribedShard, lastSequenceNum, - KinesisProxy.create(fetcherRef.getConsumerConfiguration())); + KinesisProxy.create(fetcherRef.getConsumerConfiguration()), + kinesisMetricGroup); } /** This constructor is exposed for testing purposes. */ protected ShardConsumer(KinesisDataFetcher fetcherRef, Integer subscribedShardStateIndex, StreamShardHandle subscribedShard, SequenceNumber lastSequenceNum, - KinesisProxyInterface kinesis) { + KinesisProxyInterface kinesis, + MetricGroup kinesisMetricGroup) { this.fetcherRef = checkNotNull(fetcherRef); this.subscribedShardStateIndex = checkNotNull(subscribedShardStateIndex); this.subscribedShard = checkNotNull(subscribedShard); this.lastSequenceNum = checkNotNull(lastSequenceNum); + + checkNotNull(kinesisMetricGroup) + .gauge("millisBehindLatest", () -> millisBehindLatest); --- End diff -- Will this work? Am I allowed to use lambdas? What Java version has to be supported? > Kinesis Connector to report millisBehindLatest metric > - > > Key: FLINK-8162 > URL: https://issues.apache.org/jira/browse/FLINK-8162 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Reporter: Cristian >Priority: Minor > Labels: kinesis > Fix For: 1.5.0 > > Original Estimate: 24h > Remaining Estimate: 24h > > When reading from Kinesis streams, one of the most valuable metrics is > "MillisBehindLatest" (see > https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201). > Flink should use its metrics mechanism to report this value as a gauge, > tagging it with the shard id. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5182: [FLINK-8162] [kinesis-connector] Emit Kinesis' mil...
Github user casidiablo commented on a diff in the pull request: https://github.com/apache/flink/pull/5182#discussion_r158496021 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java --- @@ -70,35 +71,45 @@ private Date initTimestamp; + private long millisBehindLatest; + /** * Creates a shard consumer. * * @param fetcherRef reference to the owning fetcher * @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to * @param subscribedShard the shard this consumer is subscribed to * @param lastSequenceNum the sequence number in the shard to start consuming +* @param kinesisMetricGroup the metric group to report to */ public ShardConsumer(KinesisDataFetcher fetcherRef, Integer subscribedShardStateIndex, StreamShardHandle subscribedShard, - SequenceNumber lastSequenceNum) { + SequenceNumber lastSequenceNum, + MetricGroup kinesisMetricGroup) { this(fetcherRef, subscribedShardStateIndex, subscribedShard, lastSequenceNum, - KinesisProxy.create(fetcherRef.getConsumerConfiguration())); + KinesisProxy.create(fetcherRef.getConsumerConfiguration()), + kinesisMetricGroup); } /** This constructor is exposed for testing purposes. */ protected ShardConsumer(KinesisDataFetcher fetcherRef, Integer subscribedShardStateIndex, StreamShardHandle subscribedShard, SequenceNumber lastSequenceNum, - KinesisProxyInterface kinesis) { + KinesisProxyInterface kinesis, + MetricGroup kinesisMetricGroup) { this.fetcherRef = checkNotNull(fetcherRef); this.subscribedShardStateIndex = checkNotNull(subscribedShardStateIndex); this.subscribedShard = checkNotNull(subscribedShard); this.lastSequenceNum = checkNotNull(lastSequenceNum); + + checkNotNull(kinesisMetricGroup) + .gauge("millisBehindLatest", () -> millisBehindLatest); --- End diff -- Will this work? Am I allowed to use lambdas? What Java version has to be supported? ---
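A note on the lambda question above: Flink 1.4 and later build against Java 8, so the lambda in the diff should compile, and since `Gauge` is a single-method interface an anonymous class is an exact substitute. The sketch below reuses the field and metric names from the diff for illustration; it is not the code that was eventually merged.
{code:java}
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

// Minimal sketch: field and metric name mirror the diff above.
public class MillisBehindLatestSketch {

    private volatile long millisBehindLatest;

    public void register(MetricGroup metricGroup) {
        // Lambda form, as in the pull request (requires Java 8):
        metricGroup.gauge("millisBehindLatest", () -> millisBehindLatest);

        // Equivalent anonymous-class form that works on older source levels:
        metricGroup.gauge("millisBehindLatestAlt", new Gauge<Long>() {
            @Override
            public Long getValue() {
                return millisBehindLatest;
            }
        });
    }
}
{code}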
[jira] [Commented] (FLINK-6094) Implement stream-stream proctime non-window inner join
[ https://issues.apache.org/jira/browse/FLINK-6094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301382#comment-16301382 ] ASF GitHub Bot commented on FLINK-6094: --- Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/4471#discussion_r158489047 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala --- @@ -116,14 +135,100 @@ object UpdatingPlanChecker { val windowStartEnd = w.getWindowProperties.map(_.name) --- End diff -- A window can only output the start or end field, right? > Implement stream-stream proctime non-window inner join > --- > > Key: FLINK-6094 > URL: https://issues.apache.org/jira/browse/FLINK-6094 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Hequn Cheng > > This includes: > 1.Implement stream-stream proctime non-window inner join > 2.Implement the retract process logic for join -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6094) Implement stream-stream proctime non-window inner join
[ https://issues.apache.org/jira/browse/FLINK-6094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301381#comment-16301381 ] ASF GitHub Bot commented on FLINK-6094: --- Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/4471#discussion_r158489035 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala --- @@ -116,14 +135,100 @@ object UpdatingPlanChecker { val windowStartEnd = w.getWindowProperties.map(_.name) // we have only a unique key if at least one window property is selected if (windowStartEnd.nonEmpty) { -keys = Some(groupKeys ++ windowStartEnd) +val smallestAttribute = windowStartEnd.sorted.head --- End diff -- Thanks, `windowStartEnd.min` is better; I want to get the lexicographically smallest attribute from `windowStartEnd`. I sorted it before getting the first string. > Implement stream-stream proctime non-window inner join > --- > > Key: FLINK-6094 > URL: https://issues.apache.org/jira/browse/FLINK-6094 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Hequn Cheng > > This includes: > 1.Implement stream-stream proctime non-window inner join > 2.Implement the retract process logic for join -- This message was sent by Atlassian JIRA (v6.4.14#64029)
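On the `sorted.head` versus `min` point above: the two are equivalent for picking the lexicographically smallest name, `min` simply avoids the full sort. A tiny self-contained illustration (the attribute names are invented for the example):
{code:java}
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SmallestAttributeSketch {
    public static void main(String[] args) {
        // Invented window property names, purely for illustration.
        List<String> windowStartEnd = Arrays.asList("w$start", "w$end");

        // Sort everything, then take the first element: O(n log n).
        String viaSort = windowStartEnd.stream().sorted().findFirst().get();

        // Single pass over the list: O(n).
        String viaMin = Collections.min(windowStartEnd);

        System.out.println(viaSort + " == " + viaMin); // w$end == w$end
    }
}
{code}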
[jira] [Commented] (FLINK-6094) Implement stream-stream proctime non-window inner join
[ https://issues.apache.org/jira/browse/FLINK-6094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301383#comment-16301383 ] ASF GitHub Bot commented on FLINK-6094: --- Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/4471#discussion_r158489057 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.datastream + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.{JoinInfo, JoinRelType} +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} +import org.apache.calcite.rex.RexNode +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException} +import org.apache.flink.table.codegen.FunctionCodeGenerator +import org.apache.flink.table.plan.nodes.CommonJoin +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.runtime.join.DataStreamInnerJoin +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.types.Row + +import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer + +/** + * RelNode for a non-windowed stream join. 
+ */ +class DataStreamJoin( +cluster: RelOptCluster, +traitSet: RelTraitSet, +leftNode: RelNode, +rightNode: RelNode, +joinCondition: RexNode, +joinInfo: JoinInfo, +joinType: JoinRelType, +leftSchema: RowSchema, +rightSchema: RowSchema, +schema: RowSchema, +ruleDescription: String) + extends BiRel(cluster, traitSet, leftNode, rightNode) + with CommonJoin + with DataStreamRel { + + override def deriveRowType(): RelDataType = schema.relDataType + + override def needsUpdatesAsRetraction: Boolean = true + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { +new DataStreamJoin( + cluster, + traitSet, + inputs.get(0), + inputs.get(1), + joinCondition, + joinInfo, + joinType, + leftSchema, + rightSchema, + schema, + ruleDescription) + } + + def getJoinInfo: JoinInfo = joinInfo + + def getJoinType: JoinRelType = joinType + + override def toString: String = { +joinToString( + schema.relDataType, + joinCondition, + joinType, + getExpressionString) + } + + override def explainTerms(pw: RelWriter): RelWriter = { +joinExplainTerms( + super.explainTerms(pw), + schema.relDataType, + joinCondition, + joinType, + getExpressionString) + } + + override def translateToPlan( + tableEnv: StreamTableEnvironment, + queryConfig: StreamQueryConfig): DataStream[CRow] = { + +val config = tableEnv.getConfig +val returnType = schema.typeInfo +val keyPairs = joinInfo.pairs().toList + +// get the equality keys +val leftKeys = ArrayBuffer.empty[Int] +val rightKeys = ArrayBuffer.empty[Int] +if (keyPairs.isEmpty) { --- End diff -- Hi, I have removed this check because the equi-join check is already done in `FlinkLogicalJoinConverter`. I have also added an equi-join test case in `org.apache.flink.table.api.stream.table.validation.JoinValidationTest` to guard against later changes in `FlinkLogicalJoinConverter`. What do you think? > Implement stream-stream proctime non-window inner join > -
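For readers following the `keyPairs.isEmpty` discussion above: streaming joins need at least one equality predicate so the operator can partition its state by key. A hedged illustration in the Java Table API; the table and field names are invented, and this is a sketch rather than code from the PR.
{code:java}
import org.apache.flink.table.api.Table;

public class EquiJoinRequirementSketch {

    // Accepted: "lId = rId" gives the join operator a key to partition state on.
    public static Table supported(Table left, Table right) {
        return left.join(right).where("lId = rId && lAmount > rAmount");
    }

    // Rejected by the planner: a pure non-equi predicate leaves no key,
    // which is what the removed check (now in FlinkLogicalJoinConverter) guards.
    public static Table unsupported(Table left, Table right) {
        return left.join(right).where("lAmount > rAmount");
    }
}
{code}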
[jira] [Commented] (FLINK-6094) Implement stream-stream proctime non-window inner join
[ https://issues.apache.org/jira/browse/FLINK-6094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301379#comment-16301379 ] ASF GitHub Bot commented on FLINK-6094: --- Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/4471#discussion_r158489017 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/DataStreamInnerJoin.scala --- @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.CRowWrappingMultiOuputCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.slf4j.LoggerFactory +import org.apache.flink.table.codegen.Compiler + + +/** + * Connect data for left stream and right stream. Only use for innerJoin. 
+ * + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param resultTypethe output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param queryConfig the configuration for the query to generate + */ +class DataStreamInnerJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +queryConfig: StreamQueryConfig) + extends CoProcessFunction[CRow, CRow, CRow] + with Compiler[FlatJoinFunction[Row, Row, Row]] { + + // state to hold left stream element + private var leftState: MapState[Row, JTuple2[Int, Long]] = _ + // state to hold right stream element + private var rightState: MapState[Row, JTuple2[Int, Long]] = _ + private var cRowWrapper: CRowWrappingMultiOuputCollector = _ + + private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime + private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime + private val stateCleaningEnabled: Boolean = minRetentionTime > 1 + + // state to record last timer of left stream, 0 means no timer + private var leftTimer: ValueState[Long] = _ + // state to record last timer of right stream, 0 means no timer + private var rightTimer: ValueState[Long] = _ + + // other condition function + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + val LOG = LoggerFactory.getLogger(this.getClass) + + override def open(parameters: Configuration): Unit = { +LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " + +s"Code:\n$genJoinFuncCode") +val clazz = compile( + getRuntimeContext.getUserCodeClassLoader, + genJoinFuncName, + genJoinFuncCode) +LOG.debug("Instantiating JoinFunction.") +joinFunction = clazz.newInstance() + +// initialize left and right state, the first element of tuple2 indicates how many rows of +// this row, while the second element represents the expired time of this row. +val tupleTypeInfo = new TupleTypeInfo[JTuple2[Int, Long]](Types.INT, Types.LONG) +val leftStateDescriptor = new MapStateDescriptor[Row, JTuple2[Int, Long]]( +
[jira] [Commented] (FLINK-6094) Implement stream-stream proctime non-window inner join
[ https://issues.apache.org/jira/browse/FLINK-6094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301380#comment-16301380 ] ASF GitHub Bot commented on FLINK-6094: --- Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/4471#discussion_r158489025 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/DataStreamInnerJoin.scala --- @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.CRowWrappingMultiOuputCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.slf4j.LoggerFactory +import org.apache.flink.table.codegen.Compiler + + +/** + * Connect data for left stream and right stream. Only use for innerJoin. + * + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param resultTypethe output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param queryConfig the configuration for the query to generate + */ +class DataStreamInnerJoin( --- End diff -- Maybe `NonWindowInnerJoin` is better. It is consistent with `non-window aggregate`. What do you think? > Implement stream-stream proctime non-window inner join > --- > > Key: FLINK-6094 > URL: https://issues.apache.org/jira/browse/FLINK-6094 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Hequn Cheng > > This includes: > 1.Implement stream-stream proctime non-window inner join > 2.Implement the retract process logic for join -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4471: [FLINK-6094] [table] Implement stream-stream proct...
Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/4471#discussion_r158489057 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.datastream + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.{JoinInfo, JoinRelType} +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} +import org.apache.calcite.rex.RexNode +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException} +import org.apache.flink.table.codegen.FunctionCodeGenerator +import org.apache.flink.table.plan.nodes.CommonJoin +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.runtime.join.DataStreamInnerJoin +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.types.Row + +import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer + +/** + * RelNode for a non-windowed stream join. 
+ */ +class DataStreamJoin( +cluster: RelOptCluster, +traitSet: RelTraitSet, +leftNode: RelNode, +rightNode: RelNode, +joinCondition: RexNode, +joinInfo: JoinInfo, +joinType: JoinRelType, +leftSchema: RowSchema, +rightSchema: RowSchema, +schema: RowSchema, +ruleDescription: String) + extends BiRel(cluster, traitSet, leftNode, rightNode) + with CommonJoin + with DataStreamRel { + + override def deriveRowType(): RelDataType = schema.relDataType + + override def needsUpdatesAsRetraction: Boolean = true + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { +new DataStreamJoin( + cluster, + traitSet, + inputs.get(0), + inputs.get(1), + joinCondition, + joinInfo, + joinType, + leftSchema, + rightSchema, + schema, + ruleDescription) + } + + def getJoinInfo: JoinInfo = joinInfo + + def getJoinType: JoinRelType = joinType + + override def toString: String = { +joinToString( + schema.relDataType, + joinCondition, + joinType, + getExpressionString) + } + + override def explainTerms(pw: RelWriter): RelWriter = { +joinExplainTerms( + super.explainTerms(pw), + schema.relDataType, + joinCondition, + joinType, + getExpressionString) + } + + override def translateToPlan( + tableEnv: StreamTableEnvironment, + queryConfig: StreamQueryConfig): DataStream[CRow] = { + +val config = tableEnv.getConfig +val returnType = schema.typeInfo +val keyPairs = joinInfo.pairs().toList + +// get the equality keys +val leftKeys = ArrayBuffer.empty[Int] +val rightKeys = ArrayBuffer.empty[Int] +if (keyPairs.isEmpty) { --- End diff -- Hi, I have removed this check because the equi-join check is already done in `FlinkLogicalJoinConverter`. I have also added an equi-join test case in `org.apache.flink.table.api.stream.table.validation.JoinValidationTest` to guard against later changes in `FlinkLogicalJoinConverter`. What do you think? ---
[GitHub] flink pull request #4471: [FLINK-6094] [table] Implement stream-stream proct...
Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/4471#discussion_r158489047 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala --- @@ -116,14 +135,100 @@ object UpdatingPlanChecker { val windowStartEnd = w.getWindowProperties.map(_.name) --- End diff -- A window can only output the start or end field, right? ---
[GitHub] flink pull request #4471: [FLINK-6094] [table] Implement stream-stream proct...
Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/4471#discussion_r158489017 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/DataStreamInnerJoin.scala --- @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.CRowWrappingMultiOuputCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.slf4j.LoggerFactory +import org.apache.flink.table.codegen.Compiler + + +/** + * Connect data for left stream and right stream. Only use for innerJoin. 
+ * + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param resultTypethe output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param queryConfig the configuration for the query to generate + */ +class DataStreamInnerJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +queryConfig: StreamQueryConfig) + extends CoProcessFunction[CRow, CRow, CRow] + with Compiler[FlatJoinFunction[Row, Row, Row]] { + + // state to hold left stream element + private var leftState: MapState[Row, JTuple2[Int, Long]] = _ + // state to hold right stream element + private var rightState: MapState[Row, JTuple2[Int, Long]] = _ + private var cRowWrapper: CRowWrappingMultiOuputCollector = _ + + private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime + private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime + private val stateCleaningEnabled: Boolean = minRetentionTime > 1 + + // state to record last timer of left stream, 0 means no timer + private var leftTimer: ValueState[Long] = _ + // state to record last timer of right stream, 0 means no timer + private var rightTimer: ValueState[Long] = _ + + // other condition function + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + val LOG = LoggerFactory.getLogger(this.getClass) + + override def open(parameters: Configuration): Unit = { +LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " + +s"Code:\n$genJoinFuncCode") +val clazz = compile( + getRuntimeContext.getUserCodeClassLoader, + genJoinFuncName, + genJoinFuncCode) +LOG.debug("Instantiating JoinFunction.") +joinFunction = clazz.newInstance() + +// initialize left and right state, the first element of tuple2 indicates how many rows of +// this row, while the second element represents the expired time of this row. +val tupleTypeInfo = new TupleTypeInfo[JTuple2[Int, Long]](Types.INT, Types.LONG) +val leftStateDescriptor = new MapStateDescriptor[Row, JTuple2[Int, Long]]( + "left", leftType, tupleTypeInfo) +val rightStateDescriptor = new MapStateDescriptor[Row, JTuple2[Int, Long]]( + "right", rightType, tupleTypeInfo) +leftState = getRuntimeContext.getMapState(leftStateDescriptor) +rightS
[GitHub] flink pull request #4471: [FLINK-6094] [table] Implement stream-stream proct...
Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/4471#discussion_r158489035 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala --- @@ -116,14 +135,100 @@ object UpdatingPlanChecker { val windowStartEnd = w.getWindowProperties.map(_.name) // we have only a unique key if at least one window property is selected if (windowStartEnd.nonEmpty) { -keys = Some(groupKeys ++ windowStartEnd) +val smallestAttribute = windowStartEnd.sorted.head --- End diff -- Thanks, `windowStartEnd.min` is better; I want to get the lexicographically smallest attribute from `windowStartEnd`. I sorted it before getting the first string. ---
[GitHub] flink pull request #4471: [FLINK-6094] [table] Implement stream-stream proct...
Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/4471#discussion_r158488989 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala --- @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.stream.table + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment} +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.runtime.utils.{StreamITCase, StreamingWithStateTestBase} +import org.junit.Assert._ +import org.junit.Test +import org.apache.flink.api.common.time.Time +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.table.functions.aggfunctions.CountAggFunction +import org.apache.flink.table.runtime.stream.table.GroupWindowITCase.TimestampAndWatermarkWithOffset +import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{CountDistinct, WeightedAvg} +import org.apache.flink.types.Row + +import scala.collection.mutable + +class JoinITCase extends StreamingWithStateTestBase { + + private val queryConfig = new StreamQueryConfig() + queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2)) --- End diff -- Hi, maybe we can keep it. At least we can cover some of the logic in the `if (stateCleaningEnabled && timerState.value() == 0)` branch. ---
[jira] [Commented] (FLINK-6094) Implement stream-stream proctime non-window inner join
[ https://issues.apache.org/jira/browse/FLINK-6094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301378#comment-16301378 ] ASF GitHub Bot commented on FLINK-6094: --- Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/4471#discussion_r158488989 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala --- @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.stream.table + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment} +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.runtime.utils.{StreamITCase, StreamingWithStateTestBase} +import org.junit.Assert._ +import org.junit.Test +import org.apache.flink.api.common.time.Time +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.table.functions.aggfunctions.CountAggFunction +import org.apache.flink.table.runtime.stream.table.GroupWindowITCase.TimestampAndWatermarkWithOffset +import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{CountDistinct, WeightedAvg} +import org.apache.flink.types.Row + +import scala.collection.mutable + +class JoinITCase extends StreamingWithStateTestBase { + + private val queryConfig = new StreamQueryConfig() + queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2)) --- End diff -- Hi, maybe we can keep it. At least we can cover some of the logic in the `if (stateCleaningEnabled && timerState.value() == 0)` branch. > Implement stream-stream proctime non-window inner join > --- > > Key: FLINK-6094 > URL: https://issues.apache.org/jira/browse/FLINK-6094 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Hequn Cheng > > This includes: > 1.Implement stream-stream proctime non-window inner join > 2.Implement the retract process logic for join -- This message was sent by Atlassian JIRA (v6.4.14#64029)
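The `if (stateCleaningEnabled && timerState.value() == 0)` branch discussed above belongs to the idle-state-retention pattern: lazily register at most one processing-time cleanup timer per key. A self-contained sketch of that pattern follows; class, field, and method names are invented, and the join logic is elided, so treat it as an illustration rather than the merged code.
{code:java}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class RetentionSketch extends CoProcessFunction<Row, Row, Row> {

    private final long maxRetentionTime;        // from StreamQueryConfig
    private final boolean stateCleaningEnabled; // minRetentionTime > 1
    private transient ValueState<Long> timerState;

    public RetentionSketch(long minRetentionTime, long maxRetentionTime) {
        this.maxRetentionTime = maxRetentionTime;
        this.stateCleaningEnabled = minRetentionTime > 1;
    }

    @Override
    public void open(Configuration parameters) {
        timerState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("cleanup-timer", Long.class));
    }

    @Override
    public void processElement1(Row left, Context ctx, Collector<Row> out) throws Exception {
        registerCleanupTimerIfAbsent(ctx);
        // ... join logic elided ...
    }

    @Override
    public void processElement2(Row right, Context ctx, Collector<Row> out) throws Exception {
        registerCleanupTimerIfAbsent(ctx);
        // ... join logic elided ...
    }

    // The branch the JoinITCase comment wants covered: only register a timer
    // when cleaning is enabled and no timer is pending for this key.
    private void registerCleanupTimerIfAbsent(Context ctx) throws Exception {
        Long current = timerState.value(); // null before the first write in Java
        if (stateCleaningEnabled && (current == null || current == 0)) {
            long cleanupTime = ctx.timerService().currentProcessingTime() + maxRetentionTime;
            ctx.timerService().registerProcessingTimeTimer(cleanupTime);
            timerState.update(cleanupTime);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Row> out) throws Exception {
        // Key has been idle past the retention time: drop its join state
        // here and reset the timer marker so a new timer can be registered.
        timerState.clear();
    }
}
{code}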
[GitHub] flink pull request #4471: [FLINK-6094] [table] Implement stream-stream proct...
Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/4471#discussion_r158489025 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/DataStreamInnerJoin.scala --- @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.CRowWrappingMultiOuputCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.slf4j.LoggerFactory +import org.apache.flink.table.codegen.Compiler + + +/** + * Connect data for left stream and right stream. Only use for innerJoin. + * + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param resultTypethe output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param queryConfig the configuration for the query to generate + */ +class DataStreamInnerJoin( --- End diff -- Maybe `NonWindowInnerJoin` is better. It is consistent with `non-window aggregate`. What do you think ? ---
[jira] [Commented] (FLINK-6094) Implement stream-stream proctime non-window inner join
[ https://issues.apache.org/jira/browse/FLINK-6094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301376#comment-16301376 ] ASF GitHub Bot commented on FLINK-6094: --- Github user hequn8128 commented on the issue: https://github.com/apache/flink/pull/4471 Hi @twalthr, thanks for your review. The PR has been updated according to your comments. It mainly contains the following changes: - Some minor refactorings in `UpdatingPlanChecker`, `NonWindowInnerJoin` and the tests - As for time indicator attributes - Event-time attributes are not supported. When executing the join, the join operator needs to make sure that no late data is emitted. A window join makes this possible by holding back watermarks, but a non-window join is unbounded, so we don't know how long to hold them back. - Proctime attributes can be output from the join but cannot appear in the join predicate. There seems to be no easy way to support proctime attributes in the join predicate: if we evaluated proctime in the code generator, the left proctime would always equal the right proctime, which makes `left.proctime > right.proctime` always return false. Currently, users can cast proctime attributes to a long type if they want to use them in a predicate. What do you think? > Implement stream-stream proctime non-window inner join > --- > > Key: FLINK-6094 > URL: https://issues.apache.org/jira/browse/FLINK-6094 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Hequn Cheng > > This includes: > 1.Implement stream-stream proctime non-window inner join > 2.Implement the retract process logic for join -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4471: [FLINK-6094] [table] Implement stream-stream proctime non...
Github user hequn8128 commented on the issue: https://github.com/apache/flink/pull/4471 Hi @twalthr, thanks for your review. The PR has been updated according to your comments. It mainly contains the following changes: - Some minor refactorings in `UpdatingPlanChecker`, `NonWindowInnerJoin` and the tests - As for time indicator attributes - Event-time attributes are not supported. When executing the join, the join operator needs to make sure that no late data is emitted. A window join makes this possible by holding back watermarks, but a non-window join is unbounded, so we don't know how long to hold them back. - Proctime attributes can be output from the join but cannot appear in the join predicate. There seems to be no easy way to support proctime attributes in the join predicate: if we evaluated proctime in the code generator, the left proctime would always equal the right proctime, which makes `left.proctime > right.proctime` always return false. Currently, users can cast proctime attributes to a long type if they want to use them in a predicate. What do you think? ---
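A small illustration of the cast workaround mentioned above. The field names are invented and the expression-string syntax is a sketch against the Java Table API: proctime attributes cannot be compared in the join predicate directly, but once cast to `LONG` they behave like ordinary columns.
{code:java}
import org.apache.flink.table.api.Table;

public class ProctimePredicateSketch {

    // Not allowed per the discussion above: comparing proctime attributes directly.
    //   left.join(right).where("lId = rId && lProctime > rProctime")

    // Workaround: cast the proctime attributes to LONG first.
    public static Table joinWithProctimePredicate(Table left, Table right) {
        return left.join(right)
                   .where("lId = rId && lProctime.cast(LONG) > rProctime.cast(LONG)");
    }
}
{code}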
[jira] [Assigned] (FLINK-8277) Optimize code generation by using local references
[ https://issues.apache.org/jira/browse/FLINK-8277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ruidong Li reassigned FLINK-8277: - Assignee: Ruidong Li > Optimize code generation by using local references > -- > > Key: FLINK-8277 > URL: https://issues.apache.org/jira/browse/FLINK-8277 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Ruidong Li > > By default, Flink uses {{org.apache.calcite.rex.RexProgram#expandLocalRef}} > to remove local references, which reverses the effect of common subexpression > elimination. For more performant execution and smaller generated code, we > should leverage common subexpressions. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
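To illustrate what is at stake in this ticket: expanding a `RexLocalRef` inlines the referenced expression at every use site, so a common subexpression is evaluated repeatedly in the generated code. Schematically, in hand-written Java rather than actual Flink codegen output:
{code:java}
public class LocalRefSketch {

    // With local references expanded, the shared subexpression runs twice:
    static double[] expanded(double a, double b) {
        double out1 = Math.sqrt(a * a + b * b) + 1;
        double out2 = Math.sqrt(a * a + b * b) - 1;
        return new double[] {out1, out2};
    }

    // Keeping the local reference evaluates it once and reuses the result:
    static double[] withLocalRef(double a, double b) {
        double tmp = Math.sqrt(a * a + b * b); // the "local reference"
        return new double[] {tmp + 1, tmp - 1};
    }
}
{code}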
[jira] [Created] (FLINK-8308) Update yajl-ruby dependency to 1.3.1 or higher
Fabian Hueske created FLINK-8308: Summary: Update yajl-ruby dependency to 1.3.1 or higher Key: FLINK-8308 URL: https://issues.apache.org/jira/browse/FLINK-8308 Project: Flink Issue Type: Task Components: Project Website Reporter: Fabian Hueske Priority: Critical Fix For: 1.5.0, 1.4.1 We were notified that yajl-ruby < 1.3.1, a dependency used to build the Flink website, has a high-severity security vulnerability. We should update yajl-ruby to 1.3.1 or higher. Since the website is built offline and served as static HTML, I don't think this is a super critical issue (please correct me if I'm wrong), but we should resolve it soon. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7756) RocksDB state backend Checkpointing (Async and Incremental) is not working with CEP.
[ https://issues.apache.org/jira/browse/FLINK-7756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shashank Agarwal updated FLINK-7756: Fix Version/s: 1.4.1 1.5.0 > RocksDB state backend Checkpointing (Async and Incremental) is not working > with CEP. > - > > Key: FLINK-7756 > URL: https://issues.apache.org/jira/browse/FLINK-7756 > Project: Flink > Issue Type: Sub-task > Components: CEP, State Backends, Checkpointing, Streaming >Affects Versions: 1.4.0, 1.3.2 > Environment: Flink 1.3.2, Yarn, HDFS, RocksDB backend >Reporter: Shashank Agarwal >Priority: Blocker > Fix For: 1.4.0, 1.5.0, 1.4.1 > > > When I try to use the RocksDBStateBackend on my staging cluster (which uses HDFS as the file system), it crashes. But when I use the FsStateBackend on staging (which uses HDFS as the file system), it works fine. > Locally, with the local file system, it works fine in both cases. > Please check the attached logs. I have around 20-25 tasks in my app. > {code:java} > 2017-09-29 14:21:31,639 INFO > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink - No state > to restore for the BucketingSink (taskIdx=0). > 2017-09-29 14:21:31,640 INFO > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - > Initializing RocksDB keyed state backend from snapshot. > 2017-09-29 14:21:32,020 INFO > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink - No state > to restore for the BucketingSink (taskIdx=1). > 2017-09-29 14:21:32,022 INFO > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - > Initializing RocksDB keyed state backend from snapshot. > 2017-09-29 14:21:32,078 INFO com.datastax.driver.core.NettyUtil > - Found Netty's native epoll transport in the classpath, using > it > 2017-09-29 14:21:34,177 INFO org.apache.flink.runtime.taskmanager.Task > - Attempting to fail task externally Co-Flat Map (1/2) > (b879f192c4e8aae6671cdafb3a24c00a). > 2017-09-29 14:21:34,177 INFO org.apache.flink.runtime.taskmanager.Task > - Attempting to fail task externally Map (2/2) > (1ea5aef6ccc7031edc6b37da2912d90b). > 2017-09-29 14:21:34,178 INFO org.apache.flink.runtime.taskmanager.Task > - Attempting to fail task externally Co-Flat Map (2/2) > (4bac8e764c67520d418a4c755be23d4d). > 2017-09-29 14:21:34,178 INFO org.apache.flink.runtime.taskmanager.Task > - Co-Flat Map (1/2) (b879f192c4e8aae6671cdafb3a24c00a) switched > from RUNNING to FAILED. > AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 > for operator Co-Flat Map (1/2).} > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.Exception: Could not materialize checkpoint 2 for > operator Co-Flat Map (1/2). > ... 6 more > Caused by: java.util.concurrent.ExecutionException: > java.lang.IllegalStateException > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897) > ... 
5 more > Suppressed: java.lang.Exception: Could not properly cancel managed > keyed state future. > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961) > ... 5 more > Caused by: java.util.concurrent.ExecutionException: > java.lang.IllegalStateException > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85) > at > org.apache.flink.str
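For readers trying to reproduce reports like the one above, this is roughly the configuration under which the failure is described: RocksDB keyed state on HDFS with asynchronous, incremental checkpoints. A sketch only; the checkpoint path and interval are placeholders, and the CEP job itself is elided.
{code:java}
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbCheckpointingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Second constructor argument enables incremental checkpointing.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));

        // RocksDB snapshots are taken asynchronously by default.
        env.enableCheckpointing(60_000L);

        // ... CEP pipeline definition elided ...
        env.execute("rocksdb-checkpointing-sketch");
    }
}
{code}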