[jira] [Updated] (FLINK-8311) Flink needs documentation for network access control

2017-12-22 Thread Eron Wright (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eron Wright  updated FLINK-8311:

Description: 
There is a need for better documentation on what connects to what over which 
ports in a Flink cluster to allow users to configure network access control 
rules.

E.g. I was under the impression that in a ZK HA configuration the Job Managers 
were essentially independent and only coordinated via ZK.  But starting 
multiple JMs in HA with the JM RPC port blocked between JMs shows that the 
second JM's Akka subsystem is trying to connect to the leading JM:

{code}
INFO  akka.remote.transport.ProtocolStateActor  - No response from remote for outbound association. Associate timed out after [2 ms].
WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@10.210.210.127:6123] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@10.210.210.127:6123]] Caused by: [No response from remote for outbound association. Associate timed out after [2 ms].]
WARN  akka.remote.transport.netty.NettyTransport - Remote connection to [null] failed with org.apache.flink.shaded.akka.org.jboss.netty.channel.ConnectTimeoutException: connection timed out: /10.210.210.127:6123
{code}

  was:
There is a need for better documentation on what connects to what over which 
ports in a Flink cluster to allow users to configure network access control 
rules.

E.g. I was under the impression that in a ZK HA configuration the Job Managers 
were essentially independent and only coordinated via ZK.  But starting 
multiple JMs in HA with the JM RPC port blocked between JMs shows that the 
second JM's Akka subsystem is trying to connect to the leading JM:

INFO  akka.remote.transport.ProtocolStateActor  - No response from remote for outbound association. Associate timed out after [2 ms].
WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@10.210.210.127:6123] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@10.210.210.127:6123]] Caused by: [No response from remote for outbound association. Associate timed out after [2 ms].]
WARN  akka.remote.transport.netty.NettyTransport - Remote connection to [null] failed with org.apache.flink.shaded.akka.org.jboss.netty.channel.ConnectTimeoutException: connection timed out: /10.210.210.127:6123


> Flink needs documentation for network access control
> 
>
> Key: FLINK-8311
> URL: https://issues.apache.org/jira/browse/FLINK-8311
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.4.0
>Reporter: Elias Levy
>
> There is a need for better documentation on what connects to what over which 
> ports in a Flink cluster to allow users to configure network access control 
> rules.
> E.g. I was under the impression that in a ZK HA configuration the Job 
> Managers were essentially independent and only coordinated via ZK.  But 
> starting multiple JMs in HA with the JM RPC port blocked between JMs shows 
> that the second JM's Akka subsystem is trying to connect to the leading JM:
> {code}
> INFO  akka.remote.transport.ProtocolStateActor  - No response from remote for outbound association. Associate timed out after [2 ms].
> WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@10.210.210.127:6123] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@10.210.210.127:6123]] Caused by: [No response from remote for outbound association. Associate timed out after [2 ms].]
> WARN  akka.remote.transport.netty.NettyTransport - Remote connection to [null] failed with org.apache.flink.shaded.akka.org.jboss.netty.channel.ConnectTimeoutException: connection timed out: /10.210.210.127:6123
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8311) Flink needs documentation for network access control

2017-12-22 Thread Elias Levy (JIRA)
Elias Levy created FLINK-8311:
-

 Summary: Flink needs documentation for network access control
 Key: FLINK-8311
 URL: https://issues.apache.org/jira/browse/FLINK-8311
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.4.0
Reporter: Elias Levy


There is a need for better documentation on what connects to what over which 
ports in a Flink cluster to allow users to configure network access control 
rules.

E.g. I was under the impression that in a ZK HA configuration the Job Managers 
were essentially independent and only coordinated via ZK.  But starting 
multiple JMs in HA with the JM RPC port blocked between JMs shows that the 
second JM's Akka subsystem is trying to connect to the leading JM:

INFO  akka.remote.transport.ProtocolStateActor  - No response from remote for outbound association. Associate timed out after [2 ms].
WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@10.210.210.127:6123] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@10.210.210.127:6123]] Caused by: [No response from remote for outbound association. Associate timed out after [2 ms].]
WARN  akka.remote.transport.netty.NettyTransport - Remote connection to [null] failed with org.apache.flink.shaded.akka.org.jboss.netty.channel.ConnectTimeoutException: connection timed out: /10.210.210.127:6123
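For reference, a minimal sketch of the port-related options such documentation would need to cover, assuming a standalone Flink 1.4 cluster (the keys are real configuration options; any port values beyond the documented defaults are illustrative only):

{code:java}
import org.apache.flink.configuration.Configuration;

public class PortConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // JobManager RPC (Akka) port (default 6123); TMs connect here, and as the
        // log above shows, a standby JM in ZK HA also reaches the leader over it.
        conf.setInteger("jobmanager.rpc.port", 6123);
        // BLOB server port, used for distributing job artifacts ("0" = random).
        conf.setString("blob.server.port", "6124");
        // TaskManager data port for shuffle traffic between TMs (0 = random).
        conf.setInteger("taskmanager.data.port", 6121);
        // Web UI / HTTP port (default 8081).
        conf.setInteger("jobmanager.web.port", 8081);
        System.out.println(conf);
    }
}
{code}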



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (FLINK-8310) Flink 1.4 Column 'rowtime' not found in any table

2017-12-22 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske resolved FLINK-8310.
--
Resolution: Not A Problem

I've answered [your question on Stack Overflow|https://stackoverflow.com/questions/47947030/flink-1-4-column-rowtime-not-found-in-any-table/47947652#47947652].

Please reopen if the issue turns out to be a bug.

Thanks, Fabian

> Flink 1.4 Column 'rowtime' not found in any table
> -
>
> Key: FLINK-8310
> URL: https://issues.apache.org/jira/browse/FLINK-8310
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Table API & SQL
>Affects Versions: 1.4.0
> Environment: Ubuntu
> JDK 8
>Reporter: Xuan Nguyen
> Attachments: KafkaSqlStream.java
>
>
> After following the 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/sourceSinks.html#configure-a-rowtime-attribute
> documentation, I register the timestamp field, but I still get 'rowtime' not 
> found in any table:
> {code:java}
> KafkaTableSource source = Kafka08JsonTableSource.builder()
>     // set Kafka topic
>     .forTopic("alerting")
>     // set Kafka consumer properties
>     .withKafkaProperties(getKafkaProperties())
>     // set Table schema
>     .withSchema(TableSchema.builder()
>         .field("tenant", Types.STRING())
>         .field("message", Types.STRING())
>         .field("frequency", Types.LONG())
>         .field("timestamp", Types.SQL_TIMESTAMP()).build())
>     .failOnMissingField(true)
>     .withRowtimeAttribute(
>         // "timestamp" is the rowtime attribute
>         "timestamp",
>         // the value of "timestamp" is extracted from the existing field with the same name
>         new ExistingField("timestamp"),
>         // values of "timestamp" are at most out-of-order by one day
>         new BoundedOutOfOrderTimestamps(TimeUnit.DAYS.toMillis(1)))
>     .build();
> // register the table source under the name "kafka"
> tEnv.registerTableSource("kafka", source);
> Table results = tEnv.sqlQuery("SELECT tenant, message, SUM(frequency) " +
>     "FROM kafka " +
>     "GROUP BY HOP(rowtime, INTERVAL '1' SECOND, INTERVAL '5' SECOND), tenant, message");
> tEnv.toAppendStream(results, Row.class).print();
> {code}
> I get the following error:
> {code}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 64 to line 1, column 70: Column 'rowtime' not found in any table
> 	at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:93)
> 	at org.apache.flink.table.api.TableEnvironment.sqlQuery(TableEnvironment.scala:561)
> 	at oracle.flink.demo.KafkaSQLStream.main(KafkaSQLStream.java:62)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 64 to line 1, column 70: Column 'rowtime' not found in any table
> 	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> 	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> 	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> 	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> 	at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8310) Flink 1.4 Column 'rowtime' not found in any table

2017-12-22 Thread Xuan Nguyen (JIRA)
Xuan Nguyen created FLINK-8310:
--

 Summary: Flink 1.4 Column 'rowtime' not found in any table
 Key: FLINK-8310
 URL: https://issues.apache.org/jira/browse/FLINK-8310
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector, Table API & SQL
Affects Versions: 1.4.0
 Environment: Ubuntu
JDK 8
Reporter: Xuan Nguyen
 Attachments: KafkaSqlStream.java

After following the 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/sourceSinks.html#configure-a-rowtime-attribute
documentation, I register the timestamp field, but I still get 'rowtime' not 
found in any table:


{code:java}
KafkaTableSource source = Kafka08JsonTableSource.builder()
    // set Kafka topic
    .forTopic("alerting")
    // set Kafka consumer properties
    .withKafkaProperties(getKafkaProperties())
    // set Table schema
    .withSchema(TableSchema.builder()
        .field("tenant", Types.STRING())
        .field("message", Types.STRING())
        .field("frequency", Types.LONG())
        .field("timestamp", Types.SQL_TIMESTAMP()).build())
    .failOnMissingField(true)
    .withRowtimeAttribute(
        // "timestamp" is the rowtime attribute
        "timestamp",
        // the value of "timestamp" is extracted from the existing field with the same name
        new ExistingField("timestamp"),
        // values of "timestamp" are at most out-of-order by one day
        new BoundedOutOfOrderTimestamps(TimeUnit.DAYS.toMillis(1)))
    .build();

// register the table source under the name "kafka"
tEnv.registerTableSource("kafka", source);

Table results = tEnv.sqlQuery("SELECT tenant, message, SUM(frequency) " +
    "FROM kafka " +
    "GROUP BY HOP(rowtime, INTERVAL '1' SECOND, INTERVAL '5' SECOND), tenant, message");

tEnv.toAppendStream(results, Row.class).print();
{code}


I get the following error:
{code}
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 64 to line 1, column 70: Column 'rowtime' not found in any table
	at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:93)
	at org.apache.flink.table.api.TableEnvironment.sqlQuery(TableEnvironment.scala:561)
	at oracle.flink.demo.KafkaSQLStream.main(KafkaSQLStream.java:62)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 64 to line 1, column 70: Column 'rowtime' not found in any table
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
{code}
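Worth noting: the rowtime attribute in the schema above is declared under the name "timestamp", not "rowtime", which would explain the validation error. A hedged sketch of the corrected query, assuming the declared attribute name must be referenced (backticks assumed necessary because TIMESTAMP is a reserved SQL keyword); tEnv and the "kafka" registration are taken from the snippet above:

{code:java}
// Reference the declared rowtime attribute "timestamp" instead of "rowtime".
Table results = tEnv.sqlQuery("SELECT tenant, message, SUM(frequency) " +
    "FROM kafka " +
    "GROUP BY HOP(`timestamp`, INTERVAL '1' SECOND, INTERVAL '5' SECOND), tenant, message");
{code}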




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-8309) JVM sigsegv crash when enabling async checkpoints

2017-12-22 Thread Kevin Pullin (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kevin Pullin closed FLINK-8309.
---
Resolution: Resolved

> JVM sigsegv crash when enabling async checkpoints
> -
>
> Key: FLINK-8309
> URL: https://issues.apache.org/jira/browse/FLINK-8309
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0, 1.3.2
> Environment: macOS 10.13.2 & Ubuntu 16.04.03 using JVM 1.8.0_151.
>Reporter: Kevin Pullin
> Attachments: StreamingJob.scala
>
>
> h4. Summary
> I have a streaming job with async checkpointing enabled. The job is crashing 
> the JVM with a SIGSEGV error coinciding with checkpoint completion.
> Workarounds are noted below. I thought this was worth documenting in case 
> someone runs into similar issues or if a fix is possible.
> h4. Job Overview & Observations
> The job itself stores a large quantity of `case class` objects in 
> `valueState`s contained within a `RichFilterFunction`. This data is used for 
> deduplicating events.
> The crash can be avoided by:
>  - moving the case class outside of the anonymous RichFilterFunction class.
>  - reducing the number of objects stored in the valueState.
>  - reducing the size of the objects stored in the valueState.
>  - disabling async snapshots.
> I can provide additional crash data as needed (core dumps, error logs, etc).  
> The StateBackend implementation doesn't matter; the job fails using the 
> Memory, Fs, and RocksDb backends.
> From what I understand, anonymous classes should be avoided with checkpointing 
> because their generated names aren't stable, so moving the case class out seems 
> like the best route for me.
> h4. Reproduction case
> The attached `StreamingJob.scala` file contains a minimal repro case, which 
> closely aligns with my actual job configuration. Running it consistently 
> crashes the JVM upon completion of the first checkpoint.
> My test runs set only two JVM options: -Xms4g -Xmx4g
> h4. Crash output
> Here's a crash captured from Ubuntu:
> {noformat}
> [info] #
> [info] # A fatal error has been detected by the Java Runtime Environment:
> [info] #
> [info] #  SIGSEGV (0xb) at pc=0x7fd192b92c1c, pid=7191, tid=0x7fd0873f3700
> [info] #
> [info] # JRE version: OpenJDK Runtime Environment (8.0_151-b12) (build 1.8.0_151-8u151-b12-0ubuntu0.16.04.2-b12)
> [info] # Java VM: OpenJDK 64-Bit Server VM (25.151-b12 mixed mode linux-amd64 compressed oops)
> [info] # Problematic frame:
> [info] # C  [libzip.so+0x5c1c]
> [info] #
> [info] # Core dump written. Default location: /home/ubuntu/flink-project/core or core.7191
> [info] #
> [info] # An error report file with more information is saved as:
> [info] # /home/XXX/flink-project/hs_err_pid7191.log
> [info] Compiled method (nm)   71547   81 n 0   java.util.zip.ZipFile::getEntry (native)
> [info]  total in heap  [0x7fd17d12e290,0x7fd17d12e600] = 880
> [info]  relocation [0x7fd17d12e3b8,0x7fd17d12e400] = 72
> [info]  main code  [0x7fd17d12e400,0x7fd17d12e600] = 512
> [info] #
> [info] # If you would like to submit a bug report, please visit:
> [info] #   http://bugreport.java.com/bugreport/crash.jsp
> [info] #
> {noformat}
> And one from macOS:
> {noformat}
> #
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGSEGV (0xb) at pc=0x000105264c48, pid=30848, tid=0x3403
> #
> # JRE version: Java(TM) SE Runtime Environment (8.0_151-b12) (build 1.8.0_151-b12)
> # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.151-b12 mixed mode bsd-amd64 compressed oops)
> # Problematic frame:
> # V  [libjvm.dylib+0x464c48]
> #
> # Failed to write core dump. Core dumps have been disabled. To enable core dumping, try "ulimit -c unlimited" before starting Java again
> #
> # An error report file with more information is saved as:
> # /Users/XXX/src/etc_flink_mwx/hs_err_pid30848.log
> [thread 30211 also had an error]
> #
> # If you would like to submit a bug report, please visit:
> #   http://bugreport.java.com/bugreport/crash.jsp
> # The crash happened outside the Java Virtual Machine in native code.
> # See problematic frame for where to report the bug.
> #
> Process finished with exit code 134 (interrupted by signal 6: SIGABRT)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8309) JVM sigsegv crash when enabling async checkpoints

2017-12-22 Thread Kevin Pullin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301856#comment-16301856
 ] 

Kevin Pullin commented on FLINK-8309:
-

Thanks Stefan.

I had run across that, but didn't try running with JDK 9 as I saw a Flink JIRA 
issue still open for JDK 9 compatibility. I did a test run with JDK 9 and the 
problem is no longer occurring.

For JDK 8 there is a suggestion to use the `-Dsun.zip.disableMemoryMapping=true` 
flag. That didn't help, so it's unclear to me whether that particular fix or 
some other JDK 9 change is what resolves the problem here.

In any case I'll close this issue!

> JVM sigsegv crash when enabling async checkpoints
> -
>
> Key: FLINK-8309
> URL: https://issues.apache.org/jira/browse/FLINK-8309
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0, 1.3.2
> Environment: macOS 10.13.2 & Ubuntu 16.04.03 using JVM 1.8.0_151.
>Reporter: Kevin Pullin
> Attachments: StreamingJob.scala
>
>
> h4. Summary
> I have a streaming job with async checkpointing enabled. The job is crashing 
> the JVM with a SIGSEGV error coinciding with checkpoint completion.
> Workarounds are noted below. I thought this was worth documenting in case 
> someone runs into similar issues or if a fix is possible.
> h4. Job Overview & Observations
> The job itself stores a large quantity of `case class` objects in 
> `valueState`s contained within a `RichFilterFunction`. This data is used for 
> deduplicating events.
> The crash can be avoided by:
>  - moving the case class outside of the anonymous RichFilterFunction class.
>  - reducing the number of objects stored in the valueState.
>  - reducing the size of the objects stored in the valueState.
>  - disabling async snapshots.
> I can provide additional crash data as needed (core dumps, error logs, etc).  
> The StateBackend implementation doesn't matter; the job fails using the 
> Memory, Fs, and RocksDb backends.
> From what I understand, anonymous classes should be avoided with checkpointing 
> because their generated names aren't stable, so moving the case class out seems 
> like the best route for me.
> h4. Reproduction case
> The attached `StreamingJob.scala` file contains a minimal repro case, which 
> closely aligns with my actual job configuration. Running it consistently 
> crashes the JVM upon completion of the first checkpoint.
> My test runs set only two JVM options: -Xms4g -Xmx4g
> h4. Crash output
> Here's a crash captured from Ubuntu:
> {noformat}
> [info] #
> [info] # A fatal error has been detected by the Java Runtime Environment:
> [info] #
> [info] #  SIGSEGV (0xb) at pc=0x7fd192b92c1c, pid=7191, tid=0x7fd0873f3700
> [info] #
> [info] # JRE version: OpenJDK Runtime Environment (8.0_151-b12) (build 1.8.0_151-8u151-b12-0ubuntu0.16.04.2-b12)
> [info] # Java VM: OpenJDK 64-Bit Server VM (25.151-b12 mixed mode linux-amd64 compressed oops)
> [info] # Problematic frame:
> [info] # C  [libzip.so+0x5c1c]
> [info] #
> [info] # Core dump written. Default location: /home/ubuntu/flink-project/core or core.7191
> [info] #
> [info] # An error report file with more information is saved as:
> [info] # /home/XXX/flink-project/hs_err_pid7191.log
> [info] Compiled method (nm)   71547   81 n 0   java.util.zip.ZipFile::getEntry (native)
> [info]  total in heap  [0x7fd17d12e290,0x7fd17d12e600] = 880
> [info]  relocation [0x7fd17d12e3b8,0x7fd17d12e400] = 72
> [info]  main code  [0x7fd17d12e400,0x7fd17d12e600] = 512
> [info] #
> [info] # If you would like to submit a bug report, please visit:
> [info] #   http://bugreport.java.com/bugreport/crash.jsp
> [info] #
> {noformat}
> And one from macOS:
> {noformat}
> #
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGSEGV (0xb) at pc=0x000105264c48, pid=30848, tid=0x3403
> #
> # JRE version: Java(TM) SE Runtime Environment (8.0_151-b12) (build 1.8.0_151-b12)
> # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.151-b12 mixed mode bsd-amd64 compressed oops)
> # Problematic frame:
> # V  [libjvm.dylib+0x464c48]
> #
> # Failed to write core dump. Core dumps have been disabled. To enable core dumping, try "ulimit -c unlimited" before starting Java again
> #
> # An error report file with more information is saved as:
> # /Users/XXX/src/etc_flink_mwx/hs_err_pid30848.log
> [thread 30211 also had an error]
> #
> # If you would like to submit a bug report, please visit:
> #   http://bugreport.java.com/bugreport/crash.jsp
> # The crash happened outside the Java Virtual Machine in native code.
> # See problematic frame for where to report the bug.
> #
> Process finished with exit code 134 (interrupted by signal 6: SIGABRT)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

[jira] [Commented] (FLINK-8289) The RestServerEndpoint should return the address with real ip when getRestAdddress

2017-12-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301835#comment-16301835
 ] 

ASF GitHub Bot commented on FLINK-8289:
---

Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/5190#discussion_r158545462
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 ---
@@ -155,6 +156,7 @@ protected void runCluster(Configuration configuration) 
throws Exception {
// write host information into configuration
configuration.setString(JobManagerOptions.ADDRESS, 
commonRpcService.getAddress());
configuration.setInteger(JobManagerOptions.PORT, 
commonRpcService.getPort());
+   configuration.setString(RestOptions.REST_ADDRESS, 
commonRpcService.getAddress());
--- End diff --

This appears to be configuring the REST server's bind address using the 
server address obtained from `AkkaRpcService`. I don't understand the 
rationale; could you explain?


> The RestServerEndpoint should return the address with real ip when 
> getRestAdddress
> --
>
> Key: FLINK-8289
> URL: https://issues.apache.org/jira/browse/FLINK-8289
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0
>Reporter: shuai.xu
>  Labels: flip-6
>
> Currently, RestServerEndpoint.getRestAddress returns an address equal to the 
> value of the config option rest.address, which defaults to 127.0.0.1:9067, but 
> that address cannot be accessed from another machine. Because the IPs for the 
> Dispatcher and JobMaster are usually assigned dynamically, users will configure 
> it to 0.0.0.0, and getRestAddress will then return 0.0.0.0:9067; that address 
> gets registered with YARN or Mesos, but it cannot be accessed from another 
> machine either. So getRestAddress needs to return the real ip:port so that 
> users can reach the web monitor from anywhere.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-8222) Update Scala version

2017-12-22 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan closed FLINK-8222.
-
Resolution: Implemented

master: 8987de3b241d23bbcc6ca5640e3cb77972a60be4

> Update Scala version
> 
>
> Key: FLINK-8222
> URL: https://issues.apache.org/jira/browse/FLINK-8222
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.4.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.5.0
>
>
> Update Scala to version {{2.11.12}}. I don't believe this affects the Flink 
> distribution but rather anyone who is compiling Flink or a 
> Flink-quickstart-derived program on a shared system.
> "A privilege escalation vulnerability (CVE-2017-15288) has been identified in 
> the Scala compilation daemon."
> https://www.scala-lang.org/news/security-update-nov17.html



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-8223) Update Hadoop versions

2017-12-22 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan closed FLINK-8223.
-
Resolution: Fixed

master: d3cd51a3f9fbb3ffbe6d23a57ff3884733eb47fa

> Update Hadoop versions
> --
>
> Key: FLINK-8223
> URL: https://issues.apache.org/jira/browse/FLINK-8223
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.5.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Trivial
> Fix For: 1.5.0
>
>
> Update 2.7.3 to 2.7.5 and 2.8.0 to 2.8.3. See 
> http://hadoop.apache.org/releases.html



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5190: [FLINK-8289] [runtime] set the rest.address to the...

2017-12-22 Thread EronWright
Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/5190#discussion_r158545462
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 ---
@@ -155,6 +156,7 @@ protected void runCluster(Configuration configuration) 
throws Exception {
// write host information into configuration
configuration.setString(JobManagerOptions.ADDRESS, 
commonRpcService.getAddress());
configuration.setInteger(JobManagerOptions.PORT, 
commonRpcService.getPort());
+   configuration.setString(RestOptions.REST_ADDRESS, 
commonRpcService.getAddress());
--- End diff --

This appears to be configuring the REST server's bind address using the 
server address obtained from `AkkaRpcService`. I don't understand the 
rationale; could you explain?


---


[jira] [Updated] (FLINK-8223) Update Hadoop versions

2017-12-22 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-8223:
--
Description: Update 2.7.3 to 2.7.5 and 2.8.0 to 2.8.3. See 
http://hadoop.apache.org/releases.html  (was: Update 2.7.3 to 2.7.4 and 2.8.0 
to 2.8.2. See http://hadoop.apache.org/releases.html)

> Update Hadoop versions
> --
>
> Key: FLINK-8223
> URL: https://issues.apache.org/jira/browse/FLINK-8223
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.5.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Trivial
> Fix For: 1.5.0
>
>
> Update 2.7.3 to 2.7.5 and 2.8.0 to 2.8.3. See 
> http://hadoop.apache.org/releases.html



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8223) Update Hadoop versions

2017-12-22 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-8223:
--
Fix Version/s: 1.5.0

> Update Hadoop versions
> --
>
> Key: FLINK-8223
> URL: https://issues.apache.org/jira/browse/FLINK-8223
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.5.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Trivial
> Fix For: 1.5.0
>
>
> Update 2.7.3 to 2.7.4 and 2.8.0 to 2.8.2. See 
> http://hadoop.apache.org/releases.html



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8222) Update Scala version

2017-12-22 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-8222:
--
Fix Version/s: 1.5.0

> Update Scala version
> 
>
> Key: FLINK-8222
> URL: https://issues.apache.org/jira/browse/FLINK-8222
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.4.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.5.0
>
>
> Update Scala to version {{2.11.12}}. I don't believe this affects the Flink 
> distribution but rather anyone who is compiling Flink or a 
> Flink-quickstart-derived program on a shared system.
> "A privilege escalation vulnerability (CVE-2017-15288) has been identified in 
> the Scala compilation daemon."
> https://www.scala-lang.org/news/security-update-nov17.html



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8223) Update Hadoop versions

2017-12-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301830#comment-16301830
 ] 

ASF GitHub Bot commented on FLINK-8223:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5137


> Update Hadoop versions
> --
>
> Key: FLINK-8223
> URL: https://issues.apache.org/jira/browse/FLINK-8223
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.5.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Trivial
>
> Update 2.7.3 to 2.7.4 and 2.8.0 to 2.8.2. See 
> http://hadoop.apache.org/releases.html



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8289) The RestServerEndpoint should return the address with real ip when getRestAdddress

2017-12-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301831#comment-16301831
 ] 

ASF GitHub Bot commented on FLINK-8289:
---

Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/5190
  
The configuration option `REST_ADDRESS` seems destined to cause pain.  
Surveying the code, it seems to variously represent a bind address, a server 
address, and an advertised address.   We should rename and clarify ASAP.


> The RestServerEndpoint should return the address with real ip when 
> getRestAdddress
> --
>
> Key: FLINK-8289
> URL: https://issues.apache.org/jira/browse/FLINK-8289
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0
>Reporter: shuai.xu
>  Labels: flip-6
>
> Currently, RestServerEndpoint.getRestAddress returns an address equal to the 
> value of the config option rest.address, which defaults to 127.0.0.1:9067, but 
> that address cannot be accessed from another machine. Because the IPs for the 
> Dispatcher and JobMaster are usually assigned dynamically, users will configure 
> it to 0.0.0.0, and getRestAddress will then return 0.0.0.0:9067; that address 
> gets registered with YARN or Mesos, but it cannot be accessed from another 
> machine either. So getRestAddress needs to return the real ip:port so that 
> users can reach the web monitor from anywhere.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5506) Java 8 - CommunityDetection.java:158 - java.lang.NullPointerException

2017-12-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301828#comment-16301828
 ] 

ASF GitHub Bot commented on FLINK-5506:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5126


> Java 8 - CommunityDetection.java:158 - java.lang.NullPointerException
> -
>
> Key: FLINK-5506
> URL: https://issues.apache.org/jira/browse/FLINK-5506
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.1.4, 1.3.2, 1.4.1
>Reporter: Miguel E. Coimbra
>Assignee: Greg Hogan
>  Labels: easyfix, newbie
> Fix For: 1.5.0, 1.4.1
>
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> Reporting this here as per Vasia's advice.
> I am having the following problem while trying out the 
> org.apache.flink.graph.library.CommunityDetection algorithm of the Gelly API 
> (Java).
> Specs: JDK 1.8.0_102 x64
> Apache Flink: 1.1.4
> Suppose I have a very small (I tried an example with 38 vertices as well) 
> dataset stored in a tab-separated file 3-vertex.tsv:
> {code}
> #id1	id2	score
> 0	1	0
> 0	2	0
> 0	3	0
> {code}
> This is just a central vertex with 3 neighbors (disconnected between 
> themselves).
> I am loading the dataset and executing the algorithm with the following code:
> {code}
> // Load the data from the .tsv file.
> final DataSet<Tuple3<Long, Long, Double>> edgeTuples = env.readCsvFile(inputPath)
>     .fieldDelimiter("\t") // fields are separated by tabs
>     .ignoreComments("#")  // comment lines start with "#"
>     .types(Long.class, Long.class, Double.class);
> // Generate a graph and add reverse edges (undirected).
> final Graph<Long, Long, Double> graph = Graph.fromTupleDataSet(edgeTuples,
>     new MapFunction<Long, Long>() {
>         private static final long serialVersionUID = 8713516577419451509L;
>         public Long map(Long value) {
>             return value;
>         }
>     },
>     env).getUndirected();
> // CommunityDetection parameters.
> final double hopAttenuationDelta = 0.5d;
> final int iterationCount = 10;
> // Prepare and trigger the execution.
> DataSet<Vertex<Long, Long>> vs = graph.run(new org.apache.flink.graph.library.CommunityDetection<Long>(iterationCount, hopAttenuationDelta)).getVertices();
> vs.print();
> {code}
> ​Running this code throws the following exception​ (check the bold line):
> {code}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:805)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:751)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:751)
> 	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> 	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> 	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.NullPointerException
> 	at org.apache.flink.graph.library.CommunityDetection$VertexLabelUpdater.updateVertex(CommunityDetection.java:158)
> 	at org.apache.flink.graph.spargel.ScatterGatherIteration$GatherUdfSimpleVV.coGroup(ScatterGatherIteration.java:389)
> 	at org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:218)
> 	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
> 	at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
> 	at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
> 	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}
> ​After a further look, I set a breakpoint (Eclipse IDE debugging) at the line 
> in bold:
> org.apache.flink.graph.library.CommunityDetection.java (sour

[jira] [Commented] (FLINK-8222) Update Scala version

2017-12-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301829#comment-16301829
 ] 

ASF GitHub Bot commented on FLINK-8222:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5136


> Update Scala version
> 
>
> Key: FLINK-8222
> URL: https://issues.apache.org/jira/browse/FLINK-8222
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.4.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>
> Update Scala to version {{2.11.12}}. I don't believe this affects the Flink 
> distribution but rather anyone who is compiling Flink or a 
> Flink-quickstart-derived program on a shared system.
> "A privilege escalation vulnerability (CVE-2017-15288) has been identified in 
> the Scala compilation daemon."
> https://www.scala-lang.org/news/security-update-nov17.html



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5190: [FLINK-8289] [runtime] set the rest.address to the host o...

2017-12-22 Thread EronWright
Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/5190
  
The configuration option `REST_ADDRESS` seems destined to cause pain.  
Surveying the code, it seems to variously represent a bind address, a server 
address, and an advertised address.   We should rename and clarify ASAP.


---


[GitHub] flink pull request #5195: [hotfix] [build] Always include Kafka 0.11 connect...

2017-12-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5195


---


[GitHub] flink pull request #5136: [FLINK-8222] [build] Update Scala version

2017-12-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5136


---


[GitHub] flink pull request #5137: [FLINK-8223] [build] Update Hadoop versions

2017-12-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5137


---


[GitHub] flink pull request #5126: [FLINK-5506] [gelly] Fix CommunityDetection NullPo...

2017-12-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5126


---


[GitHub] flink pull request #5199: [hotfix] [javadoc] Fix typos in MemorySegment clas...

2017-12-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5199


---


[jira] [Commented] (FLINK-8289) The RestServerEndpoint should return the address with real ip when getRestAdddress

2017-12-22 Thread Eron Wright (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301817#comment-16301817
 ] 

Eron Wright  commented on FLINK-8289:
-

I guess we should be clear about who is being advertised to.  For example, if 
the value is intended for use by the client, we'd want to give the proxy 
address.   If the value is intended for use by the proxy (as the upstream 
address), we'd want to give the server address. 
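To make the distinction concrete, a minimal illustrative sketch (plain Java, not Flink API; the helper name and fallback strategy are assumptions) of choosing an address to advertise when the configured bind address is the wildcard:

{code:java}
import java.net.InetAddress;
import java.net.UnknownHostException;

public class AdvertisedAddressSketch {
    // Hypothetical helper: when the server binds to 0.0.0.0, advertise a
    // resolvable address for this machine instead of the wildcard.
    static String advertisedAddress(String bindAddress) throws UnknownHostException {
        if ("0.0.0.0".equals(bindAddress)) {
            return InetAddress.getLocalHost().getHostAddress();
        }
        return bindAddress;
    }

    public static void main(String[] args) throws UnknownHostException {
        System.out.println(advertisedAddress("0.0.0.0"));
    }
}
{code}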

> The RestServerEndpoint should return the address with real ip when 
> getRestAdddress
> --
>
> Key: FLINK-8289
> URL: https://issues.apache.org/jira/browse/FLINK-8289
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0
>Reporter: shuai.xu
>  Labels: flip-6
>
> Currently, RestServerEndpoint.getRestAddress returns an address equal to the 
> value of the config option rest.address, which defaults to 127.0.0.1:9067, but 
> that address cannot be accessed from another machine. Because the IPs for the 
> Dispatcher and JobMaster are usually assigned dynamically, users will configure 
> it to 0.0.0.0, and getRestAddress will then return 0.0.0.0:9067; that address 
> gets registered with YARN or Mesos, but it cannot be accessed from another 
> machine either. So getRestAddress needs to return the real ip:port so that 
> users can reach the web monitor from anywhere.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-7860) Support YARN proxy user in Flink (impersonation)

2017-12-22 Thread Eron Wright (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301806#comment-16301806
 ] 

Eron Wright  edited comment on FLINK-7860 at 12/22/17 6:54 PM:
---

Regarding how a proxy user would be configured, the goal is to set the login 
user to a proxy user UGI that wraps the kerberos (real) UGI. The real UGI must 
continue to be initialized using a keytab as normal.  Rather than introduce new 
config settings, Flink could simply make use of Hadoop's built-in 
`HADOOP_PROXY_USER` environment variable.

I suggest that Flink simply propagate the `HADOOP_PROXY_USER` variable to the 
AM/TM.   Then, in `org.apache.flink.runtime.security.modules.HadoopModule`, 
wrap the `loginUser` with a proxy-user UGI when `HADOOP_PROXY_USER` is set and 
then call `UGI.setLoginUser`.  This need only be done in the 
`loginUserFromKeytab` scenario, not in the `loginUserFromSubject` scenario 
since `loginUserFromSubject` already does exactly that.

See HADOOP-8561.



was (Author: eronwright):
Regarding how a proxy user would be configured, the goal is to set the login 
user to a proxy user UGI that wraps the kerberos (real) UGI. The real UGI must 
continue to be initialized using a keytab as normal.  Rather than introduce new 
config settings, Flink could simply make use of Hadoop's built-in 
`HADOOP_PROXY_USER` environment variable.

I suggest that Flink simply propagate the `HADOOP_PROXY_USER` variable to the 
AM/TM.   Then, in `org.apache.flink.runtime.security.modules.HadoopModule`, 
wrap the `loginUser` with a proxy-user UGI when `HADOOP_PROXY_USER` is set and 
then call `UGI.setLoginUser`.  This need only be done in the 
`loginUserFromKeytab` scenario, not in the `loginUserFromSubject` scenario 
since `loginUserFromSubject` already does exactly that.


> Support YARN proxy user in Flink (impersonation)
> 
>
> Key: FLINK-7860
> URL: https://issues.apache.org/jira/browse/FLINK-7860
> Project: Flink
>  Issue Type: New Feature
>  Components: YARN
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7860) Support YARN proxy user in Flink (impersonation)

2017-12-22 Thread Eron Wright (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301806#comment-16301806
 ] 

Eron Wright  commented on FLINK-7860:
-

Regarding how a proxy user would be configured, the goal is to set the login 
user to a proxy user UGI that wraps the kerberos (real) UGI. The real UGI must 
continue to be initialized using a keytab as normal.  Rather than introduce new 
config settings, Flink could simply make use of Hadoop's built-in 
`HADOOP_PROXY_USER` environment variable.

I suggest that Flink simply propagate the `HADOOP_PROXY_USER` variable to the 
AM/TM.   Then, in `org.apache.flink.runtime.security.modules.HadoopModule`, 
wrap the `loginUser` with a proxy-user UGI when `HADOOP_PROXY_USER` is set and 
then call `UGI.setLoginUser`.  This need only be done in the 
`loginUserFromKeytab` scenario, not in the `loginUserFromSubject` scenario 
since `loginUserFromSubject` already does exactly that.
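A minimal sketch of that wrapping step, using Hadoop's UserGroupInformation API as described (the surrounding HadoopModule wiring is omitted; this is an illustration, not the final patch):

{code:java}
import org.apache.hadoop.security.UserGroupInformation;

public class ProxyUserSketch {
    public static void main(String[] args) throws Exception {
        // After the normal keytab login, the real (kerberos) UGI is the login user.
        UserGroupInformation realUser = UserGroupInformation.getLoginUser();
        String proxyUser = System.getenv("HADOOP_PROXY_USER");
        if (proxyUser != null) {
            // Wrap the real UGI in a proxy-user UGI and install it as the login user.
            UserGroupInformation proxyUgi =
                UserGroupInformation.createProxyUser(proxyUser, realUser);
            UserGroupInformation.setLoginUser(proxyUgi);
        }
    }
}
{code}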


> Support YARN proxy user in Flink (impersonation)
> 
>
> Key: FLINK-7860
> URL: https://issues.apache.org/jira/browse/FLINK-7860
> Project: Flink
>  Issue Type: New Feature
>  Components: YARN
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8037) Missing cast in integer arithmetic in TransactionalIdsGenerator#generateIdsToAbort

2017-12-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301773#comment-16301773
 ] 

ASF GitHub Bot commented on FLINK-8037:
---

GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/5205

[FLINK-8037] Fix integer multiplication or shift implicitly cast to long

## What is the purpose of the change

Fixes potential overflow flagged by the IntelliJ inspection "Integer 
multiplication or shift implicitly cast to long".

## Brief change log

- mark integer literal as long
- cast multiplied integer to long

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 8037_fix_integer_multiplication_or_shift_implicitly_cast_to_long

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5205.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5205


commit ec2f80c5e890dc52b715eeca719c911aaa75fb82
Author: Greg Hogan 
Date:   2017-12-21T17:45:54Z

[FLINK-8037] Fix integer multiplication or shift implicitly cast to long




> Missing cast in integer arithmetic in 
> TransactionalIdsGenerator#generateIdsToAbort
> --
>
> Key: FLINK-8037
> URL: https://issues.apache.org/jira/browse/FLINK-8037
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Greg Hogan
>Priority: Minor
>
> {code}
>   public Set<Long> generateIdsToAbort() {
>     Set<Long> idsToAbort = new HashSet<>();
>     for (int i = 0; i < safeScaleDownFactor; i++) {
>       idsToAbort.addAll(generateIdsToUse(i * poolSize * totalNumberOfSubtasks));
> {code}
> The operands are integers where generateIdsToUse() expects long parameter.
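A sketch of the fix along the lines of the PR's change log (casting the multiplied integer to long so the product is computed in 64-bit arithmetic before the implicit widening):

{code:java}
// Cast the first operand so the whole product is evaluated as a long,
// avoiding silent int overflow.
idsToAbort.addAll(generateIdsToUse((long) i * poolSize * totalNumberOfSubtasks));
{code}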



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5205: [FLINK-8037] Fix integer multiplication or shift i...

2017-12-22 Thread greghogan
GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/5205

[FLINK-8037] Fix integer multiplication or shift implicitly cast to long

## What is the purpose of the change

Fixes potential overflow flagged by the IntelliJ inspection "Integer 
multiplication or shift implicitly cast to long".

## Brief change log

- mark integer literal as long
- cast multiplied integer to long

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 8037_fix_integer_multiplication_or_shift_implicitly_cast_to_long

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5205.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5205


commit ec2f80c5e890dc52b715eeca719c911aaa75fb82
Author: Greg Hogan 
Date:   2017-12-21T17:45:54Z

[FLINK-8037] Fix integer multiplication or shift implicitly cast to long




---


[jira] [Commented] (FLINK-8309) JVM sigsegv crash when enabling async checkpoints

2017-12-22 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301756#comment-16301756
 ] 

Stefan Richter commented on FLINK-8309:
---

Looks like this is very likely related to this Java problem: 
https://bugs.java.com/view_bug.do?bug_id=8145260 .

Maybe you can confirm and close the issue in that case, because I think there 
is not much we can do about this?

> JVM sigsegv crash when enabling async checkpoints
> -
>
> Key: FLINK-8309
> URL: https://issues.apache.org/jira/browse/FLINK-8309
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0, 1.3.2
> Environment: macOS 10.13.2 & Ubuntu 16.04.03 using JVM 1.8.0_151.
>Reporter: Kevin Pullin
> Attachments: StreamingJob.scala
>
>
> h4. Summary
> I have a streaming job with async checkpointing enabled. The job is crashing 
> the JVM with a SIGSEGV error coinciding with checkpoint completion.
> Workarounds are noted below. I thought this was worth documenting in case 
> someone runs into similar issues or if a fix is possible.
> h4. Job Overview & Observations
> The job itself stores a large quantity of `case class` objects in 
> `valueState`s contained within a `RichFilterFunction`. This data is used for 
> deduplicating events.
> The crash can be avoided by:
>  - moving the case class outside of the anonymous RichFilterFunction class.
>  - reducing the number of objects stored in the valueState.
>  - reducing the size of the objects stored in the valueState.
>  - disabling async snapshots.
> I can provide additional crash data as needed (core dumps, error logs, etc).  
> The StateBackend implementation doesn't matter; the job fails using the 
> Memory, Fs, and RocksDb backends.
> From what I understand, anonymous classes should be avoided with checkpointing 
> because their generated names aren't stable, so moving the case class out seems 
> like the best route for me.
> h4. Reproduction case
> The attached `StreamingJob.scala` file contains a minimal repro case, which 
> closely aligns with my actual job configuration. Running it consistently 
> crashes the JVM upon completion of the first checkpoint.
> My test runs set only two JVM options: -Xms4g -Xmx4g
> h4. Crash output
> Here's a crash captured from Ubuntu:
> {noformat}
> [info] #
> [info] # A fatal error has been detected by the Java Runtime Environment:
> [info] #
> [info] #  SIGSEGV (0xb) at pc=0x7fd192b92c1c, pid=7191, tid=0x7fd0873f3700
> [info] #
> [info] # JRE version: OpenJDK Runtime Environment (8.0_151-b12) (build 1.8.0_151-8u151-b12-0ubuntu0.16.04.2-b12)
> [info] # Java VM: OpenJDK 64-Bit Server VM (25.151-b12 mixed mode linux-amd64 compressed oops)
> [info] # Problematic frame:
> [info] # C  [libzip.so+0x5c1c]
> [info] #
> [info] # Core dump written. Default location: /home/ubuntu/flink-project/core or core.7191
> [info] #
> [info] # An error report file with more information is saved as:
> [info] # /home/XXX/flink-project/hs_err_pid7191.log
> [info] Compiled method (nm)   71547   81 n 0   java.util.zip.ZipFile::getEntry (native)
> [info]  total in heap  [0x7fd17d12e290,0x7fd17d12e600] = 880
> [info]  relocation [0x7fd17d12e3b8,0x7fd17d12e400] = 72
> [info]  main code  [0x7fd17d12e400,0x7fd17d12e600] = 512
> [info] #
> [info] # If you would like to submit a bug report, please visit:
> [info] #   http://bugreport.java.com/bugreport/crash.jsp
> [info] #
> {noformat}
> And one from macOS:
> {noformat}
> #
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGSEGV (0xb) at pc=0x000105264c48, pid=30848, tid=0x3403
> #
> # JRE version: Java(TM) SE Runtime Environment (8.0_151-b12) (build 1.8.0_151-b12)
> # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.151-b12 mixed mode bsd-amd64 compressed oops)
> # Problematic frame:
> # V  [libjvm.dylib+0x464c48]
> #
> # Failed to write core dump. Core dumps have been disabled. To enable core dumping, try "ulimit -c unlimited" before starting Java again
> #
> # An error report file with more information is saved as:
> # /Users/XXX/src/etc_flink_mwx/hs_err_pid30848.log
> [thread 30211 also had an error]
> #
> # If you would like to submit a bug report, please visit:
> #   http://bugreport.java.com/bugreport/crash.jsp
> # The crash happened outside the Java Virtual Machine in native code.
> # See problematic frame for where to report the bug.
> #
> Process finished with exit code 134 (interrupted by signal 6: SIGABRT)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8309) JVM sigsegv crash when enabling async checkpoints

2017-12-22 Thread Kevin Pullin (JIRA)
Kevin Pullin created FLINK-8309:
---

 Summary: JVM sigsegv crash when enabling async checkpoints
 Key: FLINK-8309
 URL: https://issues.apache.org/jira/browse/FLINK-8309
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.3.2, 1.4.0
 Environment: macOS 10.13.2 & Ubuntu 16.04.03 using JVM 1.8.0_151.
Reporter: Kevin Pullin
 Attachments: StreamingJob.scala

h4. Summary

I have a streaming job with async checkpointing enabled. The job is crashing 
the JVM with a SIGSEGV error coinciding with checkpoint completion.

Workarounds are noted below. I thought this was worth documenting in case 
someone runs into similar issues or if a fix is possible.

h4. Job Overview & Observations

The job itself stores a large quantity of `case class` objects in `valueState`s 
contained within a `RichFilterFunction`. This data is used for deduplicating 
events.

The crash can be avoided by:
 - moving the case class outside of the anonymous RichFilterFunction class.
 - reducing the number of objects stored in the valueState.
 - reducing the size of the objects stored in the valueState.
 - disabling async snapshots.

I can provide additional crash data as needed (core dumps, error logs, etc).  
The StateBackend implementation doesn't matter; the job fails using the Memory, 
Fs, and RocksDb backends.

From what I understand, anonymous classes should be avoided with checkpointing 
because their generated names aren't stable, so moving the case class out seems 
like the best route for me. A sketch of that workaround follows.
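A minimal sketch of the workaround, written in Java for illustration (the reporter's job is in Scala): a named, top-level function class instead of an anonymous subclass, so the generated class name stays stable. The event type and state layout here are assumptions:

{code:java}
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;

// Named top-level class rather than an anonymous subclass; intended to be
// applied on a keyed stream, e.g. stream.keyBy(...).filter(new DedupFilter()).
public class DedupFilter extends RichFilterFunction<String> {
    private transient ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) {
        seen = getRuntimeContext().getState(
            new ValueStateDescriptor<>("seen", Boolean.class));
    }

    @Override
    public boolean filter(String value) throws Exception {
        if (seen.value() != null) {
            return false; // drop duplicates of the current key
        }
        seen.update(true);
        return true;
    }
}
{code}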

h4. Reproduction case

The attached `StreamingJob.scala` file contains a minimal repro case, which 
closely aligns with my actual job configuration. Running it consistently 
crashes the JVM upon completion of the first checkpoint.

My test runs set only two JVM options: -Xms4g -Xmx4g

h4. Crash output

Here's a crash captured from Ubuntu:

{noformat}
[info] #
[info] # A fatal error has been detected by the Java Runtime Environment:
[info] #
[info] #  SIGSEGV (0xb) at pc=0x7fd192b92c1c, pid=7191, tid=0x7fd0873f3700
[info] #
[info] # JRE version: OpenJDK Runtime Environment (8.0_151-b12) (build 1.8.0_151-8u151-b12-0ubuntu0.16.04.2-b12)
[info] # Java VM: OpenJDK 64-Bit Server VM (25.151-b12 mixed mode linux-amd64 compressed oops)
[info] # Problematic frame:
[info] # C  [libzip.so+0x5c1c]
[info] #
[info] # Core dump written. Default location: /home/ubuntu/flink-project/core or core.7191
[info] #
[info] # An error report file with more information is saved as:
[info] # /home/XXX/flink-project/hs_err_pid7191.log
[info] Compiled method (nm)   71547   81 n 0   java.util.zip.ZipFile::getEntry (native)
[info]  total in heap  [0x7fd17d12e290,0x7fd17d12e600] = 880
[info]  relocation [0x7fd17d12e3b8,0x7fd17d12e400] = 72
[info]  main code  [0x7fd17d12e400,0x7fd17d12e600] = 512
[info] #
[info] # If you would like to submit a bug report, please visit:
[info] #   http://bugreport.java.com/bugreport/crash.jsp
[info] #
{noformat}

And one from macOS:

{noformat}
#
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x000105264c48, pid=30848, tid=0x3403
#
# JRE version: Java(TM) SE Runtime Environment (8.0_151-b12) (build 1.8.0_151-b12)
# Java VM: Java HotSpot(TM) 64-Bit Server VM (25.151-b12 mixed mode bsd-amd64 compressed oops)
# Problematic frame:
# V  [libjvm.dylib+0x464c48]
#
# Failed to write core dump. Core dumps have been disabled. To enable core dumping, try "ulimit -c unlimited" before starting Java again
#
# An error report file with more information is saved as:
# /Users/XXX/src/etc_flink_mwx/hs_err_pid30848.log
[thread 30211 also had an error]
#
# If you would like to submit a bug report, please visit:
#   http://bugreport.java.com/bugreport/crash.jsp
# The crash happened outside the Java Virtual Machine in native code.
# See problematic frame for where to report the bug.
#

Process finished with exit code 134 (interrupted by signal 6: SIGABRT)
{noformat}




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8283) FlinkKafkaConsumerBase failing on Travis with no output in 10min

2017-12-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301529#comment-16301529
 ] 

ASF GitHub Bot commented on FLINK-8283:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5201
  
The following 10 local Travis runs suggest that the stalling tests in the 
`connectors` build no longer occur after this change:

- https://travis-ci.org/tzulitai/flink/builds/319936358
- https://travis-ci.org/tzulitai/flink/builds/319936375
- https://travis-ci.org/tzulitai/flink/builds/319936388
- https://travis-ci.org/tzulitai/flink/builds/319936405
- https://travis-ci.org/tzulitai/flink/builds/319936419
- https://travis-ci.org/tzulitai/flink/builds/319936440
- https://travis-ci.org/tzulitai/flink/builds/319936454
- https://travis-ci.org/tzulitai/flink/builds/319936469
- https://travis-ci.org/tzulitai/flink/builds/319936480
- https://travis-ci.org/tzulitai/flink/builds/319936556


> FlinkKafkaConsumerBase failing on Travis with no output in 10min
> 
>
> Key: FLINK-8283
> URL: https://issues.apache.org/jira/browse/FLINK-8283
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>  Labels: test-stability
>
> Since a few days, Travis builds with the {{connectors}} profile keep failing 
> more often with no new output being received within 10 minutes. It seems to 
> start with the Travis build for 
> https://github.com/apache/flink/commit/840cbfbf0845b60dbf02dd2f37f696f1db21b1e9
>  but may have been introduced earlier. The printed offsets look strange 
> though.
> {code}
> 16:33:12,508 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Setting 
> restore state in the FlinkKafkaConsumer: 
> {KafkaTopicPartition{topic='test-topic', partition=0}=-915623761773, 
> KafkaTopicPartition{topic='test-topic', partition=1}=-915623761773}
> 16:33:12,520 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - 
> Consumer subtask 2 will start reading 66 partitions with offsets in restored 
> state: {KafkaTopicPartition{topic='test-topic', partition=851}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=716}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=461}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=206}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=971}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=836}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=581}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=326}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=71}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=956}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=701}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=446}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=191}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=56}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=821}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=566}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=311}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=881}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=626}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=371}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=236}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=746}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=491}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=356}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=101}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=866}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=611}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=476}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=221}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=986}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=731}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=596}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=341}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=86}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=656}=-915

[GitHub] flink issue #5201: [FLINK-8283] [kafka] Stabalize FlinkKafkaConsumerBaseTest...

2017-12-22 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5201
  
The following 10 local Travis runs suggest that the stalling tests in the 
`connectors` build no longer occur after this change:

- https://travis-ci.org/tzulitai/flink/builds/319936358
- https://travis-ci.org/tzulitai/flink/builds/319936375
- https://travis-ci.org/tzulitai/flink/builds/319936388
- https://travis-ci.org/tzulitai/flink/builds/319936405
- https://travis-ci.org/tzulitai/flink/builds/319936419
- https://travis-ci.org/tzulitai/flink/builds/319936440
- https://travis-ci.org/tzulitai/flink/builds/319936454
- https://travis-ci.org/tzulitai/flink/builds/319936469
- https://travis-ci.org/tzulitai/flink/builds/319936480
- https://travis-ci.org/tzulitai/flink/builds/319936556


---


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2017-12-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301479#comment-16301479
 ] 

ASF GitHub Bot commented on FLINK-8162:
---

Github user casidiablo commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r158496021
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -70,35 +71,45 @@
 
private Date initTimestamp;
 
+   private long millisBehindLatest;
+
/**
 * Creates a shard consumer.
 *
 * @param fetcherRef reference to the owning fetcher
 * @param subscribedShardStateIndex the state index of the shard this 
consumer is subscribed to
 * @param subscribedShard the shard this consumer is subscribed to
 * @param lastSequenceNum the sequence number in the shard to start 
consuming
+* @param kinesisMetricGroup the metric group to report to
 */
public ShardConsumer(KinesisDataFetcher fetcherRef,
Integer 
subscribedShardStateIndex,
StreamShardHandle 
subscribedShard,
-   SequenceNumber lastSequenceNum) 
{
+   SequenceNumber lastSequenceNum,
+   MetricGroup kinesisMetricGroup) 
{
this(fetcherRef,
subscribedShardStateIndex,
subscribedShard,
lastSequenceNum,
-   
KinesisProxy.create(fetcherRef.getConsumerConfiguration()));
+   
KinesisProxy.create(fetcherRef.getConsumerConfiguration()),
+   kinesisMetricGroup);
}
 
/** This constructor is exposed for testing purposes. */
protected ShardConsumer(KinesisDataFetcher fetcherRef,
Integer 
subscribedShardStateIndex,
StreamShardHandle 
subscribedShard,
SequenceNumber 
lastSequenceNum,
-   KinesisProxyInterface 
kinesis) {
+   KinesisProxyInterface 
kinesis,
+   MetricGroup 
kinesisMetricGroup) {
this.fetcherRef = checkNotNull(fetcherRef);
this.subscribedShardStateIndex = 
checkNotNull(subscribedShardStateIndex);
this.subscribedShard = checkNotNull(subscribedShard);
this.lastSequenceNum = checkNotNull(lastSequenceNum);
+
+   checkNotNull(kinesisMetricGroup)
+   .gauge("millisBehindLatest", () -> millisBehindLatest);
--- End diff --

Will this work? Am I allowed to use lambdas? What Java version has to be 
supported?
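
For reference, a lambda-free sketch of the same registration against the 
public `MetricGroup` and `Gauge` interfaces (illustrative, not the PR's code):

{code}
import org.apache.flink.metrics.{Gauge, MetricGroup}

// Registers the gauge with an explicit Gauge implementation instead of a
// lambda, so it compiles regardless of the minimum supported Java version.
def registerMillisBehindLatest(group: MetricGroup, read: () => Long): Unit = {
  group.gauge[Long, Gauge[Long]]("millisBehindLatest", new Gauge[Long] {
    override def getValue: Long = read()
  })
}
{code}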


> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5182: [FLINK-8162] [kinesis-connector] Emit Kinesis' mil...

2017-12-22 Thread casidiablo
Github user casidiablo commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r158496021
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -70,35 +71,45 @@
 
private Date initTimestamp;
 
+   private long millisBehindLatest;
+
/**
 * Creates a shard consumer.
 *
 * @param fetcherRef reference to the owning fetcher
 * @param subscribedShardStateIndex the state index of the shard this 
consumer is subscribed to
 * @param subscribedShard the shard this consumer is subscribed to
 * @param lastSequenceNum the sequence number in the shard to start 
consuming
+* @param kinesisMetricGroup the metric group to report to
 */
public ShardConsumer(KinesisDataFetcher fetcherRef,
Integer 
subscribedShardStateIndex,
StreamShardHandle 
subscribedShard,
-   SequenceNumber lastSequenceNum) 
{
+   SequenceNumber lastSequenceNum,
+   MetricGroup kinesisMetricGroup) 
{
this(fetcherRef,
subscribedShardStateIndex,
subscribedShard,
lastSequenceNum,
-   
KinesisProxy.create(fetcherRef.getConsumerConfiguration()));
+   
KinesisProxy.create(fetcherRef.getConsumerConfiguration()),
+   kinesisMetricGroup);
}
 
/** This constructor is exposed for testing purposes. */
protected ShardConsumer(KinesisDataFetcher fetcherRef,
Integer 
subscribedShardStateIndex,
StreamShardHandle 
subscribedShard,
SequenceNumber 
lastSequenceNum,
-   KinesisProxyInterface 
kinesis) {
+   KinesisProxyInterface 
kinesis,
+   MetricGroup 
kinesisMetricGroup) {
this.fetcherRef = checkNotNull(fetcherRef);
this.subscribedShardStateIndex = 
checkNotNull(subscribedShardStateIndex);
this.subscribedShard = checkNotNull(subscribedShard);
this.lastSequenceNum = checkNotNull(lastSequenceNum);
+
+   checkNotNull(kinesisMetricGroup)
+   .gauge("millisBehindLatest", () -> millisBehindLatest);
--- End diff --

Will this work? Am I allowed to use lambdas? What Java version has to be 
supported?


---


[jira] [Commented] (FLINK-6094) Implement stream-stream proctime non-window inner join

2017-12-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301382#comment-16301382
 ] 

ASF GitHub Bot commented on FLINK-6094:
---

Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4471#discussion_r158489047
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
 ---
@@ -116,14 +135,100 @@ object UpdatingPlanChecker {
   val windowStartEnd = w.getWindowProperties.map(_.name)
--- End diff --

A window can only output the start or end field, right?


> Implement stream-stream proctime non-window  inner join
> ---
>
> Key: FLINK-6094
> URL: https://issues.apache.org/jira/browse/FLINK-6094
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Hequn Cheng
>
> This includes:
> 1.Implement stream-stream proctime non-window  inner join
> 2.Implement the retract process logic for join



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6094) Implement stream-stream proctime non-window inner join

2017-12-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301381#comment-16301381
 ] 

ASF GitHub Bot commented on FLINK-6094:
---

Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4471#discussion_r158489035
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
 ---
@@ -116,14 +135,100 @@ object UpdatingPlanChecker {
   val windowStartEnd = w.getWindowProperties.map(_.name)
   // we have only a unique key if at least one window property is 
selected
   if (windowStartEnd.nonEmpty) {
-keys = Some(groupKeys ++ windowStartEnd)
+val smallestAttribute = windowStartEnd.sorted.head
--- End diff --

Thanks, `windowStartEnd.min` is better; I want to get the lexicographically 
smallest attribute from `windowStartEnd`, which is why I sorted it before 
taking the first string.
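
A minimal sketch of the equivalence, with illustrative values:

{code}
// For a Seq[String], sorting and taking the head equals taking the minimum
// under lexicographic ordering, but .min avoids sorting the whole sequence.
val windowStartEnd = Seq("w$start", "w$end") // illustrative property names
assert(windowStartEnd.sorted.head == windowStartEnd.min) // both yield "w$end"
{code}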


> Implement stream-stream proctime non-window  inner join
> ---
>
> Key: FLINK-6094
> URL: https://issues.apache.org/jira/browse/FLINK-6094
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Hequn Cheng
>
> This includes:
> 1.Implement stream-stream proctime non-window  inner join
> 2.Implement the retract process logic for join



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6094) Implement stream-stream proctime non-window inner join

2017-12-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301383#comment-16301383
 ] 

ASF GitHub Bot commented on FLINK-6094:
---

Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4471#discussion_r158489057
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, 
StreamTableEnvironment, TableException}
+import org.apache.flink.table.codegen.FunctionCodeGenerator
+import org.apache.flink.table.plan.nodes.CommonJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.DataStreamInnerJoin
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+/**
+  * RelNode for a non-windowed stream join.
+  */
+class DataStreamJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftNode: RelNode,
+rightNode: RelNode,
+joinCondition: RexNode,
+joinInfo: JoinInfo,
+joinType: JoinRelType,
+leftSchema: RowSchema,
+rightSchema: RowSchema,
+schema: RowSchema,
+ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with CommonJoin
+  with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = schema.relDataType
+
+  override def needsUpdatesAsRetraction: Boolean = true
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataStreamJoin(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  joinCondition,
+  joinInfo,
+  joinType,
+  leftSchema,
+  rightSchema,
+  schema,
+  ruleDescription)
+  }
+
+  def getJoinInfo: JoinInfo = joinInfo
+
+  def getJoinType: JoinRelType = joinType
+
+  override def toString: String = {
+joinToString(
+  schema.relDataType,
+  joinCondition,
+  joinType,
+  getExpressionString)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+joinExplainTerms(
+  super.explainTerms(pw),
+  schema.relDataType,
+  joinCondition,
+  joinType,
+  getExpressionString)
+  }
+
+  override def translateToPlan(
+  tableEnv: StreamTableEnvironment,
+  queryConfig: StreamQueryConfig): DataStream[CRow] = {
+
+val config = tableEnv.getConfig
+val returnType = schema.typeInfo
+val keyPairs = joinInfo.pairs().toList
+
+// get the equality keys
+val leftKeys = ArrayBuffer.empty[Int]
+val rightKeys = ArrayBuffer.empty[Int]
+if (keyPairs.isEmpty) {
--- End diff --

Hi, I have removed this check because the equi-join check is already done in 
`FlinkLogicalJoinConverter`. I have also added an equi-join test case in 
`org.apache.flink.table.api.stream.table.validation.JoinValidationTest` to 
guard against later changes in `FlinkLogicalJoinConverter`. What do you think?


> Implement stream-stream proctime non-window  inner join
> -

[jira] [Commented] (FLINK-6094) Implement stream-stream proctime non-window inner join

2017-12-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301379#comment-16301379
 ] 

ASF GitHub Bot commented on FLINK-6094:
---

Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4471#discussion_r158489017
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/DataStreamInnerJoin.scala
 ---
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.CRowWrappingMultiOuputCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.slf4j.LoggerFactory
+import org.apache.flink.table.codegen.Compiler
+
+
+/**
+  * Connect data for left stream and right stream. Only use for innerJoin.
+  *
+  * @param leftType  the input type of left stream
+  * @param rightType the input type of right stream
+  * @param resultTypethe output type of join
+  * @param genJoinFuncName   the function code of other non-equi condition
+  * @param genJoinFuncCode   the function name of other non-equi condition
+  * @param queryConfig   the configuration for the query to generate
+  */
+class DataStreamInnerJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig)
+  extends CoProcessFunction[CRow, CRow, CRow]
+  with Compiler[FlatJoinFunction[Row, Row, Row]] {
+
+  // state to hold left stream element
+  private var leftState: MapState[Row, JTuple2[Int, Long]] = _
+  // state to hold right stream element
+  private var rightState: MapState[Row, JTuple2[Int, Long]] = _
+  private var cRowWrapper: CRowWrappingMultiOuputCollector = _
+
+  private val minRetentionTime: Long = 
queryConfig.getMinIdleStateRetentionTime
+  private val maxRetentionTime: Long = 
queryConfig.getMaxIdleStateRetentionTime
+  private val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  // state to record last timer of left stream, 0 means no timer
+  private var leftTimer: ValueState[Long] = _
+  // state to record last timer of right stream, 0 means no timer
+  private var rightTimer: ValueState[Long] = _
+
+  // other condition function
+  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  override def open(parameters: Configuration): Unit = {
+LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " +
+s"Code:\n$genJoinFuncCode")
+val clazz = compile(
+  getRuntimeContext.getUserCodeClassLoader,
+  genJoinFuncName,
+  genJoinFuncCode)
+LOG.debug("Instantiating JoinFunction.")
+joinFunction = clazz.newInstance()
+
+// initialize left and right state, the first element of tuple2 
indicates how many rows of
+// this row, while the second element represents the expired time of 
this row.
+val tupleTypeInfo = new TupleTypeInfo[JTuple2[Int, Long]](Types.INT, 
Types.LONG)
+val leftStateDescriptor = new MapStateDescriptor[Row, JTuple2[Int, 
Long]](
+  

[jira] [Commented] (FLINK-6094) Implement stream-stream proctime non-window inner join

2017-12-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301380#comment-16301380
 ] 

ASF GitHub Bot commented on FLINK-6094:
---

Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4471#discussion_r158489025
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/DataStreamInnerJoin.scala
 ---
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.CRowWrappingMultiOuputCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.slf4j.LoggerFactory
+import org.apache.flink.table.codegen.Compiler
+
+
+/**
+  * Connect data for left stream and right stream. Only use for innerJoin.
+  *
+  * @param leftType  the input type of left stream
+  * @param rightType the input type of right stream
+  * @param resultTypethe output type of join
+  * @param genJoinFuncName   the function code of other non-equi condition
+  * @param genJoinFuncCode   the function name of other non-equi condition
+  * @param queryConfig   the configuration for the query to generate
+  */
+class DataStreamInnerJoin(
--- End diff --

Maybe `NonWindowInnerJoin` is better. It is consistent with `non-window 
aggregate`. What do you think?


> Implement stream-stream proctime non-window  inner join
> ---
>
> Key: FLINK-6094
> URL: https://issues.apache.org/jira/browse/FLINK-6094
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Hequn Cheng
>
> This includes:
> 1.Implement stream-stream proctime non-window  inner join
> 2.Implement the retract process logic for join



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4471: [FLINK-6094] [table] Implement stream-stream proct...

2017-12-22 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4471#discussion_r158489057
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, 
StreamTableEnvironment, TableException}
+import org.apache.flink.table.codegen.FunctionCodeGenerator
+import org.apache.flink.table.plan.nodes.CommonJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.DataStreamInnerJoin
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+/**
+  * RelNode for a non-windowed stream join.
+  */
+class DataStreamJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftNode: RelNode,
+rightNode: RelNode,
+joinCondition: RexNode,
+joinInfo: JoinInfo,
+joinType: JoinRelType,
+leftSchema: RowSchema,
+rightSchema: RowSchema,
+schema: RowSchema,
+ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with CommonJoin
+  with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = schema.relDataType
+
+  override def needsUpdatesAsRetraction: Boolean = true
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataStreamJoin(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  joinCondition,
+  joinInfo,
+  joinType,
+  leftSchema,
+  rightSchema,
+  schema,
+  ruleDescription)
+  }
+
+  def getJoinInfo: JoinInfo = joinInfo
+
+  def getJoinType: JoinRelType = joinType
+
+  override def toString: String = {
+joinToString(
+  schema.relDataType,
+  joinCondition,
+  joinType,
+  getExpressionString)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+joinExplainTerms(
+  super.explainTerms(pw),
+  schema.relDataType,
+  joinCondition,
+  joinType,
+  getExpressionString)
+  }
+
+  override def translateToPlan(
+  tableEnv: StreamTableEnvironment,
+  queryConfig: StreamQueryConfig): DataStream[CRow] = {
+
+val config = tableEnv.getConfig
+val returnType = schema.typeInfo
+val keyPairs = joinInfo.pairs().toList
+
+// get the equality keys
+val leftKeys = ArrayBuffer.empty[Int]
+val rightKeys = ArrayBuffer.empty[Int]
+if (keyPairs.isEmpty) {
--- End diff --

Hi, I have removed this check because the equi-join check is already done in 
`FlinkLogicalJoinConverter`. I have also added an equi-join test case in 
`org.apache.flink.table.api.stream.table.validation.JoinValidationTest` to 
guard against later changes in `FlinkLogicalJoinConverter`. What do you think?


---


[GitHub] flink pull request #4471: [FLINK-6094] [table] Implement stream-stream proct...

2017-12-22 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4471#discussion_r158489047
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
 ---
@@ -116,14 +135,100 @@ object UpdatingPlanChecker {
   val windowStartEnd = w.getWindowProperties.map(_.name)
--- End diff --

A window can only output the start or end field, right?


---


[GitHub] flink pull request #4471: [FLINK-6094] [table] Implement stream-stream proct...

2017-12-22 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4471#discussion_r158489017
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/DataStreamInnerJoin.scala
 ---
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.CRowWrappingMultiOuputCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.slf4j.LoggerFactory
+import org.apache.flink.table.codegen.Compiler
+
+
+/**
+  * Connect data for left stream and right stream. Only use for innerJoin.
+  *
+  * @param leftType  the input type of left stream
+  * @param rightType the input type of right stream
+  * @param resultTypethe output type of join
+  * @param genJoinFuncName   the function code of other non-equi condition
+  * @param genJoinFuncCode   the function name of other non-equi condition
+  * @param queryConfig   the configuration for the query to generate
+  */
+class DataStreamInnerJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig)
+  extends CoProcessFunction[CRow, CRow, CRow]
+  with Compiler[FlatJoinFunction[Row, Row, Row]] {
+
+  // state to hold left stream element
+  private var leftState: MapState[Row, JTuple2[Int, Long]] = _
+  // state to hold right stream element
+  private var rightState: MapState[Row, JTuple2[Int, Long]] = _
+  private var cRowWrapper: CRowWrappingMultiOuputCollector = _
+
+  private val minRetentionTime: Long = 
queryConfig.getMinIdleStateRetentionTime
+  private val maxRetentionTime: Long = 
queryConfig.getMaxIdleStateRetentionTime
+  private val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  // state to record last timer of left stream, 0 means no timer
+  private var leftTimer: ValueState[Long] = _
+  // state to record last timer of right stream, 0 means no timer
+  private var rightTimer: ValueState[Long] = _
+
+  // other condition function
+  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  override def open(parameters: Configuration): Unit = {
+LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " +
+s"Code:\n$genJoinFuncCode")
+val clazz = compile(
+  getRuntimeContext.getUserCodeClassLoader,
+  genJoinFuncName,
+  genJoinFuncCode)
+LOG.debug("Instantiating JoinFunction.")
+joinFunction = clazz.newInstance()
+
+// initialize left and right state, the first element of tuple2 
indicates how many rows of
+// this row, while the second element represents the expired time of 
this row.
+val tupleTypeInfo = new TupleTypeInfo[JTuple2[Int, Long]](Types.INT, 
Types.LONG)
+val leftStateDescriptor = new MapStateDescriptor[Row, JTuple2[Int, 
Long]](
+  "left", leftType, tupleTypeInfo)
+val rightStateDescriptor = new MapStateDescriptor[Row, JTuple2[Int, 
Long]](
+  "right", rightType, tupleTypeInfo)
+leftState = getRuntimeContext.getMapState(leftStateDescriptor)
+rightS

[GitHub] flink pull request #4471: [FLINK-6094] [table] Implement stream-stream proct...

2017-12-22 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4471#discussion_r158489035
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
 ---
@@ -116,14 +135,100 @@ object UpdatingPlanChecker {
   val windowStartEnd = w.getWindowProperties.map(_.name)
   // we have only a unique key if at least one window property is 
selected
   if (windowStartEnd.nonEmpty) {
-keys = Some(groupKeys ++ windowStartEnd)
+val smallestAttribute = windowStartEnd.sorted.head
--- End diff --

Thanks, `windowStartEnd.min` is better; I want to get the lexicographically 
smallest attribute from `windowStartEnd`, which is why I sorted it before 
taking the first string.


---


[GitHub] flink pull request #4471: [FLINK-6094] [table] Implement stream-stream proct...

2017-12-22 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4471#discussion_r158488989
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala
 ---
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.{StreamITCase, 
StreamingWithStateTestBase}
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.table.functions.aggfunctions.CountAggFunction
+import 
org.apache.flink.table.runtime.stream.table.GroupWindowITCase.TimestampAndWatermarkWithOffset
+import 
org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{CountDistinct,
 WeightedAvg}
+import org.apache.flink.types.Row
+
+import scala.collection.mutable
+
+class JoinITCase extends StreamingWithStateTestBase {
+
+  private val queryConfig = new StreamQueryConfig()
+  queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))
--- End diff --

Hi, maybe we can keep it. At least it covers some of the logic in the `if 
(stateCleaningEnabled && timerState.value() == 0)` branch.


---


[jira] [Commented] (FLINK-6094) Implement stream-stream proctime non-window inner join

2017-12-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301378#comment-16301378
 ] 

ASF GitHub Bot commented on FLINK-6094:
---

Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4471#discussion_r158488989
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala
 ---
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.{StreamITCase, 
StreamingWithStateTestBase}
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.table.functions.aggfunctions.CountAggFunction
+import 
org.apache.flink.table.runtime.stream.table.GroupWindowITCase.TimestampAndWatermarkWithOffset
+import 
org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{CountDistinct,
 WeightedAvg}
+import org.apache.flink.types.Row
+
+import scala.collection.mutable
+
+class JoinITCase extends StreamingWithStateTestBase {
+
+  private val queryConfig = new StreamQueryConfig()
+  queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))
--- End diff --

Hi, maybe we can keep it. At least it covers some of the logic in the `if 
(stateCleaningEnabled && timerState.value() == 0)` branch.


> Implement stream-stream proctime non-window  inner join
> ---
>
> Key: FLINK-6094
> URL: https://issues.apache.org/jira/browse/FLINK-6094
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Hequn Cheng
>
> This includes:
> 1.Implement stream-stream proctime non-window  inner join
> 2.Implement the retract process logic for join



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4471: [FLINK-6094] [table] Implement stream-stream proct...

2017-12-22 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4471#discussion_r158489025
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/DataStreamInnerJoin.scala
 ---
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.CRowWrappingMultiOuputCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.slf4j.LoggerFactory
+import org.apache.flink.table.codegen.Compiler
+
+
+/**
+  * Connect data for left stream and right stream. Only use for innerJoin.
+  *
+  * @param leftType  the input type of left stream
+  * @param rightType the input type of right stream
+  * @param resultTypethe output type of join
+  * @param genJoinFuncName   the function code of other non-equi condition
+  * @param genJoinFuncCode   the function name of other non-equi condition
+  * @param queryConfig   the configuration for the query to generate
+  */
+class DataStreamInnerJoin(
--- End diff --

Maybe `NonWindowInnerJoin` is better. It is consistent with `non-window 
aggregate`. What do you think?


---


[jira] [Commented] (FLINK-6094) Implement stream-stream proctime non-window inner join

2017-12-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301376#comment-16301376
 ] 

ASF GitHub Bot commented on FLINK-6094:
---

Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/4471
  
Hi @twalthr, thanks for your review.
The PR has been updated according to your comments. It mainly contains the 
following changes:

- Some minor refactorings in `UpdatingPlanChecker`, `NonWindowInnerJoin`, and 
the tests
- As for time indicator attributes:
  - Event-time attributes are not supported. When executing the join, the 
join operator needs to make sure that no late data is emitted. A window join 
makes this possible by holding back watermarks, but a non-window join is 
unbounded, so we don't know how long to hold them back.
  - Proctime attributes can be output from the join but cannot appear in the 
join predicate. There seems to be no easy way to support proctime attributes 
in the join predicate: if we evaluate proctime in the code generator, the left 
proctime will always equal the right proctime, which makes `left.proctime > 
right.proctime` always return false. Currently, users can cast proctime 
attributes to long if they want such a predicate (see the sketch below). What 
do you think?
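
For illustration, a minimal sketch of the cast workaround, with hypothetical 
table and field names:

{code}
import org.apache.flink.table.api.{Table, Types}
import org.apache.flink.table.api.scala._

// Casting both proctime attributes to Long turns them into plain values,
// so they may appear in the join predicate; the time indicator is lost.
def joinWithProctimePredicate(left: Table, right: Table): Table =
  left.join(right)
    .where('lId === 'rId &&
      'lProctime.cast(Types.LONG) > 'rProctime.cast(Types.LONG))
    .select('lId, 'rVal)
{code}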


> Implement stream-stream proctime non-window  inner join
> ---
>
> Key: FLINK-6094
> URL: https://issues.apache.org/jira/browse/FLINK-6094
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Hequn Cheng
>
> This includes:
> 1.Implement stream-stream proctime non-window  inner join
> 2.Implement the retract process logic for join



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4471: [FLINK-6094] [table] Implement stream-stream proctime non...

2017-12-22 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/4471
  
Hi @twalthr, thanks for your review.
The PR has been updated according to your comments. It mainly contains the 
following changes:

- Some minor refactorings in `UpdatingPlanChecker`, `NonWindowInnerJoin`, and 
the tests
- As for time indicator attributes:
  - Event-time attributes are not supported. When executing the join, the 
join operator needs to make sure that no late data is emitted. A window join 
makes this possible by holding back watermarks, but a non-window join is 
unbounded, so we don't know how long to hold them back.
  - Proctime attributes can be output from the join but cannot appear in the 
join predicate. There seems to be no easy way to support proctime attributes 
in the join predicate: if we evaluate proctime in the code generator, the left 
proctime will always equal the right proctime, which makes `left.proctime > 
right.proctime` always return false. Currently, users can cast proctime 
attributes to long if they want such a predicate. What do you think?


---


[jira] [Assigned] (FLINK-8277) Optimize code generation by using local references

2017-12-22 Thread Ruidong Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li reassigned FLINK-8277:
-

Assignee: Ruidong Li

> Optimize code generation by using local references
> --
>
> Key: FLINK-8277
> URL: https://issues.apache.org/jira/browse/FLINK-8277
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Ruidong Li
>
> By default, Flink uses {{org.apache.calcite.rex.RexProgram#expandLocalRef}} 
> to remove local references which reverses the effect of common subexpression 
> elimination. For a more performant execution and smaller generated code we 
> should leverage common subexpressions.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8308) Update yajl-ruby dependency to 1.3.1 or higher

2017-12-22 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-8308:


 Summary: Update yajl-ruby dependency to 1.3.1 or higher
 Key: FLINK-8308
 URL: https://issues.apache.org/jira/browse/FLINK-8308
 Project: Flink
  Issue Type: Task
  Components: Project Website
Reporter: Fabian Hueske
Priority: Critical
 Fix For: 1.5.0, 1.4.1


We got notified that yajl-ruby < 1.3.1, a dependency which is used to build the 
Flink website, has a security vulnerability of high severity.

We should update yajl-ruby to 1.3.1 or higher.

Since the website is built offline and served as static HTML, I don't think 
this is a super critical issue (please correct me if I'm wrong), but we should 
resolve this soon.







--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7756) RocksDB state backend Checkpointing (Async and Incremental) is not working with CEP.

2017-12-22 Thread Shashank Agarwal (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shashank Agarwal updated FLINK-7756:

Fix Version/s: 1.4.1
   1.5.0

> RocksDB state backend Checkpointing (Async and Incremental)  is not working 
> with CEP.
> -
>
> Key: FLINK-7756
> URL: https://issues.apache.org/jira/browse/FLINK-7756
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, State Backends, Checkpointing, Streaming
>Affects Versions: 1.4.0, 1.3.2
> Environment: Flink 1.3.2, Yarn, HDFS, RocksDB backend
>Reporter: Shashank Agarwal
>Priority: Blocker
> Fix For: 1.4.0, 1.5.0, 1.4.1
>
>
> When i try to use RocksDBStateBackend on my staging cluster (which is using 
> HDFS as file system) it crashes. But When i use FsStateBackend on staging 
> (which is using HDFS as file system) it is working fine.
> On local with local file system it's working fine in both cases.
> Please check attached logs. I have around 20-25 tasks in my app.
> {code:java}
> 2017-09-29 14:21:31,639 INFO  
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - No state 
> to restore for the BucketingSink (taskIdx=0).
> 2017-09-29 14:21:31,640 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
> Initializing RocksDB keyed state backend from snapshot.
> 2017-09-29 14:21:32,020 INFO  
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - No state 
> to restore for the BucketingSink (taskIdx=1).
> 2017-09-29 14:21:32,022 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
> Initializing RocksDB keyed state backend from snapshot.
> 2017-09-29 14:21:32,078 INFO  com.datastax.driver.core.NettyUtil  
>   - Found Netty's native epoll transport in the classpath, using 
> it
> 2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Co-Flat Map (1/2) 
> (b879f192c4e8aae6671cdafb3a24c00a).
> 2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Map (2/2) 
> (1ea5aef6ccc7031edc6b37da2912d90b).
> 2017-09-29 14:21:34,178 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Co-Flat Map (2/2) 
> (4bac8e764c67520d418a4c755be23d4d).
> 2017-09-29 14:21:34,178 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Co-Flat Map (1/2) (b879f192c4e8aae6671cdafb3a24c00a) switched 
> from RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 
> for operator Co-Flat Map (1/2).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 2 for 
> operator Co-Flat Map (1/2).
>   ... 6 more
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>   ... 5 more
>   Suppressed: java.lang.Exception: Could not properly cancel managed 
> keyed state future.
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
>   ... 5 more
>   Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
>   at 
> org.apache.flink.str